MongoDB Master-Slave Replication with Change Streams
From the official manual:
Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.
Here we leverage change streams to replicate data from one MongoDB deployment to another in real time.
Some existing tools, such as MongoShake, already do this. However, MongoShake is somewhat complicated to use, and we encountered two issues:
- We had to modify the source code to use TLS authentication
- It cannot perform incremental sync in the all sync_mode
Since our goal is real-time replication, we chose a more straightforward and controllable approach: restore a snapshot of the source cluster with MongoDB Ops Manager, then apply real-time changes with a change stream.
Note: for simplicity, we use master and slave to refer to the source and the target MongoDB. These are not formal MongoDB concepts, and they are different from primary and secondary (Replication).
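Our replication method works with the common MongoDB architectures: standalone, replica set, and sharded cluster. One caveat: change streams read the oplog, so the source must be a replica set or a sharded cluster. The target can be any of the three, although the full code below replays changes in transactions, which also require a replica set or sharded cluster.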
The replication process:
- Restore a recent snapshot of the source to the target cluster
- Set the start point before the snapshot timestamp and start the real-time replication program
- Wait for the target to catch up with the source (until the replication lag is small)
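Steps 2 and 3 can be sketched as two small helpers. Starting before the snapshot timestamp replays a short overlap window. That should be harmless with the replay logic in this post: re-inserting an existing _id raises a duplicate-key error that the full code logs and ignores, a replace rewrites the document with its looked-up latest version, and deleting a missing document matches nothing. The class name, the margin, and the lag threshold are assumptions for illustration. They are not part of the original program.

```csharp
using System;
using MongoDB.Bson;

// Hypothetical helpers for steps 2 and 3 of the replication process (not from the original program).
public static class ReplicationPlan
{
    // Step 2: start the change stream a safety margin before the snapshot time,
    // so no change between the start point and the snapshot is missed.
    public static BsonTimestamp StartPoint(DateTimeOffset snapshotTime, TimeSpan margin)
    {
        if (margin < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(margin), "margin must be non-negative");
        }
        var start = snapshotTime - margin;
        // Same construction as the article's ChangeStreamOptions: seconds since the epoch, increment 1.
        return new BsonTimestamp((int)start.ToUnixTimeSeconds(), 1);
    }

    // Step 3: treat the target as caught up once the cluster time of the last
    // applied change is within maxLag of the current time.
    public static bool IsCaughtUp(BsonTimestamp lastAppliedClusterTime, DateTimeOffset now, TimeSpan maxLag)
    {
        long lagSeconds = now.ToUnixTimeSeconds() - lastAppliedClusterTime.Timestamp;
        return lagSeconds <= (long)maxLag.TotalSeconds;
    }
}
```

For example, the result of StartPoint(snapshotTime, TimeSpan.FromMinutes(10)) can be assigned to ChangeStreamOptions.StartAtOperationTime. On an idle source no new events arrive, so the last applied cluster time stops advancing. In that case IsCaughtUp can report a lag even when nothing is pending.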
The change stream interface is pretty simple. For instance, the following code watches changes in all collections of all databases (refer to this):
using (var cursor = await client.WatchAsync())
{
await cursor.ForEachAsync(change =>
{
// process change event
});
}
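Change streams are built on the aggregation framework, so the stream can also be filtered on the server. The following sketch assumes you only want the four operation types that are replayed later in this post. The class and method names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

// Hypothetical helper: watch the whole deployment, but let the server drop
// every event type that the replay logic does not handle.
public static class FilteredWatch
{
    public static PipelineDefinition<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>> ReplayableChanges() =>
        new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
            .Match(change =>
                change.OperationType == ChangeStreamOperationType.Insert ||
                change.OperationType == ChangeStreamOperationType.Update ||
                change.OperationType == ChangeStreamOperationType.Replace ||
                change.OperationType == ChangeStreamOperationType.Delete);

    public static async Task PrintChangesAsync(IMongoClient client)
    {
        using (var cursor = await client.WatchAsync(ReplayableChanges()))
        {
            // Print the type and namespace of each matching change
            await cursor.ForEachAsync(change =>
                Console.WriteLine($"{change.OperationType} {change.CollectionNamespace.FullName}"));
        }
    }
}
```

The pipeline can be combined with the ChangeStreamOptions discussed later, for example client.WatchAsync(FilteredWatch.ReplayableChanges(), options).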
So real-time replication needs to solve two problems:
- How to set up the change stream start point on the source cluster
- How to replay the changes on the target cluster
How to construct startAtOperationTime
According to the manual:
Change streams are resumable by specifying a resume token to either resumeAfter or startAfter when opening the cursor.
However, the token passed to resumeAfter or startAfter must be retrieved from a previous change stream. So how can we set a start point in the past, before now?
We finally found the solution: use StartAtOperationTime in ChangeStreamOptions to specify the start time, like this:
var options = new ChangeStreamOptions
{
BatchSize = 2000,
StartAtOperationTime = new BsonTimestamp((int)startPoint.ToUnixTimeSeconds(), 1),
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
};
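The resume token mentioned above becomes useful once the replicator is running. If the token is saved after each applied batch, a restarted program can continue where it stopped, and StartAtOperationTime is only needed on the first run. The following is a minimal sketch that assumes a local checkpoint file and the same BatchSize and FullDocument settings as above. ResumeAfter and StartAtOperationTime are never set together, because the server rejects more than one resume option.

```csharp
using System;
using System.IO;
using MongoDB.Bson;
using MongoDB.Driver;

// Hypothetical checkpointing helper; the file-based storage is an assumption.
public static class ResumeCheckpoint
{
    // Persist the resume token of the last fully replayed batch as JSON.
    public static void Save(string path, BsonDocument resumeToken) =>
        File.WriteAllText(path, resumeToken.ToJson());

    // Resume after the saved token if one exists; otherwise start at a point in time.
    public static ChangeStreamOptions BuildOptions(string path, DateTimeOffset fallbackStart)
    {
        var options = new ChangeStreamOptions
        {
            BatchSize = 2000,
            FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
        };
        if (File.Exists(path))
        {
            options.ResumeAfter = BsonDocument.Parse(File.ReadAllText(path));
        }
        else
        {
            options.StartAtOperationTime = new BsonTimestamp((int)fallbackStart.ToUnixTimeSeconds(), 1);
        }
        return options;
    }
}
```

In the SyncDb loop of the full code, calling ResumeCheckpoint.Save(path, cursor.GetResumeToken()) after the foreach over cursor.Current would record progress. A saved token only works while the corresponding entry is still in the source oplog.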
BsonTimestamp definition:
public BsonTimestamp(
int timestamp,
int increment
)
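In a BsonTimestamp, timestamp is the number of seconds since the Unix epoch, and increment orders operations within the same second. That makes converting to and from DateTimeOffset straightforward. The following hypothetical extension methods can be handy for logging when a change event happened, for example change.ClusterTime.ToDateTimeOffset().

```csharp
using System;
using MongoDB.Bson;

// Hypothetical extension methods built only on BsonTimestamp's
// (int timestamp, int increment) constructor and its Timestamp property.
public static class BsonTimestampConversions
{
    public static BsonTimestamp ToBsonTimestamp(this DateTimeOffset time, int increment = 1) =>
        new BsonTimestamp((int)time.ToUnixTimeSeconds(), increment);

    // Drops the increment, which only orders operations within the same second.
    public static DateTimeOffset ToDateTimeOffset(this BsonTimestamp timestamp) =>
        DateTimeOffset.FromUnixTimeSeconds(timestamp.Timestamp);
}
```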
How to replay the change
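Note the other ChangeStreamOptions setting, FullDocument = ChangeStreamFullDocumentOption.UpdateLookup. It makes the change's FullDocument property contain the full document for update operations, not just the changed fields. The document is looked up when the event is returned, so it reflects the current majority-committed version, which may already include later changes. We use this property to replay update changes on the target.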
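For comparison, here is an alternative we did not use. Instead of replacing the whole document, an update can be replayed from its UpdateDescription (UpdatedFields and RemovedFields) as a $set/$unset update. This avoids the lookup and the null FullDocument case. However, the update then only produces the right result if the target already holds the previous version of the document. The helper name is hypothetical. Truncated arrays, which newer servers can report in the update description, are not handled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using MongoDB.Bson;

// Hypothetical helper: build { $set: updatedFields, $unset: { field: "" } }
// from change.UpdateDescription.UpdatedFields and .RemovedFields.
public static class DeltaUpdate
{
    // Returns null when there is nothing to apply.
    public static BsonDocument Build(BsonDocument updatedFields, IEnumerable<string> removedFields)
    {
        var update = new BsonDocument();
        if (updatedFields != null && updatedFields.ElementCount > 0)
        {
            update.Add("$set", updatedFields);
        }
        var removed = (removedFields ?? Enumerable.Empty<string>()).ToList();
        if (removed.Count > 0)
        {
            // $unset ignores the value, so an empty string is used for every removed field
            update.Add("$unset", new BsonDocument(removed.Select(f => new BsonElement(f, ""))));
        }
        return update.ElementCount > 0 ? update : null;
    }
}
```

The result could be applied with collection.UpdateOneAsync(s, filter, update, cancellationToken: ct), since a BsonDocument converts implicitly to an UpdateDefinition<BsonDocument>.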
Replaying a change requires handling each change type; our implementation refers to this.
ChangeStreamOperationType definition:
public enum ChangeStreamOperationType
{
Insert = 0,
Update = 1,
Replace = 2,
Delete = 3,
Invalidate = 4,
Rename = 5,
Drop = 6
}
Code to handle these types:
// s and ct are the session and cancellation token passed to the WithTransactionAsync callback; collection is the target collection
if (change.OperationType == ChangeStreamOperationType.Insert)
{
await collection.InsertOneAsync(s, change.FullDocument, cancellationToken: ct);
}
else if (change.OperationType == ChangeStreamOperationType.Delete)
{
// Filter on the _id BsonValue itself: ToString() would turn an ObjectId into a string that never matches
var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey.GetValue("_id"));
await collection.DeleteOneAsync(s, filter, cancellationToken: ct);
}
else if (change.OperationType == ChangeStreamOperationType.Update || change.OperationType == ChangeStreamOperationType.Replace)
{
// FullDocument is null if the document was deleted before the lookup; the later delete event removes it
if (change.FullDocument != null)
{
var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey.GetValue("_id"));
await collection.ReplaceOneAsync(s, filter, change.FullDocument, cancellationToken: ct);
}
}
else
{
Console.WriteLine($"Unknown type: [{change.OperationType}]");
}
In fact, we only process insert, delete, replace, and update operations. Replace and update operations share the same logic because, with the FullDocument = ChangeStreamFullDocumentOption.UpdateLookup option, both carry the full document in FullDocument.
Sometimes FullDocument is null for an update operation. This happens when the document was deleted after the update and before the lookup ran. Refer to this.
Full Code
See this repo: https://github.com/finisky/MongoDBSync
Implementation notes:
- Each replayed operation runs in a transaction on the target; WithTransactionAsync retries transient transaction errors
- We do NOT use bulk writes, for simplicity
- Make sure your source MongoDB version is >= v4.4.4; see also: MongoDB Change Stream Makes the Cluster Down
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
// Fill in the connection strings of the source (master) and target (slave) deployments
var sourceConnString = "";
var targetConnString = "";
var sourceClientSettings = MongoClientSettings.FromConnectionString(sourceConnString);
var targetClientSettings = MongoClientSettings.FromConnectionString(targetConnString);
var sourceClient = new MongoClient(sourceClientSettings);
var targetClient = new MongoClient(targetClientSettings);
// The start point must be earlier than the timestamp of the snapshot restored to the target
var startPoint = DateTimeOffset.UtcNow.AddHours(-10);
SyncDb(sourceClient, targetClient, startPoint).Wait();
Console.ReadLine();
}
private static long DelaySeconds(int clusterTimestamp)
{
return DateTimeOffset.UtcNow.ToUnixTimeSeconds() - clusterTimestamp;
}
private static async Task SyncDb(IMongoClient sourceClient, IMongoClient targetClient, DateTimeOffset startPoint)
{
var options = new ChangeStreamOptions
{
BatchSize = 2000,
StartAtOperationTime = new BsonTimestamp((int)startPoint.ToUnixTimeSeconds(), 1),
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
};
using (var cursor = await sourceClient.WatchAsync(options))
{
long count = 0;
while (await cursor.MoveNextAsync())
{
if (!cursor.Current.Any())
{
Console.WriteLine("No changes, skip...");
continue;
}
var token = cursor.GetResumeToken();
var headDoc = cursor.Current.First();
Console.WriteLine($"[{DelaySeconds(headDoc.ClusterTime.Timestamp)}s] [{count}] Token: {token} ----------------------------");
foreach (var change in cursor.Current)
{
++count;
await ReplayChangeToTarget(targetClient, change);
}
}
}
}
private static async Task ReplayChangeToTarget(IMongoClient targetClient, ChangeStreamDocument<BsonDocument> change)
{
var dbName = change.CollectionNamespace.DatabaseNamespace.DatabaseName;
var collectionName = change.CollectionNamespace.CollectionName;
var collection = targetClient.GetDatabase(dbName).GetCollection<BsonDocument>(collectionName);
// Uncomment the following two lines to print changes without replaying to the target
//Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] {change.CollectionNamespace.FullName}");
//return;
try
{
using (var session = await targetClient.StartSessionAsync())
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)))
{
await session.WithTransactionAsync(
async (s, ct) =>
{
if (change.OperationType == ChangeStreamOperationType.Insert)
{
await collection.InsertOneAsync(s, change.FullDocument, cancellationToken: ct);
}
else if (change.OperationType == ChangeStreamOperationType.Delete)
{
// Filter on the _id BsonValue itself: ToString() would turn an ObjectId into a string that never matches
var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey.GetValue("_id"));
await collection.DeleteOneAsync(s, filter, cancellationToken: ct);
}
else if (change.OperationType == ChangeStreamOperationType.Update || change.OperationType == ChangeStreamOperationType.Replace)
{
// FullDocument is null if the document was deleted before the lookup; the later delete event removes it
if (change.FullDocument != null)
{
var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey.GetValue("_id"));
await collection.ReplaceOneAsync(s, filter, change.FullDocument, cancellationToken: ct);
}
}
else
{
Console.WriteLine($"Unknown type: [{change.OperationType}]");
}
return string.Empty;
}, cancellationToken: cts.Token);
}
}
catch (MongoWriteException ex)
{
if (ex.WriteError.Code == 11000)
{
Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] Dup key: {change.FullDocument.GetValue("_id")}, ignore...");
}
else
{
Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] [# {change.OperationType}] MongoWriteException: {change.ToJson()}");
}
}
catch (Exception ex)
{
Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] [# {change.OperationType}] Exception [{change.DocumentKey.ToJson()}] [{ex.GetType()}]: {ex.Message}");
}
}
}
}
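The full code deliberately avoids bulk writes. If throughput becomes a problem, each change could be turned into a write model, and the models could be sent in ordered batches per collection. This sketch swaps the insert-plus-duplicate-key handling for an upserting replace, so that re-applied inserts are idempotent. The helper name is hypothetical.

```csharp
using System;
using MongoDB.Bson;
using MongoDB.Driver;

// Hypothetical helper: map one replayable change to a bulk write model.
public static class ChangeToWriteModel
{
    // Returns null for changes that should be skipped.
    public static WriteModel<BsonDocument> Build(ChangeStreamOperationType type, BsonValue id, BsonDocument fullDocument)
    {
        var filter = Builders<BsonDocument>.Filter.Eq("_id", id);
        switch (type)
        {
            case ChangeStreamOperationType.Insert:
            case ChangeStreamOperationType.Update:
            case ChangeStreamOperationType.Replace:
                // Upserting replace: re-applying an insert overwrites instead of failing with a duplicate key.
                // A null FullDocument (document deleted before the lookup) is skipped.
                return fullDocument == null
                    ? null
                    : new ReplaceOneModel<BsonDocument>(filter, fullDocument) { IsUpsert = true };
            case ChangeStreamOperationType.Delete:
                return new DeleteOneModel<BsonDocument>(filter);
            default:
                return null;
        }
    }
}
```

The models for one collection could then be sent with collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = true }). An ordered batch keeps changes to the same document in stream order. On a sharded target, upserts may require the shard key in the filter.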
Other references:
- https://stackoverflow.com/questions/48665409/how-do-i-resume-a-mongodb-changestream-at-the-first-document-and-not-just-change
- https://vinta.ws/code/mongodb-change-stream-react-to-real-time-data-changes.html
- https://medium.com/expedia-group-tech/mongo-change-streams-in-production-97a07c7c0420