Implementing MongoDB Transaction Retries

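MongoDB transactions are a great feature, but in high-concurrency scenarios write conflicts between multi-document transactions are hard to avoid. Here is an example of a write conflict: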

Exception: Command update failed: Encountered error from mongodb.svc.cluster.local:27017 during a transaction :: caused by :: WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction..

So how do we implement transaction retries correctly?

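The most obvious idea is to retry manually in the application layer: as soon as an exception occurs, try again. But retrying is not as simple as it sounds, and there are many questions to consider: What type of exception is it? Is it transient or persistent? How many times should we retry? And how do we add a retry mechanism while keeping the code clean?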
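To make these questions concrete, here is a minimal, driver-agnostic sketch of what a hand-written retry helper has to decide. It assumes the caller supplies an `isTransient` predicate (deciding that is exactly the first question above), caps the total number of attempts, and backs off exponentially between attempts. `RetryHelper`, `RunWithRetryAsync` and its parameters are hypothetical names for illustration, not part of the MongoDB driver.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the MongoDB driver.
public static class RetryHelper
{
    // Runs `operation`, retrying only when `isTransient` classifies the failure as transient,
    // for at most `maxAttempts` attempts in total.
    public static async Task<T> RunWithRetryAsync<T>(
        Func<Task<T>> operation,
        Func<Exception, bool> isTransient,
        int maxAttempts = 3,
        int baseDelayMs = 50,
        CancellationToken cancellationToken = default)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (Exception e) when (isTransient(e) && attempt < maxAttempts)
            {
                // Transient failure with attempts left: wait baseDelayMs, 2x, 4x, ... then retry.
                var delayMs = baseDelayMs * (1 << Math.Min(attempt - 1, 10));
                await Task.Delay(delayMs, cancellationToken);
            }
            // Non-transient errors, and the final failed attempt, propagate to the caller.
        }
    }
}
```

The predicate keeps the "is it transient?" decision separate from the loop itself; for MongoDB the natural predicate is the error label discussed later in this article.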

The good news is that recent MongoDB drivers already solve the retry problem for us with a built-in retry mechanism: https://docs.mongodb.com/manual/core/transactions/#transactions-api

The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

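All we need to do is implement transactions with the new transaction API. What I don't quite understand is that even the official C# MongoDB driver documentation still uses the old API, i.e. the version without retries. So this article compares the old and new APIs and gives usage examples.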

Reproducing the WriteConflict error

The old Transactions API

From the official C# MongoDB driver documentation:

using (var session = client.StartSession())
{
    session.StartTransaction();
    // execute operations using the session
    session.CommitTransaction(); // if an exception is thrown before reaching here the transaction will be implicitly aborted
}

WriteConflict example

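Let's use the old API to produce a write conflict: two tasks update the same document, and to make the conflict reproducible, the first task deliberately waits two seconds before committing: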

using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Threading.Tasks;

namespace MongoTest
{
    public class RetryableTransaction
    {
        private const string DatabaseName = "Test";
        private const string CollectionName = "Test";
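        public const string ConnectionString = ""; // Fill in a replica set or sharded cluster URI; transactions are not supported on a standalone server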

        public async Task WriteConflictAsync()
        {
            var mongoClient = new MongoClient(ConnectionString);

            var uuid = "conflict1";
            await BareReplaceAsync(mongoClient, uuid, "init");

            Console.WriteLine("------ Bare Replace ------");
            var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
            await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
            try
            {
                await BareReplaceAsync(mongoClient, uuid, "fast");
            }
            catch (Exception e)
            {
                Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
            }

            await slowReplaceTask;
            Console.WriteLine();

            Console.WriteLine($"[{DateTime.Now}] Completed...");
        }

        private async Task BareReplaceAsync(IMongoClient mongoClient, string uuid, string value, int delay = 0)
        {
            var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);

            using (var session = await mongoClient.StartSessionAsync())
            {
                session.StartTransaction();

                var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
                await collection.ReplaceOneAsync(session, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true });
                await Task.Delay(delay); // Intentionally slow the transaction

                await session.CommitTransactionAsync();
            }

            Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
        }
    }
}

Output of WriteConflictAsync():

[2021/2/19 13:35:51] Done [init]
------ Bare Replace ------
[2021/2/19 13:35:51] Exception: Command update failed: Encountered error from mongodb.svc.cluster.local:27017 during a transaction :: caused by :: WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction..
[2021/2/19 13:35:53] Done [slow]

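Clearly, the second update task failed because of the write conflict. If we inspect the thrown MongoCommandException, we find TransientTransactionError in its ErrorLabels. In other words, the exception itself tells us that this is a transient error that can be resolved by retrying.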
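In the driver, these labels are exposed on MongoException (the base class of MongoCommandException) through its ErrorLabels property and HasErrorLabel(string) method. The decision they drive is small enough to model without a server. The sketch below takes the labels as plain strings and maps them to an action; `RetryDecision` and `ErrorLabelClassifier` are hypothetical names, and in real code you would pass in `exception.ErrorLabels`.

```csharp
using System.Collections.Generic;

// What to do after a failed transaction attempt (hypothetical type for illustration).
public enum RetryDecision
{
    RetryWholeTransaction, // TransientTransactionError: abort and rerun every operation
    RetryCommit,           // UnknownTransactionCommitResult: re-send only commitTransaction
    DoNotRetry             // anything else: surface the error to the caller
}

public static class ErrorLabelClassifier
{
    public const string TransientTransactionError = "TransientTransactionError";
    public const string UnknownTransactionCommitResult = "UnknownTransactionCommitResult";

    // `labels` would come from MongoException.ErrorLabels in real code;
    // `duringCommit` says whether the error came from committing the transaction.
    public static RetryDecision Classify(IEnumerable<string> labels, bool duringCommit)
    {
        var set = new HashSet<string>(labels);
        if (set.Contains(TransientTransactionError))
            return RetryDecision.RetryWholeTransaction;
        if (duringCommit && set.Contains(UnknownTransactionCommitResult))
            return RetryDecision.RetryCommit;
        return RetryDecision.DoNotRetry;
    }
}
```

The WriteConflict above carries TransientTransactionError, so it falls into the first case: the whole transaction, not just the failed write, has to be rerun.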

Solution

Manual retry in the application layer

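An external article gives an implementation that retries manually on TransientTransactionError. This approach is not recommended, though; the more elegant way is to use the MongoDB driver's own retry mechanism. That requires upgrading to a recent driver version. The server requirements are those of multi-document transactions in general: MongoDB 4.0 or later for replica sets, and 4.2 or later for sharded clusters.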

Recent drivers have this retry mechanism built in (https://docs.mongodb.com/manual/core/transactions/#transactions-api):

The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.
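What this retry logic does can be sketched without a server. The loop below is a simplified model based on my reading of MongoDB's convenient-transactions behavior, not the driver's actual source: if the callback fails with TransientTransactionError, the transaction is aborted and the whole callback is rerun; if the commit fails with UnknownTransactionCommitResult, only the commit is retried; and all retries stop after an overall time budget, assumed here to be 120 seconds as in the driver specification. `LabeledException`, `IFakeSession`, `ScriptedSession` and `ConvenientTransaction` are hypothetical stand-ins for the driver types, and the real driver handles more edge cases than this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for MongoException: carries error labels.
public class LabeledException : Exception
{
    private readonly HashSet<string> _labels;
    public LabeledException(string message, params string[] labels) : base(message)
    {
        _labels = new HashSet<string>(labels);
    }
    public bool HasErrorLabel(string label) => _labels.Contains(label);
}

// Hypothetical stand-in for the parts of a session the loop needs.
public interface IFakeSession
{
    void StartTransaction();
    Task CommitTransactionAsync(CancellationToken ct);
    Task AbortTransactionAsync(CancellationToken ct);
}

// Scripted session for trying the loop: commit attempt N fails with CommitFailure(N) if non-null.
public class ScriptedSession : IFakeSession
{
    public int Starts, Commits, Aborts;
    public Func<int, Exception> CommitFailure = _ => null;
    public void StartTransaction() => Starts++;
    public Task CommitTransactionAsync(CancellationToken ct)
    {
        Commits++;
        var error = CommitFailure(Commits);
        return error == null ? Task.CompletedTask : Task.FromException(error);
    }
    public Task AbortTransactionAsync(CancellationToken ct)
    {
        Aborts++;
        return Task.CompletedTask;
    }
}

public static class ConvenientTransaction
{
    // Assumed overall retry budget (120 s in the convenient-transactions spec).
    private static readonly TimeSpan RetryBudget = TimeSpan.FromSeconds(120);

    public static async Task<T> WithTransactionAsync<T>(
        IFakeSession session,
        Func<IFakeSession, CancellationToken, Task<T>> callback,
        CancellationToken ct = default)
    {
        var clock = Stopwatch.StartNew();
        while (true)
        {
            session.StartTransaction();
            T result;
            try
            {
                result = await callback(session, ct);
            }
            catch (Exception e)
            {
                await session.AbortTransactionAsync(ct);
                if (e is LabeledException le && le.HasErrorLabel("TransientTransactionError") && clock.Elapsed < RetryBudget)
                    continue; // transient error in the body: rerun the whole callback
                throw;       // anything else: give up after aborting
            }

            var retryWholeTransaction = false;
            while (!retryWholeTransaction)
            {
                try
                {
                    await session.CommitTransactionAsync(ct);
                    return result;
                }
                catch (LabeledException e) when (e.HasErrorLabel("UnknownTransactionCommitResult") && clock.Elapsed < RetryBudget)
                {
                    // Commit outcome unknown: loop and re-send only the commit.
                }
                catch (LabeledException e) when (e.HasErrorLabel("TransientTransactionError") && clock.Elapsed < RetryBudget)
                {
                    retryWholeTransaction = true; // transient commit error: start over
                }
            }
        }
    }
}
```

This also explains the experiment later in this article: the fast task keeps hitting WriteConflict while the slow transaction holds the document, and the loop keeps rerunning it until the slow task commits.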

Definition of WithTransactionAsync:

//
// Summary:
//     Executes a callback within a transaction, with retries if needed.
//
// Parameters:
//   callbackAsync:
//     The user defined callback.
//
//   transactionOptions:
//     The transaction options.
//
//   cancellationToken:
//     The cancellation token.
//
// Type parameters:
//   TResult:
//     The type of callback result.
//
// Returns:
//     The callback result.
Task<TResult> WithTransactionAsync<TResult>(Func<IClientSessionHandle, CancellationToken, Task<TResult>> callbackAsync, TransactionOptions transactionOptions = null, CancellationToken cancellationToken = default(CancellationToken));

Note that the callback parameter of WithTransactionAsync is a Func rather than an Action, and there is no Action overload, so the callback must return a value (here, string.Empty).
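If a transaction body has nothing meaningful to return, one way to avoid writing `return string.Empty;` in every callback is a small adapter that wraps a value-less callback and returns a placeholder. This is a hypothetical helper, not a driver API; it is generic over the session type so it can be tried without a server, and with the driver you would instantiate it with IClientSessionHandle.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: adapts a callback that returns no value into the
// Func<TSession, CancellationToken, Task<TResult>> shape that WithTransactionAsync expects.
public static class TransactionCallbacks
{
    public static Func<TSession, CancellationToken, Task<bool>> WithDummyResult<TSession>(
        Func<TSession, CancellationToken, Task> callback)
    {
        return async (session, ct) =>
        {
            await callback(session, ct); // run the caller's operations
            return true;                 // placeholder result; the value itself is meaningless
        };
    }
}
```

With the driver this would be passed as the first argument, e.g. `await session.WithTransactionAsync(TransactionCallbacks.WithDummyResult<IClientSessionHandle>(async (s, ct) => { /* operations using s */ }));`, with TResult inferred as bool.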

Example usage:

using (var session = await mongoClient.StartSessionAsync())
{
	await session.WithTransactionAsync(
		async (s, ct) =>
		{
			// execute operations using the session
			return string.Empty;
		});
}

Compared with the old API:

using (var session = client.StartSession())
{
    session.StartTransaction();
    // execute operations using the session
    session.CommitTransaction();
}

Now we can write RetryReplaceAsync, the retrying counterpart of BareReplaceAsync:

private async Task RetryReplaceAsync(IMongoClient mongoClient, string uuid, string value)
{
	var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);

	using (var session = await mongoClient.StartSessionAsync())
	{
		var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);

		await session.WithTransactionAsync(
			async (s, ct) =>
			{
				await collection.ReplaceOneAsync(s, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true }, ct);
				return string.Empty;
			});
	}

	Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
}

In the try block of WriteConflictAsync, replace BareReplaceAsync with RetryReplaceAsync:

public async Task WriteConflictAsync()
{
	var mongoClient = new MongoClient(ConnectionString);

	var uuid = "conflict1";
	await BareReplaceAsync(mongoClient, uuid, "init");

	Console.WriteLine("------ Retry Replace ------");
	var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
	await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
	try
	{
		await RetryReplaceAsync(mongoClient, uuid, "fast");
	}
	catch (Exception e)
	{
		Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
	}

	await slowReplaceTask;
	Console.WriteLine();

	Console.WriteLine($"[{DateTime.Now}] Completed...");
}

Output:

[2021/2/19 17:55:00] Done [init]
------ Retry Replace ------
[2021/2/19 17:55:02] Done [slow]
[2021/2/19 17:55:03] Done [fast]

[2021/2/19 17:55:03] Completed...

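No exception is thrown, and the second task succeeds about two seconds later, which shows that the retry logic is working. In testing, the code above always succeeded even when the first task was held for a long time (20 seconds). In a real application, though, you should enforce a timeout by passing in a cancellation token; I won't go into the details here.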
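As a rough sketch of the timeout idea: a CancellationTokenSource created with a timeout cancels its token once the deadline passes, and a retry loop that checks the token stops retrying at that point. The loop below is a driver-free model with hypothetical names (`TimeoutDemo`, `RetryWithTimeoutAsync`); it assumes that, with the real driver, the token passed as `cancellationToken` to WithTransactionAsync is honored by the operations inside the callback (the callback receives it as `ct`).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutDemo
{
    // Hypothetical: retries `attempt` on transient errors until it succeeds or `timeout` elapses.
    // With the real driver, the analogous wiring would be (an assumption, not tested here):
    //   using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
    //       await session.WithTransactionAsync(callback, cancellationToken: cts.Token);
    public static async Task<T> RetryWithTimeoutAsync<T>(
        Func<CancellationToken, Task<T>> attempt,
        Func<Exception, bool> isTransient,
        TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource(timeout)) // token cancels itself after `timeout`
        {
            while (true)
            {
                cts.Token.ThrowIfCancellationRequested(); // deadline passed: stop retrying
                try
                {
                    return await attempt(cts.Token);
                }
                catch (Exception e) when (isTransient(e))
                {
                    await Task.Delay(10, cts.Token); // brief pause; throws if the deadline hits mid-wait
                }
            }
        }
    }
}
```

The caller sees an OperationCanceledException (or a subclass such as TaskCanceledException) when the deadline is exceeded, which distinguishes "gave up" from a genuine non-transient failure.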

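Even setting timeouts aside, write conflicts are rare in practice: in production, the write-conflict error rate was below 0.00001% (fewer than one in ten million transactions). Since adding retries, this error has essentially never shown up again.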

For more suggestions on implementing transactions, see the article "Solving the MongoDB BulkWrite Infinite Retry Problem".

Complete code

using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Threading.Tasks;

namespace MongoTest
{
    public class RetryableTransaction
    {
        private const string DatabaseName = "Test";
        private const string CollectionName = "Test";
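        public const string ConnectionString = ""; // Fill in a replica set or sharded cluster URI; transactions are not supported on a standalone server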

        public async Task WriteConflictAsync()
        {
            var mongoClient = new MongoClient(ConnectionString);

            var uuid = "conflict1";
            await BareReplaceAsync(mongoClient, uuid, "init");

            Console.WriteLine("------ Bare Replace ------");
            var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
            await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
            try
            {
                await BareReplaceAsync(mongoClient, uuid, "fast");
            }
            catch (Exception e)
            {
                Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
            }

            await slowReplaceTask;
            Console.WriteLine();

            Console.WriteLine("------ Retry Replace ------");
            slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
            await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
            try
            {
                await RetryReplaceAsync(mongoClient, uuid, "fast");
            }
            catch (Exception e)
            {
                Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
            }

            await slowReplaceTask;
            Console.WriteLine();

            Console.WriteLine($"[{DateTime.Now}] Completed...");
        }

        private async Task BareReplaceAsync(IMongoClient mongoClient, string uuid, string value, int delay = 0)
        {
            var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);

            using (var session = await mongoClient.StartSessionAsync())
            {
                session.StartTransaction();

                var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
                await collection.ReplaceOneAsync(session, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true });
                await Task.Delay(delay); // Intentionally slow the transaction

                await session.CommitTransactionAsync();
            }

            Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
        }

        private async Task RetryReplaceAsync(IMongoClient mongoClient, string uuid, string value)
        {
            var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);

            using (var session = await mongoClient.StartSessionAsync())
            {
                var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);

                await session.WithTransactionAsync(
                    async (s, ct) =>
                    {
                        await collection.ReplaceOneAsync(s, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true }, ct);
                        return string.Empty;
                    });
            }

            Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
        }
    }
}