Solving the MongoDB BulkWrite Infinite Retry Problem

Previously we discussed # MongoDB Transaction Retry Implementation. If BulkWrite() is used inside a transaction, the new transaction API may retry indefinitely, driving server CPU usage to 100% (MongoDB Server v4.4.6-ent, MongoDB Driver v2.12.2).

To avoid this problem, there are three client-side recommendations:

  • Pass a cancellation token into the transaction API to cap the transaction's maximum execution time
  • Force the transaction to exit after a maximum number of retries, avoiding infinite retry loops
  • Configure BulkWrite() to execute operations in order

The first two recommendations are worth applying to all transaction implementations; they prevent unnecessary load on the server in extreme cases.

Limiting the Maximum Transaction Execution Time

First, add a cancellation token to the earlier RetryReplaceAsync() example to cap the transaction's execution time at 1 second:

private async Task RetryReplaceAsync(IMongoClient mongoClient, string uuid, string value)
{
	var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);

	using (var session = await mongoClient.StartSessionAsync())
	// Cap the transaction's total lifetime, including driver-level retries, at 1 second
	using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)))
	{
		var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);

		await session.WithTransactionAsync(
			async (s, ct) =>
			{
				await collection.ReplaceOneAsync(s, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true }, ct);
				return string.Empty;
			}, cancellationToken: cts.Token);
	}
}
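When the 1-second budget is exhausted, the driver aborts the transaction and the call typically surfaces an OperationCanceledException (TaskCanceledException is a subtype). A caller might handle it like this (a sketch, using the RetryReplaceAsync above):

```csharp
try
{
    await RetryReplaceAsync(mongoClient, uuid, value);
}
catch (OperationCanceledException)
{
    // The transaction exceeded its 1-second budget and was aborted.
    // Log and decide whether to surface the failure or retry at a higher level.
}
```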

Limiting the Maximum Number of Transaction Retries

Only three extra lines of code are needed; here the transaction gives up after 3 attempts:

private async Task RetryReplaceAsync(IMongoClient mongoClient, string uuid, string value)
{
    var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);

    // Number of times the transaction callback has been invoked
    int count = 0;
    using (var session = await mongoClient.StartSessionAsync())
    using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)))
    {
        var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);

        await session.WithTransactionAsync(
            async (s, ct) =>
            {
                if (++count > 3)
                {
                    // Throwing a non-transient exception makes WithTransactionAsync
                    // abort the transaction and rethrow instead of retrying
                    throw new ApplicationException("Reached max retry count");
                }
                await collection.ReplaceOneAsync(s, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true }, ct);
                return string.Empty;
            }, cancellationToken: cts.Token);
    }
}

Setting BulkWrite to Execute in Order

BulkWrite can execute operations in order or out of order; the default is ordered. # db.collection.bulkWrite()

From a performance standpoint, unordered execution is obviously better, but in unordered mode certain non-retryable errors on individual operations (such as a Duplicate Key Error) can be swallowed, causing the entire transaction to be retried indefinitely.
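If unordered execution is kept for throughput, one defensive option (a sketch, not from the original post) is to catch the driver's MongoBulkWriteException inside the transaction callback and rethrow duplicate-key failures as a plain, non-transient exception so the transaction stops retrying. Here `session`, `listWrites`, and `ct` are assumed to come from the surrounding transaction callback as in the earlier examples:

```csharp
try
{
    await collection.BulkWriteAsync(session, listWrites,
        new BulkWriteOptions { IsOrdered = false }, ct);
}
catch (MongoBulkWriteException<BsonDocument> ex)
{
    // Duplicate keys are not retryable: surface them as a plain exception,
    // which WithTransactionAsync will not treat as transient
    if (ex.WriteErrors.Any(e => e.Category == ServerErrorCategory.DuplicateKey))
    {
        throw new ApplicationException("Bulk write hit a duplicate key", ex);
    }
    throw;
}
```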

For safety, therefore, ordered execution is recommended; in practice the performance difference is small. Sample code:

await collection.BulkWriteAsync(session, listWrites, new BulkWriteOptions { IsOrdered = true }, cancellationToken: ct);
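Putting the pieces together, a full transaction body using an ordered bulk write might look like this. This is a sketch: `listWrites` is built here with hypothetical documents purely for illustration, and `session`, `cts`, and `collection` are assumed to be set up as in the earlier examples:

```csharp
var listWrites = new List<WriteModel<BsonDocument>>
{
    new InsertOneModel<BsonDocument>(
        new BsonDocument { { "Uuid", uuid }, { "op", "insert" } }),
    new ReplaceOneModel<BsonDocument>(
        Builders<BsonDocument>.Filter.Eq("Uuid", uuid),
        new BsonDocument { { "Uuid", uuid }, { "op", "replace" } }) { IsUpsert = true }
};

await session.WithTransactionAsync(
    async (s, ct) =>
    {
        // IsOrdered = true stops at the first failing operation, so a
        // non-retryable error is reported instead of being swallowed
        await collection.BulkWriteAsync(s, listWrites,
            new BulkWriteOptions { IsOrdered = true }, ct);
        return string.Empty;
    },
    cancellationToken: cts.Token);
```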

References

https://developer.mongodb.com/community/forums/t/handling-duplicated-key-error-in-bulk-insert-retry-scenarios/2869

https://stackoverflow.com/questions/61244296/how-to-handle-duplicate-error-in-mongo-bulkinsert-retry