An alternative technique to polling the database for updates is what's known as a Continuous Query strategy. A Continuous Query lets you submit a query asynchronously and then receive a stream of notifications later, whenever the query matches.
In this blog I am going to describe an example of how to achieve Continuous Queries with MongoDB and RabbitMQ.
Step 1: Identifying Change
As described in my blog “MongoDB + RabbitMQ: Distributed Operations”, it is straightforward to stream the MongoDB operational log over RabbitMQ. This technique can be used to identify when data has changed within a MongoDB Collection.
In this example, I stream the MongoDB “oplog” records onto a RabbitMQ Exchange called “log.publisher.ex” and set a routing key based upon the namespace within the MongoDB operational record. The namespace is made up of the database name and the collection name, in the form database.collection.
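As a minimal sketch of the routing-key rule, assuming the oplog record has already been decoded into a dictionary: the "ns" field of an oplog entry holds the namespace. The function names and the sample record below are illustrative only and are not taken from the example project.

```python
from typing import Tuple


def routing_key_for(oplog_record: dict) -> str:
    """Return the routing key for an oplog record: its namespace."""
    namespace = oplog_record.get("ns", "")
    if not namespace:
        # Some oplog entries (for example no-ops) carry an empty namespace.
        raise ValueError("oplog record has no namespace")
    return namespace


def split_namespace(namespace: str) -> Tuple[str, str]:
    """Split 'database.collection' at the first dot.

    Collection names may themselves contain dots, so only the first
    dot separates the database name from the collection name.
    """
    database, _, collection = namespace.partition(".")
    return database, collection


# Hypothetical insert record, trimmed to the fields used here.
record = {"op": "i", "ns": "shop.orders", "o": {"_id": 1, "status": "new"}}
key = routing_key_for(record)
print(key, split_namespace(key))
```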
Step 2: Client Continuous Query Request
In this example of a continuous query, the client submits a continuous query request to a RabbitMQ Exchange (“cq.client.register.ex”) and then listens on a callback queue for the responses, which arrive later.
The client’s continuous query request includes the MongoDB query to be executed and the name of the database and collection the query will be executed against. Each continuous query also has a unique name that is used to route the response back to the client.
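For illustration, a request along these lines could be serialized as JSON before it is published. The field names ("name", "database", "collection", "query") and the JSON encoding are assumptions made for this sketch; the example project may use a different message format.

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class ContinuousQueryRequest:
    name: str          # unique name, later used to route responses back
    database: str      # target database
    collection: str    # target collection
    query: dict        # MongoDB query document to execute

    @property
    def namespace(self) -> str:
        """Namespace in the same database.collection form the oplog uses."""
        return f"{self.database}.{self.collection}"

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, body: str) -> "ContinuousQueryRequest":
        return cls(**json.loads(body))


# Hypothetical request; the body would be published to cq.client.register.ex.
request = ContinuousQueryRequest(
    name="open-orders", database="shop", collection="orders",
    query={"status": "open"})
body = request.to_json()
```

Keeping the namespace derivable from the request means the receiving side can build the binding key without any extra lookup.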
The Spring Integration application listens for continuous query requests by having a Queue (“cq.register.queue”) bound to the Exchange “cq.client.register.ex”.
Step 3: Storing and Binding the Request
The goal of this example is to remove unnecessary database polling, so we need to be a little more intelligent about when we execute the client’s query.
In Step 1 we receive a notification every time a MongoDB Collection is changed. This information can be used to decide when to execute the client’s query, i.e. there is no point executing the client’s query when the target Collection has not changed. Step 1 also set a routing key on the “log.publisher.ex” Exchange based upon the namespace of the MongoDB oplog record it received.
When the client submits a continuous query request, it includes the target namespace within the request. Including the namespace allows the Spring Integration application to dynamically create a binding key on the continuous query executor Queue (“cq.executor.queue”), which is bound to the “log.publisher.ex” Exchange. This means that RabbitMQ only routes oplog records that have associated continuous queries to the continuous query executor for processing.
In addition to dynamically routing oplog records, the Spring Integration application also records the request details in a defined MongoDB database (“cqdb”) and collection (“cqcoll”).
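The effect of storing and binding can be sketched with an in-memory model. This is not RabbitMQ or MongoDB code: the DirectExchange class is a hypothetical stand-in that assumes exact-match routing (as a direct exchange would do), and a plain dictionary plays the role of the “cqdb” / “cqcoll” collection.

```python
from collections import defaultdict


class DirectExchange:
    """In-memory stand-in for an exchange with exact-match routing."""

    def __init__(self):
        self._bindings = defaultdict(set)   # binding key -> queue names
        self.queues = defaultdict(list)     # queue name -> delivered messages

    def bind(self, queue, binding_key):
        # Binding the same queue and key twice has no further effect.
        self._bindings[binding_key].add(queue)

    def publish(self, routing_key, message):
        # Messages whose routing key matches no binding are dropped.
        for queue in self._bindings.get(routing_key, ()):
            self.queues[queue].append(message)


log_publisher = DirectExchange()   # plays the role of log.publisher.ex
cq_store = {}                      # plays the role of cqdb.cqcoll, keyed by name


def register(request):
    """Store the request and bind the executor queue to its namespace."""
    namespace = f"{request['database']}.{request['collection']}"
    cq_store[request["name"]] = dict(request, namespace=namespace)
    log_publisher.bind("cq.executor.queue", namespace)


register({"name": "open-orders", "database": "shop",
          "collection": "orders", "query": {"status": "open"}})

# Only the change to shop.orders reaches the executor queue.
log_publisher.publish("shop.orders", {"op": "i", "ns": "shop.orders"})
log_publisher.publish("shop.users", {"op": "i", "ns": "shop.users"})
print(len(log_publisher.queues["cq.executor.queue"]))  # prints 1
```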
Step 4: Continuous Query Execution
Because of the dynamic binding key on the continuous query executor Queue (“cq.executor.queue”), RabbitMQ routes an oplog record to that Queue only when a continuous query is registered against the record’s namespace.
When an oplog record is placed on the Queue, the Spring Integration application picks it up and queries MongoDB for the corresponding continuous query request record. The continuous query request contains the client’s query, which is then executed against MongoDB. The results of the client query are then published to a RabbitMQ Exchange (“cq.client.callback.ex”) with a routing key that is the continuous query name.
Note that this processing is only triggered when the MongoDB oplog reports a change to a targeted MongoDB collection.
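The executor’s loop can be outlined as follows. This is a simplified sketch under stated assumptions: the stored requests and the target collections are plain Python structures, the matches function supports top-level equality only (a small stand-in for real MongoDB query evaluation), and publish is a hypothetical callback standing in for publishing to “cq.client.callback.ex”.

```python
def matches(document, query):
    """Tiny stand-in for MongoDB matching: top-level equality only."""
    return all(document.get(field) == value for field, value in query.items())


def execute_continuous_queries(oplog_record, cq_store, collections, publish):
    """Run each stored query registered against the changed namespace.

    Results are published with the continuous query name as routing key.
    """
    namespace = oplog_record["ns"]
    for request in cq_store.values():
        if request["namespace"] != namespace:
            continue
        results = [doc for doc in collections.get(namespace, [])
                   if matches(doc, request["query"])]
        publish(routing_key=request["name"], message=results)


# Hypothetical stored request and collection contents.
cq_store = {"open-orders": {"name": "open-orders",
                            "namespace": "shop.orders",
                            "query": {"status": "open"}}}
collections = {"shop.orders": [{"_id": 1, "status": "open"},
                               {"_id": 2, "status": "closed"}]}
published = []

execute_continuous_queries(
    {"op": "u", "ns": "shop.orders"}, cq_store, collections,
    publish=lambda routing_key, message: published.append((routing_key, message)))
print(published)
```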
Step 5: Receiving a Response
At this point the continuous query result has been placed on a RabbitMQ Exchange called “cq.client.callback.ex” with a routing key that is the name of the continuous query.
In order to get the response back to the client, the client application creates a Queue and binds it to the “cq.client.callback.ex” Exchange with a binding key that is the name of the continuous query it wants responses from. RabbitMQ then takes on the responsibility of routing each response back to the appropriate client Queue.
Step 6: Remove a Continuous Query
Once the client receives a response from a continuous query it may want to remove the continuous query. In this example, the removal of a continuous query is achieved by the client application publishing a continuous query removal request to a RabbitMQ Exchange (“cq.client.remove.ex”).
The Spring Integration application has a defined Queue (“cq.remove.queue”) that is bound to the Exchange “cq.client.remove.ex”. When a message arrives, the Spring Integration application reads the request and removes the corresponding continuous query from MongoDB.
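A rough sketch of the removal step, again with a dictionary standing in for the stored requests. One part goes beyond what is described above and is purely an assumption of this sketch: it also tracks which queries use each namespace, so the caller can tell when the last query for a namespace has gone and the corresponding binding on “cq.executor.queue” could be dropped. All names here are hypothetical.

```python
from collections import defaultdict

cq_store = {}                          # stand-in for cqdb.cqcoll
bound_namespaces = defaultdict(set)    # namespace -> names of queries using it


def register(name, namespace, query):
    cq_store[name] = {"namespace": namespace, "query": query}
    bound_namespaces[namespace].add(name)


def remove(name):
    """Delete a stored continuous query.

    Returns the namespace whose binding is no longer needed,
    or None if other queries still use it (or the name is unknown).
    """
    request = cq_store.pop(name, None)
    if request is None:
        return None                     # unknown or already removed
    namespace = request["namespace"]
    bound_namespaces[namespace].discard(name)
    if not bound_namespaces[namespace]:
        del bound_namespaces[namespace]
        return namespace                # last query gone: safe to unbind
    return None


register("open-orders", "shop.orders", {"status": "open"})
register("new-orders", "shop.orders", {"status": "new"})
first = remove("open-orders")    # another query still targets shop.orders
second = remove("new-orders")    # no queries left for shop.orders
print(first, second)
```

Without some form of unbinding, oplog records for a namespace would keep arriving at the executor even after every query against it has been removed, which only costs a wasted lookup but works against the goal of avoiding unnecessary processing.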
Example Source: https://github.com/cjharris5/mongo-rabbitmq-cq