How to Retry MongoDB Transactions

MongoDB transactions are a nice feature. However, because MongoDB uses optimistic concurrency control, write conflicts are unavoidable. The situation gets worse with multi-document transactions, which modify many documents in a single transaction. When a write conflict happens, a MongoCommandException is thrown:

Exception: Command update failed: Encountered error from mongodb.svc.cluster.local:27017 during a transaction :: caused by :: WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction..

How do we handle the WriteConflict error in MongoDB?

A straightforward way is to manually retry the transaction at the application level when the exception happens. However, the retry logic is non-trivial: What is the exception type? Is the error transient or persistent? How many times should we retry? How do we apply the retry logic while keeping the code clean?

Fortunately, the new MongoDB driver has already solved this problem by incorporating the retry logic into the driver itself: https://docs.mongodb.com/manual/core/transactions/#transactions-api

The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

All we need to do is use the new transaction API instead of the old one. However, even the official documentation samples still use the old API. In this article, we compare the two APIs and provide an easy-to-understand usage example.

Reproduce WriteConflict

Old Transactions API

Unfortunately, the official C# MongoDB driver documentation still uses the old API:

using (var session = client.StartSession())
{
    session.StartTransaction();
    // execute operations using the session
    session.CommitTransaction(); // if an exception is thrown before reaching here the transaction will be implicitly aborted
}

WriteConflict Example

Let's reproduce the WriteConflict error with the old API. We start two concurrent tasks that update the same document, and intentionally make the first update slow by waiting 2 seconds before committing:

using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Threading.Tasks;

namespace MongoTest
{
    public class RetryableTransaction
    {
        private const string DatabaseName = "Test";
        private const string CollectionName = "Test";
        public const string ConnectionString = "";

        public async Task WriteConflictAsync()
        {
            var mongoClient = new MongoClient(ConnectionString);

            var uuid = "conflict1";
            await BareReplaceAsync(mongoClient, uuid, "init");

            Console.WriteLine("------ Bare Replace ------");
            var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
            await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
            try
            {
                await BareReplaceAsync(mongoClient, uuid, "fast");
            }
            catch (Exception e)
            {
                Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
            }

            await slowReplaceTask;
            Console.WriteLine();

            Console.WriteLine($"[{DateTime.Now}] Completed...");
        }

        private async Task BareReplaceAsync(IMongoClient mongoClient, string uuid, string value, int delay = 0)
        {
            var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);

            using (var session = await mongoClient.StartSessionAsync())
            {
                session.StartTransaction();

                var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
                await collection.ReplaceOneAsync(session, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true });
                await Task.Delay(delay); // Intentionally slow the transaction

                await session.CommitTransactionAsync();
            }

            Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
        }
    }
}

Result of running WriteConflictAsync():

[2021/2/19 13:35:51] Done [init]
------ Bare Replace ------
[2021/2/19 13:35:51] Exception: Command update failed: Encountered error from mongodb.svc.cluster.local:27017 during a transaction :: caused by :: WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction..
[2021/2/19 13:35:53] Done [slow]

The second replace failed with a WriteConflict error. If we inspect the MongoCommandException, we find a "TransientTransactionError" label in the exception's ErrorLabels property.
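To check for this label in code rather than in a debugger, a minimal sketch could look like the following. It relies on the HasErrorLabel method that the C# driver exposes on MongoException; the class and method names (TransactionErrorLabels, IsTransientTransactionError) are hypothetical and only serve as an illustration.

```csharp
using System;
using MongoDB.Driver;

public static class TransactionErrorLabels
{
    // Hypothetical helper: returns true when the exception is a MongoDB driver
    // exception that carries the "TransientTransactionError" label.
    public static bool IsTransientTransactionError(Exception e) =>
        e is MongoException mongoException &&
        mongoException.HasErrorLabel("TransientTransactionError");
}
```

Inside the catch block of WriteConflictAsync, calling TransactionErrorLabels.IsTransientTransactionError(e) should tell us whether the failure is worth retrying.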

Solutions

Manually Retry

This article provides a manual retry solution for such transient errors. However, if we use the latest MongoDB driver with server version >= 4.2, there is a more graceful way to handle the exception.
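For reference, a manual retry loop might look roughly like the sketch below. It is not the code from the linked article: the helper name (RunWithRetryAsync), the attempt limit, and the linear backoff are all assumptions chosen for illustration. The delegate is expected to start a fresh transaction on every call.

```csharp
using System;
using System.Threading.Tasks;
using MongoDB.Driver;

public static class ManualTransactionRetry
{
    // Hypothetical helper: runs the whole transaction again while the failure
    // carries the TransientTransactionError label, up to maxAttempts attempts.
    public static async Task RunWithRetryAsync(Func<Task> transactionAsync, int maxAttempts = 3)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await transactionAsync();
                return;
            }
            catch (MongoException e) when (
                e.HasErrorLabel("TransientTransactionError") && attempt < maxAttempts)
            {
                // Assumed backoff: wait a little longer after each failed attempt.
                await Task.Delay(TimeSpan.FromMilliseconds(50 * attempt));
            }
        }
    }
}
```

With this helper, the failing call could be wrapped as ManualTransactionRetry.RunWithRetryAsync(() => BareReplaceAsync(mongoClient, uuid, "fast")). Errors without the label, and the last failed attempt, still propagate to the caller.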

As mentioned above, the new MongoDB driver already has the retry logic built in: https://docs.mongodb.com/manual/core/transactions/#transactions-api

The new callback API also incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

WithTransactionAsync definition:

//
// Summary:
//     Executes a callback within a transaction, with retries if needed.
//
// Parameters:
//   callbackAsync:
//     The user defined callback.
//
//   transactionOptions:
//     The transaction options.
//
//   cancellationToken:
//     The cancellation token.
//
// Type parameters:
//   TResult:
//     The type of callback result.
//
// Returns:
//     The callback result.
Task<TResult> WithTransactionAsync<TResult>(Func<IClientSessionHandle, CancellationToken, Task<TResult>> callbackAsync, TransactionOptions transactionOptions = null, CancellationToken cancellationToken = default(CancellationToken));

Note that the callback passed to WithTransactionAsync is a Func rather than an Action, so the callback must return a value.

Here is the sample usage:

using (var session = await mongoClient.StartSessionAsync())
{
	await session.WithTransactionAsync(
		async (s, ct) =>
		{
			// execute operations using the session
			return string.Empty;
		});
}

Compare with the old API:

using (var session = client.StartSession())
{
    session.StartTransaction();
    // execute operations using the session
    session.CommitTransaction();
}

Now we write RetryReplaceAsync (compared to BareReplaceAsync):

private async Task RetryReplaceAsync(IMongoClient mongoClient, string uuid, string value)
{
	var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);

	using (var session = await mongoClient.StartSessionAsync())
	{
		var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);

		await session.WithTransactionAsync(
			async (s, ct) =>
			{
				await collection.ReplaceOneAsync(s, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true }, ct);
				return string.Empty;
			});
	}

	Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
}

Replace BareReplaceAsync with RetryReplaceAsync in the try block of WriteConflictAsync:

public async Task WriteConflictAsync()
{
	var mongoClient = new MongoClient(ConnectionString);

	var uuid = "conflict1";
	await BareReplaceAsync(mongoClient, uuid, "init");

	Console.WriteLine("------ Retry Replace ------");
	var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
	await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
	try
	{
		await RetryReplaceAsync(mongoClient, uuid, "fast");
	}
	catch (Exception e)
	{
		Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
	}

	await slowReplaceTask;
	Console.WriteLine();

	Console.WriteLine($"[{DateTime.Now}] Completed...");
}

Running result:

[2021/2/19 17:55:00] Done [init]
------ Retry Replace ------
[2021/2/19 17:55:02] Done [slow]
[2021/2/19 17:55:03] Done [fast]

[2021/2/19 17:55:03] Completed...

The retry logic works: no exception is thrown, and the fast replace completes right after the slow one commits. The above code works fine even for a very long transaction (about 20s). However, we still need to take timeouts into account by passing a proper cancellation token.
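One way to pass such a token is sketched below, using the cancellationToken parameter from the WithTransactionAsync definition shown earlier. The helper name (WithTransactionTimeoutAsync) and the idea of expressing the limit as a TimeSpan are assumptions for illustration, not part of the driver.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;

public static class TransactionTimeout
{
    // Hypothetical helper: runs the callback through WithTransactionAsync with a
    // token that is cancelled once the given timeout elapses. The same token is
    // handed to the callback, so operations that receive it observe the cancellation.
    public static async Task<TResult> WithTransactionTimeoutAsync<TResult>(
        IClientSessionHandle session,
        Func<IClientSessionHandle, CancellationToken, Task<TResult>> callbackAsync,
        TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource(timeout))
        {
            return await session.WithTransactionAsync(
                callbackAsync,
                transactionOptions: null,
                cancellationToken: cts.Token);
        }
    }
}
```

In RetryReplaceAsync, the call to session.WithTransactionAsync could then be replaced by TransactionTimeout.WithTransactionTimeoutAsync(session, callback, TimeSpan.FromSeconds(30)), where the 30-second limit is only an example value. When the timeout fires, the caller should expect an OperationCanceledException instead of a completed transaction.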

In practice, the WriteConflict error rarely happens even without retry. In our production environment, the error rate is less than 0.00001%. After adding the above retry logic, the WriteConflict error no longer occurs.

Other transaction API usage suggestions: MongoDB Transaction BulkWrite Endless Retry

Full Code

using MongoDB.Bson;
using MongoDB.Driver;
using System;
using System.Threading.Tasks;

namespace MongoTest
{
    public class RetryableTransaction
    {
        private const string DatabaseName = "Test";
        private const string CollectionName = "Test";
        public const string ConnectionString = "";

        public async Task WriteConflictAsync()
        {
            var mongoClient = new MongoClient(ConnectionString);

            var uuid = "conflict1";
            await BareReplaceAsync(mongoClient, uuid, "init");

            Console.WriteLine("------ Bare Replace ------");
            var slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
            await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
            try
            {
                await BareReplaceAsync(mongoClient, uuid, "fast");
            }
            catch (Exception e)
            {
                Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
            }

            await slowReplaceTask;
            Console.WriteLine();

            Console.WriteLine("------ Retry Replace ------");
            slowReplaceTask = BareReplaceAsync(mongoClient, uuid, "slow", 2000); // Execute at least 2s
            await Task.Delay(20); // Wait a moment to make sure the slow task has entered its transaction
            try
            {
                await RetryReplaceAsync(mongoClient, uuid, "fast");
            }
            catch (Exception e)
            {
                Console.WriteLine($"[{DateTime.Now}] Exception: " + e.Message);
            }

            await slowReplaceTask;
            Console.WriteLine();

            Console.WriteLine($"[{DateTime.Now}] Completed...");
        }

        private async Task BareReplaceAsync(IMongoClient mongoClient, string uuid, string value, int delay = 0)
        {
            var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);

            using (var session = await mongoClient.StartSessionAsync())
            {
                session.StartTransaction();

                var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);
                await collection.ReplaceOneAsync(session, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true });
                await Task.Delay(delay); // Intentionally slow the transaction

                await session.CommitTransactionAsync();
            }

            Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
        }

        private async Task RetryReplaceAsync(IMongoClient mongoClient, string uuid, string value)
        {
            var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);

            using (var session = await mongoClient.StartSessionAsync())
            {
                var filter = Builders<BsonDocument>.Filter.Eq("Uuid", uuid);

                await session.WithTransactionAsync(
                    async (s, ct) =>
                    {
                        await collection.ReplaceOneAsync(s, filter, new BsonDocument { { "Uuid", uuid }, { "op", value } }, new ReplaceOptions { IsUpsert = true }, ct);
                        return string.Empty;
                    });
            }

            Console.WriteLine($"[{DateTime.Now}] Done [{value}]");
        }
    }
}