Mulesoft polling is one of the most important features in Mulesoft. If you are dealing with large set of data or asynchronous workflow, polling architect is one of the best options to manage this scenario.
There are two options to configure Mulesoft poll scheduling.
- Fixed frequency scheduler – you can define polling time in frequency, start delay and time unit. This is one of the simplest ways to define your polling.
- Cron scheduler – Cron scheduler gives ability to use expression language and manage complex scheduling polling.
There is no relationship between two polling. This was challenge for me to get the relationship between two polling so that I can manage my data more efficiently.
Mulesoft gives couple of options to set up relationship between two polls.
Watermark – In polling there is always challenge to process newly created data and keep persist pointer for processed data to avoid duplicate processing. Mulesoft allows us as Watermark to persist this pointer in objectstore. Mule sets a watermark to a default value the first time the flow runs, then uses it as necessary when running a query or making an outbound request. Based on flow Mule may update the original value of the watermark or maintain the original value.
Here is simple flow to show how to implement watermark for poll
Here is code for this flow
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:schedulers="http://www.mulesoft.org/schema/mule/schedulers" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/schedulers http://www.mulesoft.org/schema/mule/schedulers/current/mule-schedulers.xsd">
<spring:beans>
<spring:bean id="dataSourceBean" name="dataSource_Bean" class="org.apache.commons.dbcp.BasicDataSource">
<spring:property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<spring:property name="username" value="********"/>
<spring:property name="password" value="********* "/>
<spring:property name="url" value="jdbc:jtds:sqlserver://localhost:60520;Instance=CRM;DatabaseName=vanrish;domain=man;integrated security=false"/>
</spring:bean>
</spring:beans>
<db:generic-config name="Generic_Database_Configuration" dataSource-ref="dataSourceBean" doc:name="Generic Database Configuration" >
<reconnect-forever frequency="30000"/>
</db:generic-config>
<flow name="poll-watermarking" processingStrategy="synchronous">
<poll doc:name="Poll">
<schedulers:cron-scheduler expression="0 22 12 * * ?"/>
<watermark variable="serialNumber" default-expression="0" selector="LAST" selector-expression="#[payload.serialNumber]"/>
<db:select config-ref="Generic_Database_Configuration" doc:name="Select Database">
<db:dynamic-query><![CDATA[SELECT
MessageId, MessageType,SerialNumber,CreatedOn FROM Message where SerialNumber > #[Integer.parseInt(flowVars['serialNumber'])] order by SerialNumber asc]]></db:dynamic-query>
</db:select>
</poll>
<logger message="#[flowVars['serialNumber']] == Hello this is Loggin Message == #[payload]" level="INFO" doc:name="Logger"/>
</flow>
</mule>
Idempotent Filter – Idempotent filter is another way in Mule we can keep track between two polling. This filter ensures that only unique messages are received by a service by checking the unique ID of the incoming message. This filter also store unique id/pointer in objectstore in mule.
Here is simple flow to use Idempotent Filter
Poll Idempotent Filter flow Diagram
Here is code
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:schedulers="http://www.mulesoft.org/schema/mule/schedulers" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:db="http://www.mulesoft.org/schema/mule/db"
xmlns:cassandradb="http://www.mulesoft.org/schema/mule/cassandradb" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/schedulers http://www.mulesoft.org/schema/mule/schedulers/current/mule-schedulers.xsd">
<spring:beans>
<spring:bean id="dataSourceBean" name="dataSource_Bean" class="org.apache.commons.dbcp.BasicDataSource">
<spring:property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<spring:property name="username" value="*******"/>
<spring:property name="password" value="********"/>
<spring:property name="url" value="jdbc:jtds:sqlserver://localhost:60520;Instance=CRM;DatabaseName=vanrish;domain=man;integrated security=false"/>
</spring:bean>
</spring:beans>
<db:generic-config name="Generic_Database_Configuration" dataSource-ref="dataSourceBean" doc:name="Generic Database Configuration" >
<reconnect-forever frequency="30000"/>
</db:generic-config>
<flow name="poll-idempotent" processingStrategy="synchronous">
<poll doc:name="Poll">
<fixed-frequency-scheduler frequency="10000"/>
<db:select config-ref="Generic_Database_Configuration" doc:name="Select Database">
<db:dynamic-query><![CDATA[SELECT MessageId, MessageType,SerialNumber,CreatedOn FROM Message order by SerialNumber asc]]></db:dynamic-query>
</db:select>
</poll>
<foreach doc:name="For Each">
<idempotent-message-filter idExpression="#[payload.SerialNumber]" doc:name="Idempotent Message">
<in-memory-store entryTTL="120000" expirationInterval="1800000"/>
</idempotent-message-filter>
<logger message="#[payload.SerialNumber] == Hello this is Loggin Message == #[payload]" level="INFO" doc:name="Logger"/>
</foreach>
</flow>
</mule>
Rajnish Kumar is CTO of Vanrish Technology with Over 25 years experience in different industries and technology. He is very passionate about innovation and latest technology like APIs, IOT (Internet Of Things), Artificial Intelligence (AI) ecosystem and Cybersecurity. He present his idea in different platforms and help customer to their digital transformation journey.