Implementing MongoDB Transaction Retries
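MongoDB transactions are a great feature, but with multi-document transactions under high concurrency, write conflicts are hard to avoid. Here is an example of a write conflict: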
Exception: Command update failed: Encountered error from mongodb.svc.cluster.local:27017 during a transaction :: caused by :: WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction..
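So how do we implement transaction retries correctly?
The most obvious idea is to retry manually in the application layer: as soon as an exception is caught, retry. But implementing retries is not that simple, and there are many questions to answer. What type of exception is it? Is it transient or persistent? How many times should we retry? How do we add a retry mechanism while keeping the code clean?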
The good news is that newer MongoDB drivers already solve the retry problem with a built-in retry mechanism: https://docs.mongodb.com/manual/core/transactions/#transactions-api
The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.
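All we need to do is implement our transactions with the new transaction API. What puzzles me is that even the official C# MongoDB driver documentation still uses the old API, i.e. the version without retries. So this article compares the old and new APIs and gives usage examples.
Reproducing the WriteConflict Error
The Old Transactions API
From the official C# MongoDB driver documentation: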
using (var session = client.StartSession())
{
session.StartTransaction();
// execute operations using the session
session.CommitTransaction(); // if an exception is thrown before reaching here the transaction will be implicitly aborted
}
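WriteConflict Example
Let's use the old API to reproduce a write conflict. We create two tasks that update the same document; to make the conflict reproducible, the first task deliberately waits two seconds before committing: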
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Threading.Tasks;
namespace MongoTest
{
public class RetryableTransaction
{
private const string DatabaseName = "Test";
private const string CollectionName = "Test";
public const string ConnectionString = "";
public async Task WriteConflictAsync()
{
var mongoClient = new MongoClient(ConnectionString);
var uuid = "conflict1";
await BareReplaceAsync(mongoClient, uuid, "init");
Console.WriteLine("------ Bare Replace ------");
var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
try
{
await BareReplaceAsync(mongoClient, uuid, "fast");
}
catch (Exception e)
{
Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
}
await slowReplaceTask;
Console.WriteLine();
Console.WriteLine($"[{DateTime.Now}] Completed...");
}
private async Task BareReplaceAsync(IMongoClient mongoClient, string uuid, string value, int delay = 0)
{
var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);
using (var session = await mongoClient.StartSessionAsync())
{
session.StartTransaction();
var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
await collection.ReplaceOneAsync(session, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true });
await Task.Delay(delay); // Intentionally slow the transaction
await session.CommitTransactionAsync();
}
Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
}
}
}
Output of WriteConflictAsync():
[2021/2/19 13:35:51] Done [init]
------ Bare Replace ------
[2021/2/19 13:35:51] Exception: Command update failed: Encountered error from mongodb.svc.cluster.local:27017 during a transaction :: caused by :: WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction..
[2021/2/19 13:35:53] Done [slow]
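Clearly, the second update task failed because of the write conflict. If we inspect the thrown MongoCommandException, we find TransientTransactionError among its ErrorLabels. In other words, the exception itself tells us that this is a transient error that can be resolved by retrying.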
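Solutions
Manual Retry in the Application Layer
Another article gives an implementation that manually retries on TransientTransactionError. However, this approach is not recommended; the more elegant way is to use the MongoDB driver's built-in retry mechanism. This requires upgrading to a driver version that supports the callback API, and a server that supports multi-document transactions (MongoDB 4.0+ for replica sets, 4.2+ for sharded clusters).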
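For comparison, here is a minimal sketch of what the manual approach has to get right, modeled on the retry pattern in the MongoDB manual: retry the whole transaction on TransientTransactionError, but retry only the commit on UnknownTransactionCommitResult. To keep it runnable without a server, `LabeledException` is a hypothetical stand-in for the driver's `MongoException` (whose `HasErrorLabel(string)` plays the same role), and the `runBody`/`commit` delegates stand in for "start the transaction and run the operations" and "commit". The `maxAttempts` limit is an added assumption, not something the driver prescribes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for MongoDB.Driver.MongoException, which exposes
// HasErrorLabel(string). Used so this sketch runs without a MongoDB server.
public class LabeledException : Exception
{
    private readonly HashSet<string> _labels;

    public LabeledException(string message, params string[] labels) : base(message)
        => _labels = new HashSet<string>(labels);

    public bool HasErrorLabel(string label) => _labels.Contains(label);
}

public static class ManualTransactionRetry
{
    // Retries the whole transaction (body + commit) while the error carries the
    // TransientTransactionError label, up to maxAttempts attempts in total.
    public static async Task RunTransactionWithRetryAsync(
        Func<Task> runBody, Func<Task> commit, int maxAttempts = 5)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await runBody();
                await CommitWithRetryAsync(commit, maxAttempts);
                return;
            }
            catch (LabeledException e) when (
                e.HasErrorLabel("TransientTransactionError") && attempt < maxAttempts)
            {
                // Transient error (e.g. WriteConflict): start over with a new transaction.
            }
        }
    }

    // Retries only the commit while its outcome is UnknownTransactionCommitResult.
    // Any other error (including a transient one) propagates to the caller.
    public static async Task CommitWithRetryAsync(Func<Task> commit, int maxAttempts = 5)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await commit();
                return;
            }
            catch (LabeledException e) when (
                e.HasErrorLabel("UnknownTransactionCommitResult") && attempt < maxAttempts)
            {
                // Commit outcome unknown: try committing the same transaction again.
            }
        }
    }
}
```

Even in this reduced form there are two nested loops, two label checks, and an attempt budget to maintain, which is exactly the boilerplate the callback API takes off our hands.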
The New Transactions API (recommended)
Newer drivers have the retry mechanism built in: https://docs.mongodb.com/manual/core/transactions/#transactions-api
WithTransactionAsync
Definition:
//
// Summary:
// Executes a callback within a transaction, with retries if needed.
//
// Parameters:
// callbackAsync:
// The user defined callback.
//
// transactionOptions:
// The transaction options.
//
// cancellationToken:
// The cancellation token.
//
// Type parameters:
// TResult:
// The type of callback result.
//
// Returns:
// The callback result.
Task<TResult> WithTransactionAsync<TResult>(Func<IClientSessionHandle, CancellationToken, Task<TResult>> callbackAsync, TransactionOptions transactionOptions = null, CancellationToken cancellationToken = default(CancellationToken));
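Note that the callback parameter of WithTransactionAsync is a Func rather than an Action, and there is no overload that accepts an Action, so the callback must return a value (here, string.Empty).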
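If returning a dummy value in every callback feels awkward, a small adapter can hide it. The sketch below is a hypothetical helper (`TransactionCallbacks.NoResult` is not part of the driver): it wraps a callback that returns a plain `Task` into the `Func<TSession, CancellationToken, Task<bool>>` shape that `WithTransactionAsync<TResult>` expects. It is written generically over `TSession` so it compiles and runs without the driver.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TransactionCallbacks
{
    // Hypothetical adapter: turns a no-result callback into one that returns a
    // dummy bool, matching the Func<..., Task<TResult>> shape WithTransactionAsync needs.
    public static Func<TSession, CancellationToken, Task<bool>> NoResult<TSession>(
        Func<TSession, CancellationToken, Task> callbackAsync)
    {
        if (callbackAsync == null) throw new ArgumentNullException(nameof(callbackAsync));

        return async (session, ct) =>
        {
            await callbackAsync(session, ct); // run the caller's operations
            return true;                      // dummy result, ignored by the caller
        };
    }
}
```

With the driver, usage would look like `await session.WithTransactionAsync(TransactionCallbacks.NoResult<IClientSessionHandle>(async (s, ct) => { /* operations using s and ct */ }));`. The explicit type argument is needed because the lambda's parameter types cannot be inferred otherwise; `TResult` is then inferred as `bool`.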
Example usage:
using (var session = await mongoClient.StartSessionAsync())
{
await session.WithTransactionAsync(
async (s, ct) =>
{
// execute operations using the session
return string.Empty;
});
}
Compared with the old API:
using (var session = client.StartSession())
{
session.StartTransaction();
// execute operations using the session
session.CommitTransaction();
}
Now we can write a RetryReplaceAsync as a counterpart to BareReplaceAsync:
private async Task RetryReplaceAsync(IMongoClient mongoClient, string uuid, string value)
{
var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);
using (var session = await mongoClient.StartSessionAsync())
{
var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
await session.WithTransactionAsync(
async (s, ct) =>
{
await collection.ReplaceOneAsync(s, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true }, ct);
return string.Empty;
});
}
Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
}
In the try block of WriteConflictAsync, replace BareReplaceAsync with RetryReplaceAsync:
public async Task WriteConflictAsync()
{
var mongoClient = new MongoClient(ConnectionString);
var uuid = "conflict1";
await BareReplaceAsync(mongoClient, uuid, "init");
Console.WriteLine("------ Retry Replace ------");
var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
try
{
await RetryReplaceAsync(mongoClient, uuid, "fast");
}
catch (Exception e)
{
Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
}
await slowReplaceTask;
Console.WriteLine();
Console.WriteLine($"[{DateTime.Now}] Completed...");
}
Output:
[2021/2/19 17:55:00] Done [init]
------ Retry Replace ------
[2021/2/19 17:55:02] Done [slow]
[2021/2/19 17:55:03] Done [fast]
[2021/2/19 17:55:03] Completed...
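No exception was thrown, and the second task succeeded right after the slow task committed (about two seconds later), which shows that the retry logic took effect. In testing, the code above always succeeded even when the first task was stretched out to a much longer time (20 seconds). In a real application, however, you should implement a timeout by passing in a cancellation token; I won't go into the details here.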
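A minimal sketch of such a timeout, assuming the callback forwards its `ct` to every driver call (as `RetryReplaceAsync` already does) and that the driver observes the token. `Timeouts.RunWithTimeoutAsync` is a hypothetical helper, not a driver API: it creates a `CancellationTokenSource` linked to an optional caller token and cancels it after `timeout`. With the driver, `operation` would be something like `ct => session.WithTransactionAsync(callback, cancellationToken: ct)`, using the `cancellationToken` parameter from the definition above. As far as I know, the MongoDB convenient transaction API specification also has drivers stop retrying after about 120 seconds, so the built-in loop is bounded; a token lets you pick a shorter limit.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Timeouts
{
    // Hypothetical helper: runs `operation` with a token that is cancelled after
    // `timeout`, or earlier if `callerToken` is cancelled.
    public static async Task<T> RunWithTimeoutAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        TimeSpan timeout,
        CancellationToken callerToken = default)
    {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(callerToken))
        {
            cts.CancelAfter(timeout);
            // Awaiting inside the using block keeps the token source alive
            // until the operation has finished.
            return await operation(cts.Token);
        }
    }
}
```

If the deadline passes, the awaited task faults with an `OperationCanceledException` (or a subclass such as `TaskCanceledException`), which the caller can catch and treat as a timeout.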
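In practice, even without a timeout this is rarely a problem, because write conflicts are uncommon: in our production environment the write-conflict error rate was below 0.00001%, and since adding the retry mechanism the error has essentially not appeared again.
For more advice on implementing transactions, see: Solving the MongoDB BulkWrite Infinite Retry Problem
Complete Code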
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Threading.Tasks;
namespace MongoTest
{
public class RetryableTransaction
{
private const string DatabaseName = "Test";
private const string CollectionName = "Test";
public const string ConnectionString = "";
public async Task WriteConflictAsync()
{
var mongoClient = new MongoClient(ConnectionString);
var uuid = "conflict1";
await BareReplaceAsync(mongoClient, uuid, "init");
Console.WriteLine("------ Bare Replace ------");
var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
try
{
await BareReplaceAsync(mongoClient, uuid, "fast");
}
catch (Exception e)
{
Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
}
await slowReplaceTask;
Console.WriteLine();
Console.WriteLine("------ Retry Replace ------");
slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
try
{
await RetryReplaceAsync(mongoClient, uuid, "fast");
}
catch (Exception e)
{
Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
}
await slowReplaceTask;
Console.WriteLine();
Console.WriteLine($"[{DateTime.Now}] Completed...");
}
private async Task BareReplaceAsync(IMongoClient mongoClient, string uuid, string value, int delay = 0)
{
var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);
using (var session = await mongoClient.StartSessionAsync())
{
session.StartTransaction();
var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
await collection.ReplaceOneAsync(session, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true });
await Task.Delay(delay); // Intentionally slow the transaction
await session.CommitTransactionAsync();
}
Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
}
private async Task RetryReplaceAsync(IMongoClient mongoClient, string uuid, string value)
{
var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);
using (var session = await mongoClient.StartSessionAsync())
{
var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
await session.WithTransactionAsync(
async (s, ct) =>
{
await collection.ReplaceOneAsync(s, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true }, ct);
return string.Empty;
});
}
Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
}
}
}