Using MongoDB Change Streams for Indexing with Elasticsearch vs Rockset

May 6, 2020


The ability to get the changes that happen in an operational database like MongoDB and make them available for real-time applications is a core capability for many organizations. Change Data Capture (CDC) is one such approach to monitoring and capturing events in a system. Wikipedia describes CDC as “a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data. CDC is an approach to data integration that is based on the identification, capture and delivery of the changes made to enterprise data sources.” Businesses use CDC from operational databases to power real-time applications and various microservices that demand low data latency, examples of which include fraud prevention systems, game leaderboard APIs, and personalized recommendation APIs. In the MongoDB context, change streams offer a way to use CDC with MongoDB data.

Organizations will often index the data in MongoDB by pairing MongoDB with another database. This serves to separate operational workloads from the read-heavy access patterns of real-time applications. Users get the added benefit of improved query performance when their queries can make use of the indexing of the second database.

Elasticsearch is a common choice for indexing MongoDB data, and users can use change streams to effect a real-time sync from MongoDB to Elasticsearch. Rockset, a real-time indexing database in the cloud, is another external indexing option which makes it easy for users to extract results from their MongoDB change streams and power real-time applications with low data latency requirements.

Rockset Patch API

Rockset recently introduced a Patch API method, which enables users to stream complex CDC changes to Rockset with low-latency inserts and updates that trigger incremental indexing rather than a complete reindexing of the document. In this post, I’ll discuss the benefits of the Patch API and how Rockset makes it easy to use. I’ll also cover how Rockset uses it internally to capture changes from MongoDB.

Updating JSON data in a document data model is more complicated than updating relational data. In a relational database, updating a column is fairly straightforward: the user specifies the rows to update and a new value for each column to change on those rows. Applications dealing with JSON data, however, may need to update nested objects and elements within nested arrays, or insert a new element at a particular position within a nested array. With these complexities in mind, Rockset’s Patch API for updating existing documents is based on JSON Patch (RFC 6902), a web standard for describing changes to a JSON document.

Updates Using Patch API vs Updates in Elasticsearch

Rockset is a real-time indexing database specifically built to sync data from other sources, like MongoDB, and automatically build indexes on your documents. All documents stored in a Rockset collection are mutable and can be updated at the field level, even if these fields are deeply nested inside arrays and objects. Taking advantage of these characteristics, the Patch API was implemented to support incremental indexing. This means updates only reindex those fields in a document that are part of the patch request, while keeping the rest of the fields in the document untouched.

In contrast, when using Elasticsearch, updating any field triggers a reindexing of the entire document. Elasticsearch documents are immutable, so any update requires a new document to be indexed and the old version marked as deleted. This results in additional compute and I/O spent reindexing even the unchanged fields and writing entire documents on every update. For an update to a 10-byte field in a 10KB document, reindexing the entire document is ~1,000x less efficient than updating that single field alone, which is what Rockset’s Patch API enables. Because of this reindexing overhead, processing a large number of updates can degrade Elasticsearch system performance.
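The ~1,000x figure is simply the ratio of bytes touched. A back-of-the-envelope check, assuming 1 KB = 1,024 bytes and counting only raw bytes (index structures and per-document overhead are ignored):

```python
# Write amplification for the example above: one 10-byte field changed in a 10 KB document.
DOC_BYTES = 10 * 1024  # 10 KB, assuming 1 KB = 1,024 bytes
FIELD_BYTES = 10       # size of the single updated field

# Whole-document reindexing processes every byte; a field-level patch only the changed field.
amplification = DOC_BYTES / FIELD_BYTES
print(f"~{amplification:,.0f}x more bytes reindexed")  # ~1,024x more bytes reindexed
```

Taking 1 KB as 1,000 bytes gives exactly 1,000x; either way the gap is about three orders of magnitude.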

For the purpose of keeping in sync with updates coming via MongoDB change streams, or any database CDC stream, Rockset can be orders of magnitude more efficient with compute and I/O compared to Elasticsearch. Patch API provides users a way to take advantage of efficient updates and incremental indexing in Rockset.

Patch API Operations

Patch API in Rockset supports the following operations:

  • add - Add a value to an object or array.
  • remove - Remove a value from an object or array.
  • replace - Replace a value. Equivalent to a remove followed by an add.
  • test - Test that the value at the specified path in the document equals the given value.

Patch operations for a document are specified using the following three fields:

  • “op”: One of the patch operations listed above.
  • “path”: Path to the field in the document that needs to be updated. The path is a string of tokens separated by /; it starts with / and is relative to the root of the document.
  • “value”: The new value. Required for add and replace (and, for test, the expected value); not used by remove.
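The path syntax is JSON Pointer (RFC 6901), which JSON Patch uses for its paths. As a rough illustration (not Rockset’s parser), a path splits into reference tokens like this, including the two escape sequences the standard defines for keys that themselves contain / or ~:

```python
def parse_pointer(path: str) -> list[str]:
    """Split an RFC 6901 JSON Pointer into its reference tokens."""
    if path == "":
        return []  # the empty pointer refers to the whole document
    if not path.startswith("/"):
        raise ValueError(f"JSON Pointer must start with '/': {path!r}")
    # Unescape "~1" -> "/" first, then "~0" -> "~", in the order RFC 6901 requires.
    return [tok.replace("~1", "/").replace("~0", "~") for tok in path[1:].split("/")]

print(parse_pointer("/animals/2"))   # ['animals', '2']
print(parse_pointer("/animals/-"))   # ['animals', '-']
print(parse_pointer("/a~1b/c~0d"))   # ['a/b', 'c~d']
```

Unescaping ~1 before ~0 matters: it makes "~01" decode to the literal key "~1" rather than "/".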

Every document in a Rockset collection is uniquely identified by its _id field, which is used together with the patch operations to construct the request and ensures that each patch is applied to the correct document. The array of operations specified for a document is applied in order and atomically: if any one of them fails, the entire patch for that document fails, as we will see next.
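To make the in-order, all-or-nothing semantics concrete, here is a minimal local sketch that applies the four operations to a plain Python dict. It illustrates RFC 6902 semantics under simplifying assumptions (no root-level paths, Python equality standing in for JSON equality in test) and is not Rockset’s implementation: the operations run against a deep copy, and the copy is returned only if every operation succeeds.

```python
import copy
import re

class PatchError(Exception):
    """Raised when an operation cannot be applied; the whole patch is rejected."""

SUPPORTED_OPS = {"add", "remove", "replace", "test"}

def _tokens(path):
    # RFC 6901 pointer -> reference tokens ("~1" -> "/", then "~0" -> "~").
    if not path.startswith("/"):
        raise PatchError(f"unsupported path {path!r} (root-level ops are out of scope here)")
    return [t.replace("~1", "/").replace("~0", "~") for t in path[1:].split("/")]

def _index(token, length, allow_end):
    # Array index: decimal without leading zeros; "-" means one past the end (add only).
    if token == "-" and allow_end:
        return length
    if not re.fullmatch(r"0|[1-9][0-9]*", token):
        raise PatchError(f"bad array index {token!r}")
    i = int(token)
    if i > length or (i == length and not allow_end):
        raise PatchError(f"array index {i} out of range")
    return i

def _apply_op(doc, op):
    name, path = op["op"], op["path"]
    if name not in SUPPORTED_OPS:
        raise PatchError(f"unsupported op {name!r}")
    *parents, last = _tokens(path)
    target = doc
    for tok in parents:  # walk down to the container holding the final token
        if isinstance(target, list):
            target = target[_index(tok, len(target), allow_end=False)]
        elif isinstance(target, dict) and tok in target:
            target = target[tok]
        else:
            raise PatchError(f"path not found: {path}")
    if isinstance(target, list):
        key = _index(last, len(target), allow_end=(name == "add"))
    elif isinstance(target, dict):
        key = last
        if name != "add" and key not in target:
            raise PatchError(f"path not found: {path}")
    else:
        raise PatchError(f"path not found: {path}")

    if name == "add" and isinstance(target, list):
        target.insert(key, op["value"])  # arrays: insert and shift later elements
    elif name in ("add", "replace"):
        target[key] = op["value"]        # set an object member or overwrite an element
    elif name == "remove":
        del target[key]
    elif target[key] != op["value"]:     # "test"
        raise PatchError(f"value does not match at {path}")

def apply_patch(doc, ops):
    """Apply ops in order to a deep copy; return it only if every op succeeds."""
    draft = copy.deepcopy(doc)
    for op in ops:
        _apply_op(draft, op)  # any exception discards the draft; doc is untouched
    return draft

reptiles = {"_id": "reptiles",
            "animals": [{"name": "Snake"}, {"name": "Crocodile"}, {"name": "Lizard"}]}
ops = [{"op": "test", "path": "/animals/2", "value": {"name": "Crocodile"}},
       {"op": "replace", "path": "/animals/1", "value": {"name": "Alligator"}}]
try:
    reptiles = apply_patch(reptiles, ops)
except PatchError as err:
    print("patch rejected:", err)  # patch rejected: value does not match at /animals/2
print(reptiles["animals"][1])      # {'name': 'Crocodile'}
```

Working on a copy is the simplest way to get atomicity in a sketch; a real store would more likely stage changes and commit them once, but the observable behavior is the same: a failed test leaves the document exactly as it was.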

How to Use Patch API

Now I will walk through an example of how to use the Patch API with Rockset’s Python client. Consider the following two documents in a Rockset collection named “FunWithAnimals”:

{
  "_id": "mammals",
  "animals": [
    { "name": "Dog" },
    { "name": "Cat" }
  ]
},
{
  "_id": "reptiles",
  "animals": [
    { "name": "Snake" },
    { "name": "Alligator"}
  ]
}

Now let’s say I want to remove one name from the list of mammals and add another. To insert Horse at the end of the array (index 2), I provide the path /animals/2; to remove Dog from index 0, I provide the path /animals/0. I would also like to add another name to the list of reptiles. The - character can also be used to indicate the end of an array, so to insert Lizard at the end of that array I’ll use the path /animals/-.
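In plain Python list terms, these array operations behave roughly as follows (a sketch of RFC 6902 array semantics, not Rockset code). Note that list.insert silently clamps an out-of-range index, whereas JSON Patch rejects it, so the sketch checks that bound explicitly to show why the order of operations matters:

```python
animals = [{"name": "Dog"}, {"name": "Cat"}]

# "add" at /animals/2: insert at index 2, valid because 2 <= len(animals).
animals.insert(2, {"name": "Horse"})
# "remove" at /animals/0: delete index 0; later elements shift left.
del animals[0]
print(animals)  # [{'name': 'Cat'}, {'name': 'Horse'}]

# "add" at /animals/-: "-" means one past the last element, i.e. an append.
reptiles = [{"name": "Snake"}, {"name": "Alligator"}]
reptiles.append({"name": "Lizard"})

# Order matters: running the remove first leaves one element, so index 2 would
# then exceed the array length and the add would be rejected under RFC 6902.
shrunk = [{"name": "Dog"}, {"name": "Cat"}]
del shrunk[0]
print(2 <= len(shrunk))  # False: /animals/2 is no longer a valid insertion point
```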

Using Rockset’s Python client, you can apply this patch as follows:

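from rockset import Client

rs = Client()
c = rs.Collection.retrieve('FunWithAnimals')

mammal_patch = {
    "_id": "mammals",
    "patch": [
        # Insert Horse at index 2 (the current end of the array)...
        { "op": "add", "path": "/animals/2", "value": {"name": "Horse"} },
        # ...then remove Dog from index 0; operations run in order
        { "op": "remove", "path": "/animals/0" }
    ]
}

reptile_patch = {
    "_id": "reptiles",
    "patch": [
        # "-" refers to the end of the array, so Lizard is appended
        { "op": "add", "path": "/animals/-", "value": {"name": "Lizard"} }
    ]
}

# Send both document patches in a single request
c.patch_docs([mammal_patch, reptile_patch])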

If the command is successful, Rockset returns a list of document status records, one for each input document. Each status contains a patch_id, which can be used to check whether the patch was applied successfully (more on this later).

[{'collection': 'FunWithAnimals',
 'error': None,
 'id': 'mammals',
 'patch_id': 'b59704c1-30a0-4118-8c35-6cbdeb44dca8',
 'status': 'PATCHED'
},
{'collection': 'FunWithAnimals',
 'error': None,
 'id': 'reptiles',
 'patch_id': '5bc0696a-d7a0-43c8-820a-94f851b69d70',
 'status': 'PATCHED'
}]
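When patching many documents at once, it helps to keep each document’s patch_id for the later check. A small sketch, assuming the response is a list of dicts shaped like the output above (the actual client may return objects, in which case attribute access would replace the dictionary lookups); the helper names are hypothetical:

```python
statuses = [
    {"collection": "FunWithAnimals", "error": None, "id": "mammals",
     "patch_id": "b59704c1-30a0-4118-8c35-6cbdeb44dca8", "status": "PATCHED"},
    {"collection": "FunWithAnimals", "error": None, "id": "reptiles",
     "patch_id": "5bc0696a-d7a0-43c8-820a-94f851b69d70", "status": "PATCHED"},
]

def patch_ids_by_doc(records):
    """Map each document id to the patch_id to look up later."""
    return {r["id"]: r["patch_id"] for r in records}

def submission_errors(records):
    """Documents whose status record already carries an error."""
    return {r["id"]: r["error"] for r in records if r["error"] is not None}

print(patch_ids_by_doc(statuses)["reptiles"])  # 5bc0696a-d7a0-43c8-820a-94f851b69d70
print(submission_errors(statuses))             # {}
```

An empty error map only says the request was accepted; as noted above, the patch_id is what you use to confirm whether each patch was actually applied.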

Once the above patch request is successfully processed by Rockset, the new documents will look like this:

{
  "_id": "mammals",
  "animals": [
    { "name": "Cat" },
    { "name": "Horse" }
  ]
},
{
  "_id": "reptiles",
  "animals": [
    { "name": "Snake" },
    { "name": "Alligator"},
    { "name": "Lizard"}
  ]
}

Next, I would like to replace Alligator with Crocodile if Alligator is present at array index 1. For this I will use test and replace operations:

reptile_patch = {
    "_id": "reptiles",
    "patch": [
        { "op": "test", "path": "/animals/1", "value": {"name": "Alligator"} },
        { "op": "replace", "path": "/animals/1", "value": {"name": "Crocodile"} }
    ]
}

c.patch_docs([reptile_patch])
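This test-then-replace pair is effectively a compare-and-set on a single path. If you issue it often, a small helper can build the request dict; compare_and_replace below is hypothetical and not part of the Rockset client:

```python
def compare_and_replace(doc_id, path, expected, new_value):
    """Build a patch that replaces the value at `path` only if it currently equals `expected`.

    The operations for one document are applied in order and atomically,
    so a failing "test" keeps the "replace" from taking effect.
    """
    return {
        "_id": doc_id,
        "patch": [
            {"op": "test", "path": path, "value": expected},
            {"op": "replace", "path": path, "value": new_value},
        ],
    }

reptile_patch = compare_and_replace(
    "reptiles", "/animals/1", {"name": "Alligator"}, {"name": "Crocodile"}
)
# Then submit it as before: c.patch_docs([reptile_patch])
```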

After the patch is applied, the document looks like this:

{
  "_id": "reptiles",
  "animals": [
    { "name": "Snake" },
    { "name": "Crocodile"},
    { "name": "Lizard"}
  ]
}

As I mentioned before, the list of operations specified for a document is applied in order and atomically in Rockset. Let’s see how this works. I’ll use the same pattern as above, this time replacing Crocodile with Alligator, but I’ll point the test operation at /animals/2 instead of /animals/1.

reptile_patch = {
    "_id": "reptiles",
    "patch": [
        { "op": "test", "path": "/animals/2", "value": {"name": "Crocodile"} },
        { "op": "replace", "path": "/animals/1", "value": {"name": "Alligator"} }
    ]
}

c.patch_docs([reptile_patch])

The above patch fails and no updates are made. To see why it failed, we need to query the _events system collection in Rockset for the patch’s patch_id.

from rockset import Client, Q, F

rs = Client()
# Look up the event for this patch_id (as returned by patch_docs for the failed patch)
q = (
    Q('_events', alias='e')
    .select(F['e']['message'], F['e']['label'])
    .where(F['e']['details']['patch_id'] == 'adf7fb54-9410-4212-af99-ec796e906abc')
)
result = rs.sql(q)
print(result)

Output:

[{'message': 'Patch value does not match at `/animals/2`', 'label': 'PATCH_FAILED'}]

The patch failed because the value at array index 2 (Lizard) did not match the expected value, so the subsequent replace operation was not applied: the operations for a document succeed or fail together.

Capturing Change Events from MongoDB Atlas Using Patch API

MongoDB Atlas provides change streams to capture collection activity, enabling these changes to be loaded into another collection or data store to serve real-time applications. Rockset uses the Patch API internally on MongoDB change streams to update records in Rockset collections.


MongoDB change streams allow users to subscribe to real-time data changes against a collection, database, or deployment. For the Rockset-MongoDB integration, we configure a change stream against a collection to return only the delta of fields changed by an update operation (the default behavior). As each new event for an update operation comes in, Rockset constructs a patch request from the updatedFields and removedFields keys and applies it to the existing document in Rockset. MongoDB’s _id field is mapped to Rockset’s _id field to ensure updates are applied to the correct document. Change streams can also be configured to include the full current version of the updated document (the fullDocument: 'updateLookup' option), but reindexing the entire document can increase data latency, as discussed above.

An update operation on a document in MongoDB produces an event like below (using the same example as before).

{
   "_id" : { <BSON Object> },
   "operationType" : "update",
   ...
   "documentKey" : { "_id" : <document _id> },
   "updateDescription" : {
      "updatedFields" : {
         "animals.2" : {
            "name" : "Horse"
         }
      },
      "removedFields" : [ ]
   },
   ...
   "clusterTime" : <Timestamp>,
   ...
}

Rockset’s Patch API for the above CDC event will look like:

mongodb_patch = {
    "_id": "<serialized _id>",
    "patch": [
        { "op": "add", "path": "/animals/2", "value": {"name": "Horse"} }
    ]
}

The _id of the changed document (carried in the event’s documentKey field; the event’s own top-level _id is a resume token) is serialized as a string to map to _id in Rockset.
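The translation from a change event to a patch can be sketched as follows. This is an illustration, not the connector’s code: it follows the example above (updated fields become add operations, removed fields become remove operations), assumes field names contain no dots, and uses str() as a placeholder for whatever string serialization of _id Rockset actually applies. One caveat: for an array element that already exists, RFC 6902 add inserts rather than overwrites, so a production connector may need replace in that case.

```python
def _escape(token):
    # RFC 6901 escaping: "~" -> "~0" first, then "/" -> "~1".
    return token.replace("~", "~0").replace("/", "~1")

def _to_pointer(dotted):
    # MongoDB dotted field path ("animals.2") -> JSON Pointer ("/animals/2").
    return "/" + "/".join(_escape(part) for part in dotted.split("."))

def change_event_to_patch(event):
    """Build a Patch API-style request dict from a MongoDB update change event."""
    desc = event["updateDescription"]
    ops = [
        {"op": "add", "path": _to_pointer(field), "value": value}
        for field, value in desc.get("updatedFields", {}).items()
    ]
    ops += [{"op": "remove", "path": _to_pointer(field)}
            for field in desc.get("removedFields", [])]
    return {
        # The changed document's _id comes from documentKey; str() is a placeholder.
        "_id": str(event["documentKey"]["_id"]),
        "patch": ops,
    }

event = {
    "operationType": "update",
    "documentKey": {"_id": "mammals"},
    "updateDescription": {
        "updatedFields": {"animals.2": {"name": "Horse"}},
        "removedFields": [],
    },
}
print(change_event_to_patch(event))
# {'_id': 'mammals', 'patch': [{'op': 'add', 'path': '/animals/2', 'value': {'name': 'Horse'}}]}
```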

The connector from MongoDB to Rockset will handle creating the patch from the MongoDB update, so the use of the Patch API for CDC from MongoDB is transparent to the user. Rockset will write only the specific updated field, without requiring a reindex of the entire document, making it efficient to perform fast ingest from MongoDB change streams.

Summary

With increasing data volumes, businesses are continuously looking for ways to cut down processing time for real-time applications. Using a CDC mechanism in conjunction with an indexing database is a common approach to doing so. Rockset offers a fully managed indexing solution for MongoDB data that, unlike alternatives such as Elasticsearch, requires no sizing, provisioning, or management of indexes.

Rockset provides the Patch API, which makes it simple for users to propagate changes from MongoDB, or from other databases and event streams, to Rockset using JSON Patch (RFC 6902), a well-defined web standard. With the Patch API, Rockset provides lower data latency on updates, making it efficient to ingest MongoDB change streams without reindexing entire documents. The Patch API is available in Rockset as a REST API and as part of Rockset’s language clients.


