An alternative technic to polling the
database for updates is to use what's known as a Continuous Query strategy. A Continuous Query allows you to submit a
query asynchronously and then get a stream of notifications later when the query matches.
In this blog I am going to describe an
example of how to achieve Continuous Queries with MongoDB and RabbitMQ.
Step 1: Identifying Change
As described in my blog “MongoDB + RabbitMQ: Distributed Operations”,
it is straightforward to stream the MongoDB operational log over RabbitMQ. This
technic can be used to identifying when data has changed within a MongoDB Collection.
In this example, I stream the MongoDB "oplog" records onto a RabbitMQ Exchange called “log.publisher.ex” and set a routing
key based upon the namespace within the MongoDB operational record. The
namespace is actually made up from the database name and the collection name.
Step 2: Client Continuous Query Request
In this example of continuous query, the
client submits a continuous query request to a RabbitMQ Exchange (“cq.client.register.ex”)
and then listens for the response to appear at a later date on a callback queue.
The client’s continuous query request
includes the MongoDB query to be executed and the name of the database and
collection the query will be executed against. Each continuous query also has a
unique name that is used to route the response back to the client.
The
Spring Integration application listens for continuous query requests by having
a Queue (“cq.register.queue”) bound to the Exchange “cq.client.register.ex”.
Step 3: Storing and Binding the Request
The goal of this example was to remove
unnecessary database polling so we need to be a little more intelligent on when
we to execute the client’s query.
In Step 1 we are receiving notification
every time a MongoDB Collection is changed. This information can be used to
decided when we would execute the client’s query i.e. there is no point
executing the client’s query when the target Collection has not changed. Step1
also associated a routing key on the “log.publisher.ex”
Exchange based upon the namespace of the MongoDB oplog record it received.
When the client submits a continuous query
request it includes the target namespace within the request. Including the
namespace allows the Spring Integration application to dynamically create a
binding key on continuous query executor Queue (“cq.executor.queue”) that is connected to the “log.publisher.ex”
Exchange. This means that RabbitMQ will only route oplog records that have
associated continuous queries to the to the continuous query executor for
processing.
In addition to dynamically routing oplog
records, the Spring Integration application also records the request details in
a defined MongoDB database (“cqdb”) and collection (“cqcoll’).
Step 4: Continuous Query Execution
Due to the dynamic binding key on the
continuous query executor queue (“cq.executor.queue”), RabbitMQ will only route oplog records to cq.executor.queue
that have associated continuous query against the namespace.
When an oplog record is placed on the
Queue, the Spring Integration application picks it up and queries MongoDB for the
corresponding continuous query request record.
The continuous query request contains the client’s query, which is then
executed against MongoDB. The results of the client query are then published to
a RabbitMQ Exchange (“cq.client.callback.ex") with a routing key that is
the continuous query name.
Note that this processing is only triggered
when the MongoDB oplog reports a change to a targeted MongoDB collection.
Step 5: Receiving a Response
At this point the continuous query result
has been placed on a RabbitMQ Exchange called “cq.client.callback.ex" with
a routing key that is the name of the continuous query.
In order to get the response back to the
client, the client application creates a Queue on the “cq.client.callback.ex"
Exchange with the binding key that is the name of the continuous query it is
interested in getting responses from. RabbitMQ then takes on the responsibility
of routing the correct response back to the appropriate client Queue.
Step 6: Remove a Continuous Query
Once the client receives a response from a
continuous query it may want to remove the continuous query. In this example,
the removal of a continuous query is achieved by the client application publishing
a continuous query removal request to a RabbitMQ Exchange (“cq.client.remove.ex”).
The
Spring Integration application has a defined Queue (“cq.remove.queue”)
that is connected to the exchange “cq.client.remove.ex”. When a message arrives the Spring
Integration application reads the request and removes the corresponding continuous
query from MongoDB.
Example Source: https://github.com/cjharris5/mongo-rabbitmq-cq
Enjoy J






No comments:
Post a Comment