MongoDB Master-Slave Replication with Change Streams

What is a change stream? From the official documentation:

Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

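Here we use change streams to build real-time master-slave replication. We found no ready-made solution online, presumably because the straightforward way to keep copies is a replica set, so people rarely replicate manually. But the business does have this need, for example backing up data across heterogeneous clusters in different regions.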

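The only existing tool we found was MongoShake. MongoShake is not a commercial product, though, and when we pulled the code and ran it, it did not work properly in our environment: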

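  • TLS verification had some problems, which we fixed by patching the source
  • In the all sync mode it only managed the full copy via the oplog; the incremental copy via change stream kept throwing errors and never ran properly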

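Since fixing someone else's wheel can be more work than building our own, we looked into doing the replication ourselves. The principle is simple: read the oplog from the source in real time and replay it on the target. In practice it is harder than it sounds, especially when the source is a sharded cluster: you cannot pull the oplog through mongos, but have to pull it from each shard yourself, which is considerably more difficult.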

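The good news is that MongoDB 3.6 introduced change streams (watching a whole database or deployment, and starting from a given operation time, were added in 4.0). And because we manage our sharded clusters with MongoDB Ops Manager, restoring a snapshot is easy, so replication only has to replay the real-time changes made after the snapshot point.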

So this is a wheel we can build ourselves.

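Tip: we use master and slave here to mean the source and target databases, but these are not MongoDB concepts; primary and secondary (Replication) are. Change streams work on replica sets and sharded clusters (a standalone server has no oplog to stream from), so the source must be one of those two; beyond that, the code below makes no assumptions about the topology of either side. Note that the code also replays each change inside a transaction, which likewise requires the target to be a replica set or sharded cluster.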

The replication process:

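  • Restore the latest snapshot of the source to the target with MongoDB Ops Manager
  • Set the replay start time earlier than the snapshot time, so that no change is missed, and start replaying
  • Wait for the replay process to catch up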
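Because replay deliberately starts earlier than the snapshot, some changes are applied twice: once in the snapshot and once in the replay. The process therefore only works if replay tolerates changes the snapshot already contains. Below is a minimal in-memory sketch of that argument, not MongoDB code. The Change record, Op enum and ReplayModel class are hypothetical, and a Dictionary stands in for a collection. Each update carries the document as of the event, which simplifies things: with UpdateLookup the server returns the document at lookup time. An insert whose _id already exists is ignored, similar to how the complete code further down logs and skips duplicate-key errors. Updates replace the whole document, and only if it exists.

```csharp
using System.Collections.Generic;
using System.Linq;
#nullable enable

// Hypothetical operation kinds, mirroring the four types that get replayed.
public enum Op { Insert, Update, Replace, Delete }

// Hypothetical change event: time, affected _id, document after the change (null for deletes).
public record Change(long Ts, Op Kind, string Id, string? FullDocument);

public static class ReplayModel
{
    // Applies one change to an in-memory "collection" (_id -> document).
    public static void Apply(Dictionary<string, string> coll, Change c)
    {
        switch (c.Kind)
        {
            case Op.Insert:
                // Duplicate _id is ignored, like the duplicate-key (11000) handling.
                coll.TryAdd(c.Id, c.FullDocument!);
                break;
            case Op.Update:
            case Op.Replace:
                // Replace the whole document only if it exists (no upsert).
                if (coll.ContainsKey(c.Id)) coll[c.Id] = c.FullDocument!;
                break;
            case Op.Delete:
                coll.Remove(c.Id);
                break;
        }
    }

    // Source state after applying every change with Ts < until.
    public static Dictionary<string, string> StateAt(IEnumerable<Change> log, long until)
    {
        var coll = new Dictionary<string, string>();
        foreach (var c in log.Where(c => c.Ts < until).OrderBy(c => c.Ts))
            Apply(coll, c);
        return coll;
    }

    // Target = snapshot taken at snapshotTs, then replay of every change with Ts >= startTs.
    public static Dictionary<string, string> Replicate(
        IReadOnlyList<Change> log, long snapshotTs, long startTs)
    {
        var target = StateAt(log, snapshotTs);
        foreach (var c in log.Where(c => c.Ts >= startTs).OrderBy(c => c.Ts))
            Apply(target, c);
        return target;
    }
}
```

Consider a log with events at 5, 12, 15, 20, 25, 35 and 40, a snapshot at 30 and replay from 10. The target ends up equal to the source, even though the events at 12, 15, 20 and 25 are applied on top of a snapshot that already contains them.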

The change stream API is very simple. The following code watches changes on every collection in every database:

using (var cursor = await client.WatchAsync())
{
    await cursor.ForEachAsync(change =>
    {
        // process change event
    });
}

Replication therefore has to solve two problems:

  • Choosing the point in time on the source from which to start watching changes
  • Replaying the changes on the target

Constructing startAtOperationTime

According to the manual:

Change streams are resumable by specifying a resume token to either resumeAfter or startAfter when opening the cursor.

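Awkwardly, the resumeAfter and startAfter tokens can only be obtained from a previous change event. So how do we cold-start? Can we only replicate changes made after the current moment?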

After some digging we found a solution: set StartAtOperationTime in ChangeStreamOptions to the start time. StartAtOperationTime is a BsonTimestamp:

var options = new ChangeStreamOptions
{
	BatchSize = 2000,
	StartAtOperationTime = new BsonTimestamp((int)startPoint.ToUnixTimeSeconds(), 1),
	FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
};

The BsonTimestamp constructor:

public BsonTimestamp(
	int timestamp,
	int increment
)
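A BSON timestamp packs two 32-bit values into 64 bits. Per the BSON specification, the high 32 bits are seconds since the Unix epoch and the low 32 bits are an incrementing ordinal for operations within that second. That is why the options above pass seconds as timestamp and a small increment. The sketch below reproduces the packing in plain C# with hypothetical helper names (Pack, Unpack, ToTimestampSeconds) and does not call the driver. It also shows one caveat of the (int) cast used above: it silently wraps after 2038-01-19, so this version uses a checked cast that throws instead.

```csharp
using System;

public static class TimestampSketch
{
    // BSON timestamp layout: high 32 bits = seconds since epoch,
    // low 32 bits = incrementing ordinal within that second.
    public static ulong Pack(uint seconds, uint increment) =>
        ((ulong)seconds << 32) | increment;

    public static (uint Seconds, uint Increment) Unpack(ulong value) =>
        ((uint)(value >> 32), (uint)(value & 0xFFFFFFFF));

    // Same conversion as (int)startPoint.ToUnixTimeSeconds(), but checked so it
    // throws OverflowException instead of wrapping once seconds exceed int.MaxValue.
    public static int ToTimestampSeconds(DateTimeOffset startPoint) =>
        checked((int)startPoint.ToUnixTimeSeconds());
}
```

Because seconds occupy the high bits, comparing packed values orders by second first and by increment only within the same second. For example, Pack(100, 5) is less than Pack(101, 1).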

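The other option, FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, makes update events carry the complete updated document, which we can then replay on the target. Strictly speaking, the server looks the document up when it returns the event. What arrives is therefore the most recent majority-committed version at lookup time, which may already include later changes.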

Replaying changes on the target

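On the target we only need to replay the source's write operations, and each type of write needs its own handling. We borrowed the approach from an existing Go implementation.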

The ChangeStreamOperationType values:

public enum ChangeStreamOperationType
{
	Insert = 0,
	Update = 1,
	Replace = 2,
	Delete = 3,
	Invalidate = 4,
	Rename = 5,
	Drop = 6
}

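We handle only four types: insert, delete, replace and update. The other three types are uncommon and can be handled manually, and we found no good way to replay them anyway. The main logic is: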

if (change.OperationType == ChangeStreamOperationType.Insert)
{
	await collection.InsertOneAsync(s, change.FullDocument, cancellationToken: ct);
}
else if (change.OperationType == ChangeStreamOperationType.Delete)
{
	// Pass _id as a BsonValue; converting it to a string would not match ObjectId keys
	var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey["_id"]);
	await collection.DeleteOneAsync(s, filter, cancellationToken: ct);
}
else if (change.OperationType == ChangeStreamOperationType.Update || change.OperationType == ChangeStreamOperationType.Replace)
{
	// Take _id from DocumentKey (kept as a BsonValue, not a string)
	var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey["_id"]);
	await collection.ReplaceOneAsync(s, filter, change.FullDocument, cancellationToken: ct);
}
else
{
	Console.WriteLine($"Unknown type: [{change.OperationType}]");
}

Update and replace are handled identically. For update, we rely on the FullDocument property described above to get the updated document.

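When running this code we found that FullDocument is sometimes null for update operations. This means the document was deleted after the update. The implementation here does not handle that case specially.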
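The dispatch above can be summarized as a pure decision function, including the null FullDocument case. This is a hypothetical sketch. OperationType is a local copy of the enum values listed earlier, so the block compiles without the driver. Mapping a null FullDocument to Skip is an assumed policy, not what the code in this post does: here that case simply fails and the exception is logged. The reasoning behind Skip is that a document deleted after the update is usually followed by a delete event in the stream.

```csharp
// Local copy of the operation types listed above (hypothetical stand-in for the driver enum).
public enum OperationType { Insert, Update, Replace, Delete, Invalidate, Rename, Drop }

public enum ReplayAction { InsertOne, ReplaceOne, DeleteOne, Skip, Unsupported }

public static class ReplayDispatch
{
    // Decides how a single change would be replayed on the target.
    public static ReplayAction Decide(OperationType op, bool hasFullDocument) => op switch
    {
        OperationType.Insert => ReplayAction.InsertOne,
        OperationType.Delete => ReplayAction.DeleteOne,
        // Update and replace are both replayed as a full-document replace.
        // A null FullDocument means the document was deleted after the update;
        // skipping relies on the later delete event (assumed policy).
        OperationType.Update or OperationType.Replace =>
            hasFullDocument ? ReplayAction.ReplaceOne : ReplayAction.Skip,
        // Invalidate, Rename and Drop are left to manual handling.
        _ => ReplayAction.Unsupported,
    };
}
```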

Complete code

The code is in this repo: https://github.com/finisky/MongoDBSync

Some implementation details:

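  • Each change is replayed in a transaction via WithTransactionAsync, which retries transient transaction errors, to help the replay succeed
  • We don't use bulkWrite: consecutive changes may target different databases and collections, which complicates the logic, and when one operation in a bulk fails the error is hard to handle
  • Make sure the source MongoDB is version 4.4.4 or later; otherwise the change stream may keep restarting the source cluster (see MongoDB Change Stream Makes the Cluster Down)
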
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
			var sourceConnString = "";
			var targetConnString = "";

			var sourceClientSettings = MongoClientSettings.FromConnectionString(sourceConnString);
			var targetClientSettings = MongoClientSettings.FromConnectionString(targetConnString);

			var sourceClient = new MongoClient(sourceClientSettings);
			var targetClient = new MongoClient(targetClientSettings);
			var startPoint = DateTimeOffset.UtcNow.AddHours(-10);

			SyncDb(sourceClient, targetClient, startPoint).Wait();

			Console.ReadLine();
        }

		private static long DelaySeconds(int clusterTimestamp)
		{
			return DateTimeOffset.UtcNow.ToUnixTimeSeconds() - clusterTimestamp;
		}

		private static async Task SyncDb(IMongoClient sourceClient, IMongoClient targetClient, DateTimeOffset startPoint)
		{
			var options = new ChangeStreamOptions
			{
				BatchSize = 2000,
				StartAtOperationTime = new BsonTimestamp((int)startPoint.ToUnixTimeSeconds(), 1),
				FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
			};

			using (var cursor = await sourceClient.WatchAsync(options))
			{
				long count = 0;
				while (await cursor.MoveNextAsync())
				{
					if (!cursor.Current.Any())
					{
						Console.WriteLine("No changes, skip...");
						continue;
					}

					var token = cursor.GetResumeToken();
					var headDoc = cursor.Current.First();
					Console.WriteLine($"[{DelaySeconds(headDoc.ClusterTime.Timestamp)}s] [{count}] Token: {token} ----------------------------");

					foreach (var change in cursor.Current)
					{
						++count;
						await ReplayChangeToTarget(targetClient, change);
					}
				}
			}
		}

		private static async Task ReplayChangeToTarget(IMongoClient targetClient, ChangeStreamDocument<BsonDocument> change)
        {
			var dbName = change.CollectionNamespace.DatabaseNamespace.DatabaseName;
			var collectionName = change.CollectionNamespace.CollectionName;
			var collection = targetClient.GetDatabase(dbName).GetCollection<BsonDocument>(collectionName);

            // Uncomment the following two lines to print changes without replaying to the target
			//Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] {change.CollectionNamespace.FullName}");
			//return;
			
			try
			{
				using (var session = await targetClient.StartSessionAsync())
				using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)))
				{
					await session.WithTransactionAsync(
						async (s, ct) =>
						{
							if (change.OperationType == ChangeStreamOperationType.Insert)
							{
								await collection.InsertOneAsync(s, change.FullDocument, cancellationToken: ct);
							}
							else if (change.OperationType == ChangeStreamOperationType.Delete)
							{
								// Pass _id as a BsonValue; converting it to a string would not match ObjectId keys
								var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey["_id"]);
								await collection.DeleteOneAsync(s, filter, cancellationToken: ct);
							}
							else if (change.OperationType == ChangeStreamOperationType.Update || change.OperationType == ChangeStreamOperationType.Replace)
							{
								// Take _id from DocumentKey (kept as a BsonValue, not a string)
								var filter = Builders<BsonDocument>.Filter.Eq("_id", change.DocumentKey["_id"]);
								await collection.ReplaceOneAsync(s, filter, change.FullDocument, cancellationToken: ct);
							}
							else
							{
								Console.WriteLine($"Unknown type: [{change.OperationType}]");
							}

							return string.Empty;
						}, cancellationToken: cts.Token);
				}
			}
			catch (MongoWriteException ex)
			{
				if (ex.WriteError.Code == 11000)
				{
					Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] Dup key: {change.FullDocument.GetValue("_id")}, ignore...");
				}
				else
				{
					Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] [# {change.OperationType}] MongoWriteException: {change.ToJson()}");
				}
			}
			catch (Exception ex)
			{
				Console.WriteLine($"[{DelaySeconds(change.ClusterTime.Timestamp)}s] [# {change.OperationType}] Exception [{change.DocumentKey.ToJson()}] [{ex.GetType()}]: {ex.Message}");
			}
		}
	}
}
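The implementation details require the source to run version 4.4.4 or later, so a small guard could run before opening the stream. This is a sketch under assumptions. The version string would come from the version field returned by the buildInfo command. On a sharded cluster, buildInfo sent through mongos presumably reports the mongos version, so the shards would need checking separately. The hypothetical IsAtLeast helper drops any suffix after '-' (such as -rc1) before comparing with System.Version.

```csharp
using System;

public static class VersionGuard
{
    // True if a MongoDB version string such as "4.4.4" is at least `minimum`.
    // The part after '-' is dropped, so "4.4.4-rc1" counts as "4.4.4"
    // (a deliberate simplification in this sketch).
    public static bool IsAtLeast(string serverVersion, string minimum)
    {
        var core = serverVersion.Split('-')[0];
        return Version.Parse(core) >= Version.Parse(minimum);
    }
}
```

Comparing numerically matters: "4.4.10" correctly counts as newer than "4.4.4", whereas a plain string comparison would order it first.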

References:

  • https://stackoverflow.com/questions/48665409/how-do-i-resume-a-mongodb-changestream-at-the-first-document-and-not-just-change
  • https://vinta.ws/code/mongodb-change-stream-react-to-real-time-data-changes.html
  • https://medium.com/expedia-group-tech/mongo-change-streams-in-production-97a07c7c0420