MongoDB Master-Slave Replication by Change Stream

From the official MongoDB manual:

Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

Here we use change streams to replicate data from one MongoDB deployment to another in real time.

There are existing tools, such as MongoShake, that do the same thing. However, MongoShake is somewhat complicated to use, and we encountered two issues:

  • We had to modify its source code to use TLS authentication
  • We could not perform incremental sync in the "all" sync_mode

Since our goal is real-time replication, we chose a more straightforward and controllable approach: restore a cluster snapshot with MongoDB Ops Manager, then use a change stream to apply real-time changes.

Note: for simplicity, we use "master" and "slave" to refer to the source and target MongoDB deployments. These are not formal MongoDB terms, and they differ from primary and secondary in MongoDB replication.

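Our replication method works with replica sets and sharded clusters on either side (source and target). A standalone mongod supports neither change streams nor the multi-document transactions used on the target, so a standalone server has to run as a single-node replica set first.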

The replication process:

  • Restore a recent snapshot of the source to the target cluster
  • Set the change stream start point slightly before the snapshot timestamp and start the real-time replication program
  • Wait for the target to catch up with the source (until the replication lag is small)
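Step 3 needs a way to tell when the target has caught up. The full program below prints the lag as the current time minus each batch's cluster time (its DelaySeconds helper). A minimal, driver-free sketch of the same idea follows; the class name and the "caught up" threshold are assumptions for illustration, not part of our program.

```csharp
using System;

// Hypothetical helper: replication lag derived from an event's cluster time,
// given as seconds since the Unix epoch (the BsonTimestamp.Timestamp part).
public static class ReplicationLag
{
    // How far the replayed event is behind "now".
    public static TimeSpan Lag(int clusterTimeSeconds, DateTimeOffset now) =>
        now - DateTimeOffset.FromUnixTimeSeconds(clusterTimeSeconds);

    // The threshold is an assumption to tune per deployment.
    public static bool IsCaughtUp(int clusterTimeSeconds, DateTimeOffset now, TimeSpan threshold) =>
        Lag(clusterTimeSeconds, now) <= threshold;
}
```

On an idle source no events arrive, so this lag is only meaningful while changes are flowing.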

The change stream interface is pretty simple. For example, the following code watches changes to all collections in all databases:

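// client: an IMongoClient connected to the source deployment
using (var cursor = await client.WatchAsync())
{
    await cursor.ForEachAsync(change =>
    {
        // process the change event (a ChangeStreamDocument<BsonDocument>)
    });
}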

Real-time replication therefore needs to solve two problems:

  • How to set the change stream start point on the source cluster
  • How to replay the changes on the target cluster

How to construct startAtOperationTime

According to the manual:

Change streams are resumable by specifying a resume token to either resumeAfter or startAfter when opening the cursor.

However, the resume token passed to resumeAfter or startAfter must come from an event of a previous change stream. How, then, can we start from an arbitrary point in the past?

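The solution is to set StartAtOperationTime in ChangeStreamOptions to the desired start time. That time must still be covered by the source's oplog; if the oplog has already rolled past it, the change stream cannot start there.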

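var options = new ChangeStreamOptions
{
	BatchSize = 2000,
	// startPoint: a DateTimeOffset slightly before the snapshot timestamp
	StartAtOperationTime = new BsonTimestamp((int)startPoint.ToUnixTimeSeconds(), 1),
	// For update events, also return the full document
	FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
};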

BsonTimestamp definition:

public BsonTimestamp(
	int timestamp,
	int increment
)
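A BSON timestamp is a single 64-bit value: the high 32 bits hold seconds since the Unix epoch and the low 32 bits hold an ordinal for operations within that second, which is what the two constructor arguments supply. The driver-free sketch below mirrors that layout and shows one way to derive the start second from a snapshot time with a safety margin. The helper names and the margin are assumptions for illustration; the resulting seconds value is what goes into new BsonTimestamp(seconds, 1).

```csharp
using System;

// Hypothetical helpers illustrating the BSON timestamp layout.
public static class OperationTime
{
    // Seconds since the Unix epoch; throws if the value does not fit in the
    // 32-bit int that BsonTimestamp(int timestamp, int increment) takes.
    public static int Seconds(DateTimeOffset t) => checked((int)t.ToUnixTimeSeconds());

    // Start a margin before the snapshot so that changes around the snapshot are not missed.
    public static int StartSeconds(DateTimeOffset snapshotTime, TimeSpan margin) =>
        Seconds(snapshotTime - margin);

    // High 32 bits = seconds, low 32 bits = increment.
    public static long Pack(int seconds, int increment) =>
        ((long)seconds << 32) | (uint)increment;

    public static (int Seconds, int Increment) Unpack(long value) =>
        ((int)(value >> 32), (int)(value & 0xFFFFFFFF));
}
```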

How to replay the change

Note the other option, FullDocument = ChangeStreamFullDocumentOption.UpdateLookup: for update events, the change's FullDocument property then contains the current version of the whole updated document, looked up when the event is read. We use this property to replay update changes on the target.

Replaying a change requires handling each change type.

ChangeStreamOperationType definition (in the driver version we used):

public enum ChangeStreamOperationType
{
	Insert = 0,
	Update = 1,
	Replace = 2,
	Delete = 3,
	Invalidate = 4,
	Rename = 5,
	Drop = 6
}

Code that handles these types:

// Runs inside the session.WithTransactionAsync callback of the full code:
// s is the session handle, ct the cancellation token, collection the target collection
if (change.OperationType == ChangeStreamOperationType.Insert)
{
	await collection.InsertOneAsync(s, change.FullDocument, cancellationToken: ct);
}
else if (change.OperationType == ChangeStreamOperationType.Delete)
{
	// Keep _id as a BsonValue: a string filter would not match ObjectId keys
	var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey.GetValue("_id"));
	await collection.DeleteOneAsync(s, filter, cancellationToken: ct);
}
else if (change.OperationType == ChangeStreamOperationType.Update || change.OperationType == ChangeStreamOperationType.Replace)
{
	// FullDocument is null if the document was deleted before the lookup;
	// the later delete event removes it from the target anyway
	if (change.FullDocument != null)
	{
		var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey.GetValue("_id"));
		await collection.ReplaceOneAsync(s, filter, change.FullDocument, cancellationToken: ct);
	}
}
else
{
	Console.WriteLine($"Unknown type: [{change.OperationType}]");
}

We only process insert, delete, replace, and update operations. Replace and update share the same logic: with the FullDocument = ChangeStreamFullDocumentOption.UpdateLookup option, both events carry a full document, so both can be replayed as a replace.

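Sometimes FullDocument is null for an update event. This happens when the document was deleted after the update but before the lookup, because UpdateLookup fetches the document's current state rather than its state at update time.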
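Because the start point is set before the snapshot timestamp, some changes already contained in the snapshot are replayed a second time. The toy model below uses hypothetical types and stores documents as strings as of each event; it follows the replay rules above: insert ignores duplicate keys (as the full program does for error 11000), replace only touches existing documents, and delete removes. The test checks, for one example sequence, that replaying an overlap on top of a snapshot ends in the same state as the source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified change event: kind, _id, and full document (null for deletes).
public enum ChangeKind { Insert, Replace, Delete }

public sealed record Change(ChangeKind Kind, int Id, string Doc);

public static class OverlapReplay
{
    // Same semantics as the replay code: dup-key inserts ignored, replace without upsert.
    public static void Apply(Dictionary<int, string> target, Change c)
    {
        switch (c.Kind)
        {
            case ChangeKind.Insert:
                target.TryAdd(c.Id, c.Doc); // duplicate key: ignored
                break;
            case ChangeKind.Replace:
                if (target.ContainsKey(c.Id)) target[c.Id] = c.Doc;
                break;
            case ChangeKind.Delete:
                target.Remove(c.Id);
                break;
        }
    }

    // Applies the changes in order to a copy of the starting state.
    public static Dictionary<int, string> Replay(IDictionary<int, string> start, IEnumerable<Change> changes)
    {
        var state = new Dictionary<int, string>(start);
        foreach (var c in changes) Apply(state, c);
        return state;
    }

    public static bool SameState(Dictionary<int, string> a, Dictionary<int, string> b) =>
        a.Count == b.Count && a.All(kv => b.TryGetValue(kv.Key, out var v) && v == kv.Value);
}
```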

Full Code

See this repo: https://github.com/finisky/MongoDBSync

Implementation notes:

using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
			// Connection strings of the source ("master") and target ("slave") deployments
			var sourceConnString = "";
			var targetConnString = "";

			var sourceClientSettings = MongoClientSettings.FromConnectionString(sourceConnString);
			var targetClientSettings = MongoClientSettings.FromConnectionString(targetConnString);

			var sourceClient = new MongoClient(sourceClientSettings);
			var targetClient = new MongoClient(targetClientSettings);
			// Change stream start point: slightly earlier than the restored snapshot (here, 10 hours ago)
			var startPoint = DateTimeOffset.UtcNow.AddHours(-10);

			SyncDb(sourceClient, targetClient, startPoint).Wait();

			Console.ReadLine();
        }

		private static long DelaySeconds(int clusterTimestamp)
		{
			return DateTimeOffset.UtcNow.ToUnixTimeSeconds() - clusterTimestamp;
		}

		private static async Task SyncDb(IMongoClient sourceClient, IMongoClient targetClient, DateTimeOffset startPoint)
		{
			var options = new ChangeStreamOptions
			{
				BatchSize = 2000,
				StartAtOperationTime = new BsonTimestamp((int)startPoint.ToUnixTimeSeconds(), 1),
				FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
			};

			using (var cursor = await sourceClient.WatchAsync(options))
			{
				long count = 0;
				while (await cursor.MoveNextAsync())
				{
					if (!cursor.Current.Any())
					{
						Console.WriteLine("No changes, skip...");
						continue;
					}

					var token = cursor.GetResumeToken();
					var headDoc = cursor.Current.First();
					Console.WriteLine($"[{DelaySeconds(headDoc.ClusterTime.Timestamp)}s] [{count}] Token: {token} ----------------------------");

					foreach (var change in cursor.Current)
					{
						++count;
						await ReplayChangeToTarget(targetClient, change);
					}
				}
			}
		}

		private static async Task ReplayChangeToTarget(IMongoClient targetClient, ChangeStreamDocument<BsonDocument> change)
        {
			var dbName = change.CollectionNamespace.DatabaseNamespace.DatabaseName;
			var collectionName = change.CollectionNamespace.CollectionName;
			var collection = targetClient.GetDatabase(dbName).GetCollection<BsonDocument>(collectionName);

            // Uncomment the following two lines to print changes without replaying to the target
			//Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] {change.CollectionNamespace.FullName}");
			//return;
			
			try
			{
				using (var session = await targetClient.StartSessionAsync())
				using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)))
				{
					await session.WithTransactionAsync(
						async (s, ct) =>
						{
							if (change.OperationType == ChangeStreamOperationType.Insert)
							{
								await collection.InsertOneAsync(s, change.FullDocument, cancellationToken: ct);
							}
							else if (change.OperationType == ChangeStreamOperationType.Delete)
							{
								// Keep _id as a BsonValue: a string filter would not match ObjectId keys
								var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey.GetValue("_id"));
								await collection.DeleteOneAsync(s, filter, cancellationToken: ct);
							}
							else if (change.OperationType == ChangeStreamOperationType.Update || change.OperationType == ChangeStreamOperationType.Replace)
							{
								// FullDocument is null if the document was deleted before the lookup;
								// the later delete event removes it from the target anyway
								if (change.FullDocument != null)
								{
									var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey.GetValue("_id"));
									await collection.ReplaceOneAsync(s, filter, change.FullDocument, cancellationToken: ct);
								}
							}
							else
							{
								Console.WriteLine($"Unknown type: [{change.OperationType}]");
							}

							return string.Empty;
						}, cancellationToken: cts.Token);
				}
			}
			catch (MongoWriteException ex)
			{
				if (ex.WriteError.Code == 11000)
				{
					Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] Dup key: {change.DocumentKey.GetValue("_id")}, ignore...");
				}
				else
				{
					Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] [# {change.OperationType}] MongoWriteException: {change.ToJson()}");
				}
			}
			catch (Exception ex)
			{
				Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] [# {change.OperationType}] Exception [{change.DocumentKey.ToJson()}] [{ex.GetType()}]: {ex.Message}");
			}
		}
	}
}
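The program above prints the resume token of each batch but keeps it only in memory, so after a restart it has to fall back to a start time. One option is to persist the token after each batch has been replayed. The sketch below is a minimal file-based checkpoint; the class name and file path are hypothetical, and the token is stored as its JSON text.

```csharp
using System;
using System.IO;

// Hypothetical checkpoint store for the latest resume token (as JSON text).
public sealed class ResumeTokenCheckpoint
{
    private readonly string _path;

    public ResumeTokenCheckpoint(string path) => _path = path;

    // Writes to a temporary file first and then moves it over the old checkpoint,
    // so a crash while writing does not truncate the existing checkpoint.
    public void Save(string tokenJson)
    {
        var tmp = _path + ".tmp";
        File.WriteAllText(tmp, tokenJson);
        File.Move(tmp, _path, overwrite: true);
    }

    // Returns null when no checkpoint has been written yet.
    public string Load() => File.Exists(_path) ? File.ReadAllText(_path) : null;
}
```

In the replication loop, checkpoint.Save(token.ToJson()) would run after the foreach over cursor.Current completes. On startup, if Load() returns a value, options.ResumeAfter = BsonDocument.Parse(json) can be set instead of StartAtOperationTime. As with a start time, the token is only usable while its position is still in the source's oplog.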

Other references:

  • https://stackoverflow.com/questions/48665409/how-do-i-resume-a-mongodb-changestream-at-the-first-document-and-not-just-change
  • https://vinta.ws/code/mongodb-change-stream-react-to-real-time-data-changes.html
  • https://medium.com/expedia-group-tech/mongo-change-streams-in-production-97a07c7c0420