Change Data Capturing for MongoDB With WSO2 Streaming Integrator

Prabod Dunuwila
5 min read · Feb 17, 2020

Data can change rapidly, creating waves that open up new opportunities or threats. What if the data source could emit notifications about each change instead of just storing the data? This is where Change Data Capture (CDC) comes into play. Currently, WSO2 Streaming Integrator supports change data capture in listening mode for the following databases:

  • MySQL
  • MS SQL
  • PostgreSQL
  • Oracle

With the new improvements, it now also supports change data capture for MongoDB.

How it works

The listening-mode CDC implementation for MongoDB uses the Debezium connector, which detects all of the inserts, updates, and deletes that other applications commit to your databases. MongoDB’s native change capture mechanism communicates with the Debezium CDC connector whenever changes happen in the database, and the Debezium connector delivers those changes to WSO2 Streaming Integrator. Siddhi then emits the payload as a Siddhi event that your application can consume.
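As a preview, a MongoDB CDC source in a Siddhi application takes roughly the following shape; this is only an illustrative sketch with placeholder values, and the concrete, working sources used in this tutorial appear later in the article.

@source(type = 'cdc', url = 'jdbc:mongodb://<replica_set_name>/<host>:<port>/<database_name>',
    username = '<username>', password = '<password>',
    table.name = '<collection_name>', operation = '<insert|update|delete>',
    @map(type = 'keyvalue'))
define stream ChangeStream (id string);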

First, let’s try to understand this using a simple scenario.

Simple Use Case

Consider a collection in MongoDB that records the details of the stocks available. Using CDC, we are going to identify changes to the database when:

  • new stocks are inserted (insert)
  • stock amounts are updated (update)
  • older stocks are removed (delete)

Set up the environment

  • Deploy a MongoDB replica set : To use CDC for MongoDB, we need to configure a replica set, which enables MongoDB’s native change capture. To set up a MongoDB replica set, please refer to this tutorial on “Deploy a replica set”. Create a replica set named ‘mongo01’ with host 127.0.0.1 and port 27017, and update the MongoDB configuration file “/etc/mongod.conf” accordingly (a minimal sketch is shown after this list).
  • Create a database in MongoDB : Start the MongoDB server and create a database named ‘StocksDB’ with a collection named ‘stocks’ (the shell commands are included in the sketch after this list).
  • Configure Streaming Integrator : You can download Streaming Integrator from here and find the documentation about starting the SI server and deploying a Siddhi application from here.
  • Write the Siddhi application : Start the Streaming Integrator server and deploy the following sample Siddhi application, ‘CDCMongoDBApp.siddhi’.
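As a rough sketch of the setup above (exact file locations and commands may differ by platform and MongoDB version), the relevant excerpt of /etc/mongod.conf and the shell commands for initiating the replica set and creating the database could look like this:

# /etc/mongod.conf (relevant excerpt, assuming a single-node replica set)
net:
  bindIp: 127.0.0.1
  port: 27017
replication:
  replSetName: mongo01

// in the mongo shell, after restarting mongod with the configuration above
rs.initiate({_id: "mongo01", members: [{_id: 0, host: "127.0.0.1:27017"}]})
use StocksDB
db.createCollection("stocks")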

Sample Siddhi application

Please find the full Siddhi application used for this tutorial from here.

CDCMongoDBApp.siddhi

In the Siddhi application, we can use multiple CDC sources to capture changes in the database. When we start the Siddhi application, it listens for changes in the database and informs users whenever the data source changes. The following figure shows the design view of the application, which listens to inserts, updates, and deletes in the data source.

We have used three streams named InsertStream, UpdateStream, and DeleteStream to capture changes in the database. These streams act as the CDC sources for insert, update, and delete events respectively. When events arrive at a source, the event attributes are logged using log sinks.
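Since the full application is only linked above, here is a minimal sketch of how the insert path of CDCMongoDBApp.siddhi could be wired together. The app name CDCApp and the sink stream name LogInsertStream follow the log output shown later, while the query name is illustrative; the update and delete paths follow the same pattern using the sources defined in the next sections.

@App:name('CDCApp')

@source(type = 'cdc', url = 'jdbc:mongodb://mongo01/127.0.0.1:27017/StocksDB',
    username = 'your_username', password = 'your_password',
    table.name = 'stocks', operation = 'insert',
    @map(type = 'keyvalue',
        @attributes(id = 'id', name = 'name', volume = 'volume', stockDetails = 'stockDetails')))
define stream InsertStream (id string, name string, volume int, stockDetails string);

@sink(type = 'log')
define stream LogInsertStream (id string, name string, volume int, stockDetails string);

-- forward every captured insert event to the log sink
@info(name = 'logInsertQuery')
from InsertStream
select id, name, volume, stockDetails
insert into LogInsertStream;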

Please note that the supported MongoDB attribute types are:

  • boolean
  • double
  • string
  • NumberInt (mapped as ‘int’ in Siddhi)
  • NumberLong (mapped as ‘long’ in Siddhi)
  • NumberDecimal (mapped as ‘double’ in Siddhi)
  • JSON objects (mapped as ‘string’ in Siddhi)

The default “_id” field generated by MongoDB is captured at the CDC source under the attribute name “id” with the type “string”.
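For illustration only (this stream is an assumed example, not part of the original application), the type mapping could be declared in a Siddhi stream like this:

-- illustrative stream showing how each supported MongoDB type maps to a Siddhi type
define stream TypedStockStream (
    id string,           -- auto-generated MongoDB _id, captured as a string
    inStock bool,        -- boolean
    price double,        -- double / NumberDecimal
    name string,         -- string
    volume int,          -- NumberInt
    totalSold long,      -- NumberLong
    stockDetails string  -- embedded JSON object, mapped as a string
);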

First, let’s take a look at the source defined to capture insert events.

Capture Insert Operations

@source(type = 'cdc', url = 'jdbc:mongodb://mongo01/127.0.0.1:27017/StocksDB',
    username = 'your_username', password = 'your_password',
    table.name = 'stocks', operation = 'insert',
    @map(type = 'keyvalue',
        @attributes(id = 'id', name = 'name', volume = 'volume', stockDetails = 'stockDetails')))
define stream InsertStream (id string, name string, volume int, stockDetails string);

The ‘url’ format for MongoDB must be “jdbc:mongodb://<replica_set_name>/<host>:<port>/<database_name>”. When we insert a new document into the database, the CDC source captures the values of the attributes id, name, volume, and stockDetails. The “id” is the default id generated by MongoDB. The relevant insert query for this scenario would be:

db.stocks.insert({name:"stock001",volume:NumberInt(100), stockDetails:{ "supplier":"John"}});

If you want to explicitly set the “_id” field instead of letting MongoDB generate it, you can use the following insert operation.

db.stocks.insert({_id:ObjectId("5e3be50798c67653c0d75cf0"), name:"stock001",volume:NumberInt(100), stockDetails:{ "supplier":"John"}});

We can log the event details and get a log message as follows.

INFO {io.siddhi.core.stream.output.sink.LogSink} - CDCApp : LogInsertStream : Event{timestamp=1580983587981, data=[5e3be50798c67653c0d75cf0, stock001, 100, {"supplier":"John"}], isExpired=false}

Capture Update Operations

Now let’s look at the update data capture scenario. The attributes captured by the CDC source are the document id and the updated values, so if you want the whole document related to an update event, you have to retrieve it with a separate query using the document id (an example is given after the log output below). The following code segment shows how the source for the update operation is defined.

@source(type = 'cdc', url = 'jdbc:mongodb://mongo01/127.0.0.1:27017/StocksDB',
    username = 'your_username', password = 'your_password',
    table.name = 'stocks', operation = 'update',
    @map(type = 'keyvalue',
        @attributes(id = 'id', name = 'name', volume = 'volume')))
define stream UpdateStream (id string, name string, volume int);

The relevant query used to update the name and the volume would be as follows.

db.stocks.update({name:"stock001"}, {$set:{name:"stock-001", volume:NumberInt(50)}});

We can log the event details and get a log message as follows.

INFO {io.siddhi.core.stream.output.sink.LogSink} - CDCApp : LogUpdateStream : Event{timestamp=1580989706118, data=[5e3be50798c67653c0d75cf0, stock-001, 50], isExpired=false}
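As noted above, the update event carries only the document id and the updated values. If the whole document is needed, it has to be fetched separately; a minimal example in the mongo shell, using the id from the log output above, could be:

// fetch the full document using the captured id (the string id maps back to an ObjectId)
db.stocks.find({_id: ObjectId("5e3be50798c67653c0d75cf0")})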

Capture Delete Operations

For delete operations, the CDC source captures only the deleted document id. The following code segment shows how the source for the delete operation is defined.

@source(type = 'cdc', url = 'jdbc:mongodb://mongo01/127.0.0.1:27017/StocksDB', username = 'your_username', password = 'your_password', table.name = 'stocks', operation = 'delete',
@map(type = 'keyvalue', @attributes(id = 'id') ))
define stream DeleteStream (id string);

The relevant query used to delete a document that has the key-value pair {name: “stock-001”} in MongoDB is as follows.

db.stocks.deleteOne({name:"stock-001"})

We can log the event details and get a log message as follows.

INFO {io.siddhi.core.stream.output.sink.LogSink} - CDCApp : LogDeleteStream : Event{timestamp=1581054881818, data=[5e3be50798c67653c0d75cf0], isExpired=false}

Conclusion

In this article, I have described how to use CDC with MongoDB and WSO2 Streaming Integrator to capture data changes in a data source. For more information about the extension, refer to the documentation here. If you find any issues or require improvements, please create a GitHub issue.
