How to Retry a MongoDB Transaction
MongoDB transactions are a nice feature. However, because MongoDB uses optimistic concurrency control, write conflicts are unavoidable. The situation becomes worse in a multi-document transaction that modifies many documents at once. If a write conflict happens, a MongoCommandException is thrown:
Exception: Command update failed: Encountered error from mongodb.svc.cluster.local:27017 during a transaction :: caused by :: WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction..
How to handle the WriteConflict error in MongoDB?
A straightforward way is to retry the transaction manually at the application level when the exception happens. However, the retry logic is non-trivial: What is the exception type? Is the error transient or persistent? How many times should we retry? How do we apply the retry logic while keeping the code clean?
Fortunately, the new MongoDB driver has already solved the issue by building the retry logic into the driver: https://docs.mongodb.com/manual/core/transactions/#transactions-api
The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.
All we need to do is use the new transactions API instead of the old one. However, even the official documentation samples still use the old API. In this article, we compare the two APIs and provide an easy-to-understand usage example.
Reproduce WriteConflict
Old Transactions API
Unfortunately, the official C# MongoDB driver documentation still uses the old API:
using (var session = client.StartSession())
{
    session.StartTransaction();
    // execute operations using the session
    session.CommitTransaction(); // if an exception is thrown before reaching here the transaction will be implicitly aborted
}
WriteConflict Example
Let's reproduce the WriteConflict error with the old API. We start two tasks that update the same document and intentionally make the first update slow by waiting 2s before the commit:
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Threading.Tasks;

namespace MongoTest
{
    public class RetryableTransaction
    {
        private const string DatabaseName = "Test";
        private const string CollectionName = "Test";
        public const string ConnectionString = "";

        public async Task WriteConflictAsync()
        {
            var mongoClient = new MongoClient(ConnectionString);
            var uuid = "conflict1";
            await BareReplaceAsync(mongoClient, uuid, "init");
            Console.WriteLine("------ Bare Replace ------");
            var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
            await Task.Delay(20); // Wait a moment to make sure the slow task enter into a transaction
            try
            {
                await BareReplaceAsync(mongoClient, uuid, "fast");
            }
            catch (Exception e)
            {
                Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
            }
            await slowReplaceTask;
            Console.WriteLine();
            Console.WriteLine($"[{DateTime.Now}] Completed...");
        }

        private async Task BareReplaceAsync(IMongoClient mongoClient, string uuid, string value, int delay = 0)
        {
            var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);
            using (var session = await mongoClient.StartSessionAsync())
            {
                session.StartTransaction();
                var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
                await collection.ReplaceOneAsync(session, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true });
                await Task.Delay(delay); // Intentionally slow the transaction
                await session.CommitTransactionAsync();
            }
            Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
        }
    }
}
Result of running WriteConflictAsync():
[2021/2/19 13:35:51] Done [init]
------ Bare Replace ------
[2021/2/19 13:35:51] Exception: Command update failed: Encountered error from mongodb.svc.cluster.local:27017 during a transaction :: caused by :: WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction..
[2021/2/19 13:35:53] Done [slow]
The second replace task failed with the WriteConflict error. If we look into the MongoCommandException, we find a "TransientTransactionError" label among its error labels (the ErrorLabels property).
Solutions
Manually Retry
This article provides a manual retry solution for such transient errors. However, if we use the latest MongoDB driver with server version >= 4.2, there is a more graceful way to handle the exception.
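To make the manual approach concrete, here is a minimal sketch of a retry loop. The TransactionRetry class, its RunAsync method, and the maxAttempts and baseDelayMs parameters are hypothetical names introduced for illustration; they are not part of the MongoDB driver. The sketch deliberately takes the "is this error transient?" decision as a predicate, so it does not depend on the driver itself.

```csharp
using System;
using System.Threading.Tasks;

public static class TransactionRetry
{
    // Hypothetical helper (not a driver API): runs `operation` and retries it
    // while `isTransient` classifies the failure as retryable.
    // Returns the number of attempts that were needed.
    public static async Task<int> RunAsync(
        Func<Task> operation,
        Func<Exception, bool> isTransient,
        int maxAttempts = 3,
        int baseDelayMs = 50)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await operation();
                return attempt;
            }
            catch (Exception e) when (attempt < maxAttempts && isTransient(e))
            {
                // Simple linear backoff before the next attempt.
                // Non-transient errors, and the last failed attempt, are not
                // caught here and propagate to the caller.
                await Task.Delay(baseDelayMs * attempt);
            }
        }
    }
}
```

With the driver, the operation could be () => BareReplaceAsync(mongoClient, uuid, "fast"), which opens a fresh session and transaction on every attempt, and the predicate could be e => e is MongoException me && me.HasErrorLabel("TransientTransactionError"). The attempt limit and the backoff are arbitrary choices for this sketch and would need tuning for a real workload.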
Use New Transactions API (recommended)
As mentioned above, the new MongoDB driver already has the retry logic built in: https://docs.mongodb.com/manual/core/transactions/#transactions-api
The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.
WithTransactionAsync definition:
//
// Summary:
// Executes a callback within a transaction, with retries if needed.
//
// Parameters:
// callbackAsync:
// The user defined callback.
//
// transactionOptions:
// The transaction options.
//
// cancellationToken:
// The cancellation token.
//
// Type parameters:
// TResult:
// The type of callback result.
//
// Returns:
// The callback result.
Task<TResult> WithTransactionAsync<TResult>(Func<IClientSessionHandle, CancellationToken, Task<TResult>> callbackAsync, TransactionOptions transactionOptions = null, CancellationToken cancellationToken = default(CancellationToken));
Note that the callback passed to WithTransactionAsync is a Func rather than an Action, so it must return a value.
Here is the sample usage:
using (var session = await mongoClient.StartSessionAsync())
{
    await session.WithTransactionAsync(
        async (s, ct) =>
        {
            // execute operations using the session
            return string.Empty;
        });
}
Compare with the old API:
using (var session = client.StartSession())
{
    session.StartTransaction();
    // execute operations using the session
    session.CommitTransaction();
}
Now we write RetryReplaceAsync (compare with BareReplaceAsync):
private async Task RetryReplaceAsync(IMongoClient mongoClient, string uuid, string value)
{
    var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);
    using (var session = await mongoClient.StartSessionAsync())
    {
        var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
        await session.WithTransactionAsync(
            async (s, ct) =>
            {
                await collection.ReplaceOneAsync(s, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true }, ct);
                return string.Empty;
            });
    }
    Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
}
Replace BareReplaceAsync with RetryReplaceAsync in the try block of WriteConflictAsync:
public async Task WriteConflictAsync()
{
    var mongoClient = new MongoClient(ConnectionString);
    var uuid = "conflict1";
    await BareReplaceAsync(mongoClient, uuid, "init");
    Console.WriteLine("------ Retry Replace ------");
    var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
    await Task.Delay(20); // Wait a moment to make the slow task enter into a transaction
    try
    {
        await RetryReplaceAsync(mongoClient, uuid, "fast");
    }
    catch (Exception e)
    {
        Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
    }
    await slowReplaceTask;
    Console.WriteLine();
    Console.WriteLine($"[{DateTime.Now}] Completed...");
}
Running result:
[2021/2/19 17:55:00] Done [init]
------ Retry Replace ------
[2021/2/19 17:55:02] Done [slow]
[2021/2/19 17:55:03] Done [fast]

[2021/2/19 17:55:03] Completed...
The retry logic works, since no exception is thrown. Even for a very long transaction (about 20s), the above code works fine. However, we need to take timeouts into account by passing a proper cancellation token.
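One way to bound the total time is a CancellationTokenSource created with a timeout, passed through the cancellationToken parameter shown in the WithTransactionAsync definition above. The following is a sketch only: the TimedTransaction class, the ReplaceWithTimeoutAsync method, and the timeout parameter are hypothetical names, and the right timeout value depends on the workload.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

public static class TimedTransaction
{
    // Hypothetical helper: the same upsert as RetryReplaceAsync, but the whole
    // operation, including any driver-side retries, is bounded by `timeout`.
    public static async Task ReplaceWithTimeoutAsync(
        IMongoClient mongoClient,
        IMongoCollection<BsonDocument> collection,
        string uuid,
        string value,
        TimeSpan timeout)
    {
        // The token source cancels itself once `timeout` has elapsed.
        using (var cts = new CancellationTokenSource(timeout))
        using (var session = await mongoClient.StartSessionAsync(cancellationToken: cts.Token))
        {
            var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
            await session.WithTransactionAsync(
                async (s, ct) =>
                {
                    // Forward the callback's token so the write itself can be cancelled.
                    await collection.ReplaceOneAsync(
                        s,
                        filter,
                        new BsonDocument { { "Uuid", uuid }, { "op", value } },
                        new ReplaceOptions { IsUpsert = true },
                        ct);
                    return string.Empty;
                },
                cancellationToken: cts.Token);
        }
    }
}
```

When the timeout elapses, the call is expected to fail with an OperationCanceledException (or a derived type) instead of continuing to retry, so the caller should be prepared to catch it and decide whether to give up or try again later.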
In practice, the WriteConflict error rarely happens even without retry. In our production environment, the error rate is less than 0.00001%. After adding the retry logic above, the WriteConflict error no longer occurs.
Other transaction API usage suggestions: MongoDB Transaction BulkWrite Endless Retry
Full Code
using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Threading.Tasks;

namespace MongoTest
{
    public class RetryableTransaction
    {
        private const string DatabaseName = "Test";
        private const string CollectionName = "Test";
        public const string ConnectionString = "";

        public async Task WriteConflictAsync()
        {
            var mongoClient = new MongoClient(ConnectionString);
            var uuid = "conflict1";
            await BareReplaceAsync(mongoClient, uuid, "init");
            Console.WriteLine("------ Bare Replace ------");
            var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
            await Task.Delay(20); // Wait a moment to make sure the slow task enter into a transaction
            try
            {
                await BareReplaceAsync(mongoClient, uuid, "fast");
            }
            catch (Exception e)
            {
                Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
            }
            await slowReplaceTask;
            Console.WriteLine();
            Console.WriteLine("------ Retry Replace ------");
            slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
            await Task.Delay(20); // Wait a moment to make the slow task enter into a transaction
            try
            {
                await RetryReplaceAsync(mongoClient, uuid, "fast");
            }
            catch (Exception e)
            {
                Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
            }
            await slowReplaceTask;
            Console.WriteLine();
            Console.WriteLine($"[{DateTime.Now}] Completed...");
        }

        private async Task BareReplaceAsync(IMongoClient mongoClient, string uuid, string value, int delay = 0)
        {
            var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);
            using (var session = await mongoClient.StartSessionAsync())
            {
                session.StartTransaction();
                var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
                await collection.ReplaceOneAsync(session, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true });
                await Task.Delay(delay); // Intentionally slow the transaction
                await session.CommitTransactionAsync();
            }
            Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
        }

        private async Task RetryReplaceAsync(IMongoClient mongoClient, string uuid, string value)
        {
            var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);
            using (var session = await mongoClient.StartSessionAsync())
            {
                var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
                await session.WithTransactionAsync(
                    async (s, ct) =>
                    {
                        await collection.ReplaceOneAsync(s, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true }, ct);
                        return string.Empty;
                    });
            }
            Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
        }
    }
}