Siddhi MongoDB connector optimized to increase performance up to 4 times

Prabod Dunuwila
siddhi-io
Published in
7 min read · Jan 6, 2020


Siddhi is a cloud native Streaming and Complex Event Processing engine which can understand streaming SQL queries, process events to identify complex conditions and publish output to various endpoints in real time.

Why do we need databases in stream processing?

When processing streams of data, some use cases need to retrieve data stored in databases to enrich the events and to perform decision making operations. Siddhi has an ‘Event Table’ interface which can be used to fetch data from databases and to execute database queries within the stream processing pipeline.

For example, if we want to identify customer purchase patterns over some criteria, we will need past customer purchase details along with the real time purchase events. Since we cannot store all the records in memory there is the need for storing them in databases, and retrieving only the records that are necessary based on the input. Siddhi can retrieve records from the database tables using the `join` query and use those in the real time streams for real time analytics.

Siddhi supports connecting to traditional RDBMSs such as MySQL, Postgres, MSSQL, and Oracle, as well as NoSQL databases such as MongoDB. While the Siddhi RDBMS extension is optimized for data retrieval, the Siddhi MongoDB extension, siddhi-store-mongodb (v2.0.3), had a drawback: it could only perform filtering (the `on` condition) in the database and carried out the rest of the processing in memory. This produced higher latency, as all the records that satisfy the `on` condition had to be loaded to compute the aggregations.

With the new implementation of siddhi-store-mongodb (v2.1.0), the whole Siddhi query is now converted to a MongoDB query and passed to the DB, minimizing data retrieval. Since it performs group by, having, limit, offset, and attribute transformations and summarizations at the DB level rather than in memory, it drastically improves data retrieval and processing performance.

The previous MongoDB extension used the AbstractRecordTable interface of Siddhi tables, which has the `find()` function through which the `on` condition ($match) can be passed to the underlying DB. Siddhi then processed the queried events by performing group by, having, limit, and offset, as well as the final stream attribute transformations, in memory.

The new MongoDB extension uses the AbstractQueryableRecordTable interface of Siddhi tables. This interface provides functions such as `compileSelection()` and `query()`, through which the Siddhi query is converted to a MongoDB query ($project, $group, $match, $sort, $limit, $skip) and passed to the DB itself for processing.
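To see why pushing the aggregation down matters, here is an illustrative sketch (not the actual connector code): it contrasts the old approach, where only the `on` condition runs in MongoDB and the aggregation runs in Siddhi's memory, with the new one, where the whole pipeline runs in the DB. A plain Python list stands in for the collection.

```python
from collections import defaultdict

# Stand-in for the MongoDB collection.
docs = [
    {"customerId": "c1", "country": "LK", "totalAmount": 10.0},
    {"customerId": "c1", "country": "LK", "totalAmount": 5.0},
    {"customerId": "c2", "country": "LK", "totalAmount": 7.0},
    {"customerId": "c3", "country": "US", "totalAmount": 9.0},
]

# Old (v2.0.3): only the `on` condition is pushed down, so every matching
# record crosses the wire, then Siddhi groups and sums in memory.
matched = [d for d in docs if d["country"] == "LK"]    # done in MongoDB ($match)
totals = defaultdict(float)
for d in matched:                                      # done in Siddhi, in memory
    totals[d["customerId"]] += d["totalAmount"]

# New (v2.1.0): grouping and summing also run in MongoDB, so only the small
# aggregated result is returned. Simulated here as a single DB-side step.
def aggregate_in_db(collection, country):
    out = defaultdict(float)
    for d in collection:
        if d["country"] == country:
            out[d["customerId"]] += d["totalAmount"]
    return dict(out)

# Same result either way; the difference is how much data leaves the DB.
assert aggregate_in_db(docs, "LK") == dict(totals)
```

With 100,000 matching records and a handful of customers, the old path transfers 100,000 documents while the new path transfers only one row per customer.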

Let’s analyze the performance improvement using a simple use case.

Sample Use Case

Consider that we have a collection in MongoDB which records customer transactions, and we have to query the top 5 customers in terms of their total purchases for different countries.

The MongoDB store defined in Siddhi App will be as follows.

@store(type='mongodb', mongodb.uri='mongodb://localhost/sampleDB')
@primaryKey('purchaseId')
define table Purchase (purchaseId string, customerId string, country string, totalAmount float);

By default, `Purchase` would be an empty in-memory table, but since we have added the `@store` annotation, this table now mirrors the MongoDB collection at `mongodb://localhost/sampleDB`.

We are using a TriggerStream which is configured with an HTTP source via `@source` annotation to trigger the join process.

@source(type='http', receiver.url='http://localhost:5005/trigger', @map(type='json'))
define stream TriggerStream (country string);
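A trigger event for this stream could be sent as a JSON payload over HTTP. The sketch below builds such a payload; the `{"event": {...}}` envelope follows Siddhi's default JSON mapping, and the endpoint URL is taken from the `@source` annotation above.

```python
import json

# One TriggerStream event: the stream has a single attribute, `country`.
payload = {"event": {"country": "LK"}}
body = json.dumps(payload)

# In a live setup this body would be POSTed to
# http://localhost:5005/trigger (e.g. with urllib.request or curl).
print(body)
```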

The actual join query used to get top 5 customers is as follows.

from TriggerStream as T join Purchase as P
on P.country == T.country
select P.customerId as customer, sum(P.totalAmount) as totalAmount
group by P.customerId
order by totalAmount desc
limit 5
insert into OutputStream;

Here, the `on` condition filters matching records from the table based on the `country` value sent through the HTTP request. Then, a `group by` is done using the table's `customerId`, the output is ordered in descending order of `totalAmount` via `order by`, and finally only the top 5 records are emitted into the `OutputStream` using `limit`. The final MongoDB query that will be generated is as follows.

db.Purchase.aggregate(
{ $match : { country : { $eq : "country_x" } } } ,
{ $group : { _id : '$customerId' , customer : { $last : '$customerId' } , totalAmount : { $sum : '$totalAmount' } } } ,
{ $project : { _id : 0, customer : 1, totalAmount : 1 } } ,
{ "$sort" : { "totalAmount" : -1 } } ,
{ "$limit" : 5 }
)
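As a quick in-memory check (plain Python, no MongoDB needed, sample data is made up), the sketch below mirrors the stages of the generated pipeline and confirms that they yield the top 5 customers by summed `totalAmount`.

```python
from collections import defaultdict

# 20 synthetic purchases spread over customers c0..c6 (illustrative data).
purchases = [
    {"customerId": f"c{i % 7}", "country": "country_x", "totalAmount": float(i)}
    for i in range(1, 21)
]

def top_customers(docs, country, limit=5):
    # $match: keep only documents for the requested country.
    matched = [d for d in docs if d["country"] == country]
    # $group with $sum: total purchases per customerId.
    totals = defaultdict(float)
    for d in matched:
        totals[d["customerId"]] += d["totalAmount"]
    # $project + $sort (descending) + $limit: shape, order, and cap the output.
    rows = [{"customer": c, "totalAmount": t} for c, t in totals.items()]
    rows.sort(key=lambda r: r["totalAmount"], reverse=True)
    return rows[:limit]

top5 = top_customers(purchases, "country_x")
assert len(top5) == 5
assert top5[0]["totalAmount"] == max(r["totalAmount"] for r in top5)
```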

You can get a Siddhi App used for testing from here.

Performance Evaluation

Performance evaluation was done by running the discussed query when the table contains 100, 1,000, 10,000, 25,000, 50,000, 75,000, and 100,000 records.

The tests were carried out on a 4-CPU, 16 GB machine against MongoDB version 4.2.0.

The testing was done against two different data sets. As Siddhi already supports filtering documents with the `on` condition at the DB level, the first data set matches all the documents when the `on` condition is executed, while the second data set matches only 1/3 of the documents.

  1. When all of the documents in the collection are matched by the `on` condition.

The following graphs denote the query processing latency against the number of unique documents present in MongoDB. Here, the red line denotes the latency after the optimization, and the blue line denotes the latency before the optimization.

Case 1: Latency when running the desired query only once.

The latency when processing 100,000 documents reduced from 783ms to 184ms, giving a 76% latency improvement (4.25 times performance improvement). For processing 75,000 documents, the latency reduced from 634ms to 153ms, giving a latency improvement of 76% (4.14 times performance improvement), and for 50,000 documents, the latency reduced from 512ms to 106ms, giving a latency improvement of 79% (4.8 times performance improvement).

Case 2: Average latency when running the desired query 10,000 times (with MongoDB data caching in use).

The latency of processing 100,000 documents reduced from 271.49ms to 123.13ms, giving a 54% latency improvement (2.20 times performance improvement). For processing 75,000 documents, the latency reduced from 197.23ms to 92.49ms, giving a latency improvement of 53% (2.13 times performance improvement), and for 50,000 documents, the latency reduced from 133.83ms to 62.46ms, giving a latency improvement of 53% (2.14 times performance improvement).
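The percentages and speedup factors quoted throughout follow directly from the before/after latencies. A quick check, using the cached-case figures above:

```python
# Latency improvement = (before - after) / before; speedup = before / after.
def improvement(before_ms, after_ms):
    pct = int((before_ms - after_ms) / before_ms * 100)  # % latency reduction (truncated)
    speedup = round(before_ms / after_ms, 2)             # "x times" performance factor
    return pct, speedup

assert improvement(271.49, 123.13) == (54, 2.2)   # 100,000 documents
assert improvement(197.23, 92.49) == (53, 2.13)   # 75,000 documents
assert improvement(133.83, 62.46) == (53, 2.14)   # 50,000 documents
```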

Comparing performance improvements of case 1 and case 2 in a single graph will look as below.

2. When only 1 out of 3 documents in the collection is matched by the `on` condition.

Case 1: Latency when running the desired query only once.

The latency of processing 100,000 documents reduced from 448ms to 124ms, giving a 72% latency improvement (3.6 times performance improvement). For processing 75,000 documents, the latency reduced from 367ms to 109ms, giving a latency improvement of 70% (3.4 times performance improvement), and for 50,000 documents, the latency reduced from 304ms to 71ms, giving a latency improvement of 77% (4.3 times performance improvement).

Case 2: Average latency when running the desired query 10,000 times (with MongoDB data caching in use).

The latency for processing 100,000 documents reduced from 121.07ms to 66.07ms, giving a 45% latency improvement (1.83 times performance improvement). For processing 75,000 documents, the latency reduced from 97.64ms to 51.69ms, giving a latency improvement of 47% (1.8 times performance improvement), and for 50,000 documents, the latency reduced from 63.65ms to 34.07ms, giving a latency improvement of 46% (1.9 times performance improvement).

Comparing performance improvements of case 1 and case 2 in a single graph will look as below.

Conclusion

The new MongoDB extension, siddhi-store-mongodb v2.1.0, improves database query performance by up to 4 times without caching, and up to 2 times with caching in MongoDB, when retrieving and processing data from a MongoDB data store.

Therefore, with these improvements, users running MongoDB join queries and on-demand queries can achieve up to 4 times the performance with reduced query processing latency, improving overall Siddhi App performance to a great extent.

For more information about the extension, refer to the documentation here. If you find any issues or require improvements, please create GitHub issues, and feel free to connect with the development community via the Siddhi Slack channel.


Former Software Engineering Intern @ WSO2 | MIT Undergraduate @ University of Kelaniya, Sri Lanka.