Solving Double-Spending with MongoDB: Transactions vs. Versioning

The double-spending problem is a critical challenge in transaction systems, especially when managing account balances or funds. It occurs when a system allows the same funds to be spent multiple times due to concurrent operations or race conditions. In this article, we explore two approaches to resolving this issue using MongoDB: transaction-based handling and versioning-based handling.

This post takes a closer look at the double-spending problem introduced in the earlier post, Building a Transaction System with MongoDB.

Problem Overview

Consider a system where users can initiate transactions like purchases. If two requests attempt to spend the same funds concurrently, the system may incorrectly process both, leading to an invalid state. For example:

  • User A has $100 in their account.
  • Two concurrent processes try to deduct $60 each.
  • Without safeguards, the system might process both, resulting in a balance of -$20.
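
The race can be reproduced without a database. The following is a minimal in-memory sketch, written as a C# top-level program, that fixes the interleaving by hand: both requests read the balance before either one writes. The variable names are illustrative and are not part of the demo code later in this post.

```csharp
using System;

// In-memory stand-in for the account document; no database is involved.
decimal balance = 100m;

// Both requests read the balance before either one writes (the race window).
decimal readByRequest1 = balance;
decimal readByRequest2 = balance;

// Each request validates against its own stale read, so both checks pass.
bool request1Allowed = readByRequest1 >= 60m;
bool request2Allowed = readByRequest2 >= 60m;

// Both deductions are then applied to the shared balance.
if (request1Allowed) balance -= 60m;
if (request2Allowed) balance -= 60m;

Console.WriteLine($"Request 1 allowed: {request1Allowed}");
Console.WriteLine($"Request 2 allowed: {request2Allowed}");
Console.WriteLine($"Final balance: {balance}");
```

Both checks pass because each one sees the original 100, and the balance ends at -20. The two approaches below close this window in different ways.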

MongoDB provides tools to prevent such issues, ensuring consistency and correctness in concurrent environments.

Approach 1: Using Transactions

MongoDB’s multi-document transactions enable atomicity, ensuring that a sequence of operations either completes entirely or not at all. This is particularly useful for managing funds across multiple operations.

The previous post uses this approach to prevent the double-spending issue.

The AccountEntry definition:

public class AccountEntry
{
    public string Uuid { get; set; }
    public decimal Balance { get; set; }
    public DateTime CreateTime { get; set; }
    public DateTime UpdateTime { get; set; }
}

Implementation

Below is a step-by-step implementation of a DecreaseBalance API using transactions to prevent double-spending.

public async Task<TransResult> DecreaseBalanceAsync(string accountId, decimal amount)
{
    if (amount <= 0)
    {
        throw new ArgumentException("Amount must be greater than 0", nameof(amount));
    }

    using (var session = await _mongoClient.StartSessionAsync())
    using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
    {
        return await session.WithTransactionAsync(
            async (s, ct) =>
            {
                // Read the current balance inside the transaction.
                var accountEntry = await _accountCollection.Find(s, x => x.Uuid == accountId).FirstOrDefaultAsync(ct);
                if (accountEntry == null)
                {
                    // FirstOrDefaultAsync returns null when no document matches.
                    throw new InvalidOperationException($"Account {accountId} not found");
                }
                if (accountEntry.Balance < amount)
                {
                    return new TransResult { Code = TransCode.InsufficientBalance };
                }

                // Only match the account if it can still cover the amount.
                var filter = Builders<AccountEntry>.Filter.And(
                    Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
                    Builders<AccountEntry>.Filter.Gte(x => x.Balance, amount));
                var update = Builders<AccountEntry>.Update
                    .Inc(x => x.Balance, -amount)
                    .Set(x => x.UpdateTime, DateTime.UtcNow);
                var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
                var result = await _accountCollection.FindOneAndUpdateAsync(s, filter, update, options, cancellationToken: ct);
                if (result == null)
                {
                    // The filter did not match, so the balance cannot cover the amount.
                    return new TransResult { Code = TransCode.InsufficientBalance };
                }
                return new TransResult { NewBalance = result.Balance };
            }, cancellationToken: cts.Token);
    }
}

How It Works

  • Balance Check: The account’s current balance is retrieved within the transaction to ensure it is sufficient for the deduction.
  • Conditional Update: A filter ensures the account exists and has a balance greater than or equal to the requested amount. The Inc operator atomically deducts the balance, while the UpdateTime field is updated for auditing purposes.
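  • Concurrency Control: If two transactions try to modify the same account document, MongoDB aborts one of them with a write conflict. WithTransactionAsync retries the aborted callback, and the retry reads the already-reduced balance, so the same funds cannot be spent twice.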
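
The effect of the conditional update can be pictured without a database. The following is a minimal in-memory sketch, not MongoDB code: the lock stands in for the atomicity that MongoDB provides for an update to a single document, and TryDeduct is a hypothetical helper that mirrors a filter of "Balance >= amount" combined with an Inc of -amount.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// In-memory stand-in for the account document (hypothetical; not MongoDB).
var gate = new object();
decimal balance = 100m;

// Check and deduct as one indivisible step, so no other caller can
// change the balance between the check and the write.
bool TryDeduct(decimal amount)
{
    lock (gate)
    {
        if (balance < amount) return false; // the "filter" does not match
        balance -= amount;                  // the "Inc" is applied
        return true;
    }
}

// Five concurrent requests each try to deduct 60 from the same account.
var attempts = await Task.WhenAll(
    Enumerable.Range(0, 5).Select(_ => Task.Run(() => TryDeduct(60m))));

int successes = attempts.Count(ok => ok);
Console.WriteLine($"Successes: {successes}, final balance: {balance}");
```

Whichever request runs first succeeds and leaves 40, which is too little for any of the others, so exactly one deduction is applied regardless of scheduling.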

Approach 2: Using Versioning

Versioning uses an optimistic concurrency control mechanism. Every document includes a version field, which is incremented with each update. Concurrent updates check the version to ensure no other process has modified the document, thus avoiding conflicts.

Add a version field to AccountEntry:

public class AccountEntry
{
    public string Uuid { get; set; }
    public decimal Balance { get; set; }
    public int Version { get; set; }
    public DateTime CreateTime { get; set; }
    public DateTime UpdateTime { get; set; }
}

Implementation

The following is an implementation of a DecreaseBalance API using an optimistic locking mechanism with a Version field:

public async Task<TransResult> DecreaseBalanceWithVersionAsync(string accountId, decimal amount)
{
    if (amount <= 0)
    {
        throw new ArgumentException("Amount must be greater than 0", nameof(amount));
    }

    // Read the current balance and version outside of any transaction.
    var accountEntry = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();
    if (accountEntry == null)
    {
        // FirstOrDefaultAsync returns null when no document matches.
        throw new InvalidOperationException($"Account {accountId} not found");
    }
    if (accountEntry.Balance < amount)
    {
        return new TransResult { Code = TransCode.InsufficientBalance };
    }

    var filter = Builders<AccountEntry>.Filter.And(
        Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
        Builders<AccountEntry>.Filter.Eq(x => x.Version, accountEntry.Version) // Match the version that was read
    );

    var update = Builders<AccountEntry>.Update
        .Inc(x => x.Balance, -amount)
        .Inc(x => x.Version, 1)    // Increment version
        .Set(x => x.UpdateTime, DateTime.UtcNow);

    var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
    var result = await _accountCollection.FindOneAndUpdateAsync(filter, update, options);
    if (result == null)
    {
        // No document matched: another update changed the version after the read.
        return new TransResult { Code = TransCode.ConcurrentUpdateFailure };
    }

    return new TransResult { NewBalance = result.Balance };
}

How It Works

  • Retrieve Account: The API retrieves the AccountEntry by Uuid to get the current balance and version, ensuring it has the latest data for validation.
  • Validate Balance: Checks if the current balance is sufficient to cover the deduction amount before proceeding.
  • Build Optimistic Locking Filter: Combines the account ID and version in a filter to ensure that the account has not been updated by another operation since retrieval.
  • Update Atomically: Uses MongoDB’s FindOneAndUpdate with atomic operations to deduct the balance, increment the version, and update the timestamp in a single step.
  • Handle Conflicts: If the update fails because of a version mismatch, it indicates a concurrent modification, and the API returns a conflict response.
  • Return Updated Balance: If successful, the updated balance is returned to the caller, reflecting the result of the operation.
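
The version check in the steps above can be sketched in memory. This is a simplified illustration rather than MongoDB code: TryDeduct is a hypothetical helper, and the lock stands in for the atomicity of a single-document FindOneAndUpdate. The final retry step is an assumption about how a caller might react to a conflict; the API in this post simply returns ConcurrentUpdateFailure.

```csharp
using System;

// In-memory stand-in for one account document (hypothetical; not MongoDB).
var gate = new object();
decimal balance = 100m;
int version = 0;

// Apply the deduction only if the version is unchanged since it was read.
bool TryDeduct(int expectedVersion, decimal amount)
{
    lock (gate)
    {
        if (version != expectedVersion) return false; // another update got in first
        balance -= amount;
        version++;
        return true;
    }
}

// Two requests read the document before either one writes.
int versionSeenByFirst = version;
int versionSeenBySecond = version;

bool firstOk = TryDeduct(versionSeenByFirst, 60m);   // version 0 matches, becomes 1
bool secondOk = TryDeduct(versionSeenBySecond, 60m); // version is now 1, so this fails

// A caller could retry after a conflict by re-reading and re-validating.
decimal balanceOnRetry = balance;
int versionOnRetry = version;
bool retryOk = balanceOnRetry >= 60m && TryDeduct(versionOnRetry, 60m);

Console.WriteLine($"first={firstOk}, second={secondOk}, retry={retryOk}, balance={balance}");
```

The second request is rejected by the version check, not by the balance check. Only on the retry, after re-reading the balance of 40, does the balance check reject it.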

Test Code

Run five concurrent tasks that each try to deduct 60 from the same account, which starts with a balance of 100.

public async Task RunAsync(Func<string, decimal, Task<TransResult>> func)
{
    var tasks = new List<Task>();

    string accountId = Guid.NewGuid().ToString();
    var account = await CreateAccountAsync(accountId, 100m);

    for (int i = 0; i < 5; i++)
    {
        var cur = i;
        tasks.Add(Task.Run(async () =>
        {
            var r = await func(accountId, 60);
            Console.WriteLine($"Task {cur}: code: {r.Code.ToString()}");
        }));
    }

    await Task.WhenAll(tasks);

    var updatedAccount = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();

    Console.WriteLine($"Account {accountId} Final Balance: {updatedAccount.Balance}");
}

static void Main(string[] args)
{
    var model = new DoubleSpendDemo();

    Console.WriteLine("Transaction Approach:");
    model.RunAsync(model.DecreaseBalanceAsync).Wait();

    Console.WriteLine("----------------------------------------------");
    Console.WriteLine("Versioning Approach:");
    model.RunAsync(model.DecreaseBalanceWithVersionAsync).Wait();

    Console.WriteLine("Done");
}

Results:

Transaction Approach:
Task 4: code: Success
Task 3: code: InsufficientBalance
Task 0: code: InsufficientBalance
Task 1: code: InsufficientBalance
Task 2: code: InsufficientBalance
Account 41806960-abf8-4765-9834-36af015c2fa9 Final Balance: 40

Versioning Approach:
Task 4: code: ConcurrentUpdateFailure
Task 1: code: ConcurrentUpdateFailure
Task 3: code: ConcurrentUpdateFailure
Task 2: code: Success
Task 0: code: ConcurrentUpdateFailure
Account 72820bfc-d9b8-4a66-8c7c-3eb5c2efe340 Final Balance: 40
Done

The two approaches resolve the conflict in different ways. With transactions, only one transaction at a time can modify the account document; the others hit a write conflict, are retried by WithTransactionAsync, and on retry see the reduced balance, so they return InsufficientBalance. With versioning, every Task can read the document and attempt the update, but only one update matches the version filter, so in this run the other Tasks return ConcurrentUpdateFailure.

Conclusion

Comparison of the Two Approaches:

| Feature | Transactions | Versioning |
|---|---|---|
| Concurrency Control | Locks multiple documents in a session | Relies on optimistic concurrency |
| Development Efficiency | High, directly using transactions | Relatively complex to implement, prone to errors |
| Performance | Slightly slower due to locking | Faster in high-concurrency scenarios |

Both transactions and versioning are effective tools for addressing the double-spending problem in MongoDB.

  • Transactions offer strong guarantees for complex operations but come with a performance cost.
  • Versioning provides a lightweight solution suitable for scenarios with high concurrency and simpler data dependencies. To further improve the performance, solutions such as asynchronous message processing can be introduced, though this also increases system complexity.

By choosing the right approach based on system requirements, developers can ensure consistency, reliability, and scalability in their transaction systems.

Full Demo

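Fill in the ConnectionString constant before running. Multi-document transactions require a replica set or sharded cluster, so the transaction approach will not work against a standalone server.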

using MongoDB.Bson.Serialization.Conventions;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MongoTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var model = new DoubleSpendDemo();

            Console.WriteLine("Transaction Approach:");
            model.RunAsync(model.DecreaseBalanceAsync).Wait();

            Console.WriteLine("----------------------------------------------");
            Console.WriteLine("Versioning Approach:");
            model.RunAsync(model.DecreaseBalanceWithVersionAsync).Wait();

            Console.WriteLine("Done");
        }
    }

    public class AccountEntry
    {
        public string Uuid { get; set; }
        public decimal Balance { get; set; }
        public int Version { get; set; }
        public DateTime CreateTime { get; set; }
        public DateTime UpdateTime { get; set; }
    }

    public class TransResult
    {
        public decimal NewBalance { get; set; } = 0;
        public TransCode Code { get; set; }
    }

    public enum TransCode
    {
        Success = 0,
        ConcurrentUpdateFailure = 1,
        InsufficientBalance = 2
    }

    public class DoubleSpendDemo
    {
        private readonly IMongoClient _mongoClient;
        private readonly IMongoCollection<AccountEntry> _accountCollection;

        private const string DatabaseName = "Transaction";
        private const string AccountCollectionName = "Account";
        public const string ConnectionString = "";

        public DoubleSpendDemo()
        {
            _mongoClient = new MongoClient(ConnectionString);
            var database = _mongoClient.GetDatabase(DatabaseName);

            var ignoreExtraElementsConvention = new ConventionPack { new IgnoreExtraElementsConvention(true) };
            ConventionRegistry.Register("IgnoreExtraElements", ignoreExtraElementsConvention, type => true);
            _accountCollection = database.GetCollection<AccountEntry>(AccountCollectionName);
        }

        public static string GenerateAccountUuid() => $"Acc_{Guid.NewGuid().ToString("N").ToUpper()}";

        public async Task<AccountEntry> CreateAccountAsync(string uuid, decimal initBalance)
        {
            using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
            {
                var newAccount = new AccountEntry
                {
                    Uuid = uuid,
                    Balance = initBalance,
                    CreateTime = DateTime.UtcNow,
                    UpdateTime = DateTime.UtcNow
                };

                await _accountCollection.InsertOneAsync(newAccount, cancellationToken: cts.Token);

                return newAccount;
            }
        }

        public async Task<TransResult> DecreaseBalanceAsync(string accountId, decimal amount)
        {
            if (amount <= 0)
            {
                throw new ArgumentException("Amount must be greater than 0", nameof(amount));
            }

            using (var session = await _mongoClient.StartSessionAsync())
            using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
            {
                return await session.WithTransactionAsync(
                    async (s, ct) =>
                    {
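                        var accountEntry = await _accountCollection.Find(s, x => x.Uuid == accountId).FirstOrDefaultAsync(ct);
                        if (accountEntry == null)
                        {
                            // FirstOrDefaultAsync returns null when no document matches.
                            throw new InvalidOperationException($"Account {accountId} not found");
                        }
                        if (accountEntry.Balance < amount)
                        {
                            return new TransResult { Code = TransCode.InsufficientBalance };
                        }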

                        var filter = Builders<AccountEntry>.Filter.And(
                            Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
                            Builders<AccountEntry>.Filter.Gte(x => x.Balance, amount));
                        var update = Builders<AccountEntry>.Update
                            .Inc(x => x.Balance, -amount)
                            .Set(x => x.UpdateTime, DateTime.UtcNow);
                        var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
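                        var result = await _accountCollection.FindOneAndUpdateAsync(s, filter, update, options, cancellationToken: ct);
                        if (result == null)
                        {
                            // The filter did not match, so the balance cannot cover the amount.
                            return new TransResult { Code = TransCode.InsufficientBalance };
                        }
                        return new TransResult { NewBalance = result.Balance };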
                    }, cancellationToken: cts.Token);
            }
        }

        public async Task<TransResult> DecreaseBalanceWithVersionAsync(string accountId, decimal amount)
        {
            if (amount <= 0)
            {
                throw new ArgumentException("Amount must be greater than 0", nameof(amount));
            }

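            var accountEntry = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();
            if (accountEntry == null)
            {
                // FirstOrDefaultAsync returns null when no document matches.
                throw new InvalidOperationException($"Account {accountId} not found");
            }
            if (accountEntry.Balance < amount)
            {
                return new TransResult { Code = TransCode.InsufficientBalance };
            }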

            var filter = Builders<AccountEntry>.Filter.And(
                Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
                Builders<AccountEntry>.Filter.Eq(x => x.Version, accountEntry.Version) // Match the current version
            );

            var update = Builders<AccountEntry>.Update
                .Inc(x => x.Balance, -amount)
                .Inc(x => x.Version, 1)    // Increment version
                .Set(x => x.UpdateTime, DateTime.UtcNow);

            var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
            var result = await _accountCollection.FindOneAndUpdateAsync(filter, update, options);
            if (result == null)
            {
                return new TransResult { Code = TransCode.ConcurrentUpdateFailure };
            }

            return new TransResult { NewBalance = result.Balance };
        }

        public async Task RunAsync(Func<string, decimal, Task<TransResult>> func)
        {
            var tasks = new List<Task>();

            string accountId = Guid.NewGuid().ToString();
            var account = await CreateAccountAsync(accountId, 100m);

            for (int i = 0; i < 5; i++)
            {
                var cur = i;
                tasks.Add(Task.Run(async () =>
                {
                    var r = await func(accountId, 60);
                    Console.WriteLine($"Task {cur}: code: {r.Code.ToString()}");
                }));
            }

            await Task.WhenAll(tasks);

            var updatedAccount = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();

            Console.WriteLine($"Account {accountId} Final Balance: {updatedAccount.Balance}");
        }
    }
}