用MongoDB解决双花问题: 基于事务还是基于版本

双花问题是交易系统中的一个关键挑战,尤其是在管理账户余额或资金时。当系统允许由于并发操作或竞争条件导致同一笔资金被多次使用时,就会发生双花问题。本文将探讨使用 MongoDB 解决这一问题的两种方法:基于事务的处理基于版本的处理

本文是 用MongoDB构建交易系统 关于双花问题的深入讨论。

问题概述

考虑一个用户可以发起交易(如购买)的系统。如果两个请求试图同时使用相同的资金,系统可能错误地处理两个请求,从而导致状态无效。例如:

  • 用户 A 的账户余额为 $100。
  • 两个并发的操作各试图扣除 $60。
  • 如果没有保护机制,系统可能同时处理两次扣款,导致余额变为 $-20。

MongoDB 提供了一些工具来防止此类问题,确保在并发环境中的一致性和正确性。

方法一:使用事务

MongoDB 的多文档事务支持原子性,确保一系列操作要么全部完成,要么完全回滚。这对于跨多个操作的资金管理特别有用。

之前的文章 使用了这种方法来防止双花问题。

AccountEntry 定义如下:

1
2
3
4
5
6
7
public class AccountEntry
{
    public string Uuid { get; set; }
    public decimal Balance { get; set; }
    public DateTime CreateTime { get; set; }
    public DateTime UpdateTime { get; set; }
}

实现

下面是一个使用事务防止双花问题的DecreaseBalance 实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public async Task<TransResult> DecreaseBalanceAsync(string accountId, decimal amount)
{
    if (amount <= 0)
    {
        throw new ArgumentException("Amount should > 0");
    }

    using (var session = await _mongoClient.StartSessionAsync())
    using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
    {
        return await session.WithTransactionAsync(
            async (s, ct) =>
            {
                var accountEntry = await _accountCollection.Find(s, x => x.Uuid == accountId).FirstOrDefaultAsync(ct);
                if (accountEntry.Balance < amount)
                {
                    return new TransResult { Code = TransCode.InsufficientBalance };
                }

                var filter = Builders<AccountEntry>.Filter.And(
                    Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
                    Builders<AccountEntry>.Filter.Gte(x => x.Balance, amount));
                var update = Builders<AccountEntry>.Update
                    .Inc(x => x.Balance, -amount)
                    .Set(x => x.UpdateTime, DateTime.UtcNow);
                var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
                var result =  await _accountCollection.FindOneAndUpdateAsync(s, filter, update, options, cancellationToken: ct);
                return new TransResult { NewBalance = result.Balance };
            }, cancellationToken: cts.Token);
    }
}

工作原理

  • 余额检查:在事务中检索账户的当前余额,确保余额足够进行扣减。
  • 条件更新:通过过滤器确保账户存在且余额大于等于请求金额。Inc 操作符以原子方式扣减余额,同时更新 UpdateTime 字段以用于审计。
  • 并发控制:事务确保余额更新安全,不会受到竞争条件和双花问题的影响。

方法二:使用版本控制

版本控制采用一种乐观锁控制机制,每个文档包含一个版本字段,在每次更新时递增。并发更新时会检查版本,以确保没有其他进程修改过文档,从而避免冲突。

AccountEntry 中添加版本字段:

1
2
3
4
5
6
7
8
public class AccountEntry
{
    public string Uuid { get; set; }
    public decimal Balance { get; set; }
    public int Version { get; set; }
    public DateTime CreateTime { get; set; }
    public DateTime UpdateTime { get; set; }
}

实现

以下是一个使用乐观锁机制的 DecreaseBalance API 实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public async Task<TransResult> DecreaseBalanceWithVersionAsync(string accountId, decimal amount)
{
    if (amount <= 0)
    {
        throw new ArgumentException("Amount should > 0");
    }

    var accountEntry = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();
    if (accountEntry.Balance < amount)
    {
        return new TransResult { Code = TransCode.InsufficientBalance };
    }

    var filter = Builders<AccountEntry>.Filter.And(
        Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
        Builders<AccountEntry>.Filter.Eq(x => x.Version, accountEntry.Version) // Match the current version
    );

    var update = Builders<AccountEntry>.Update
        .Inc(x => x.Balance, -amount)
        .Inc(x => x.Version, 1)    // Increment version
        .Set(x => x.UpdateTime, DateTime.UtcNow);

    var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
    var result = await _accountCollection.FindOneAndUpdateAsync(filter, update, options);
    if (result == null)
    {
        return new TransResult { Code = TransCode.ConcurrentUpdateFailure };
    }

    return new TransResult { NewBalance = result.Balance };
}

工作原理

  • 检索账户:通过 Uuid 检索 AccountEntry,获取当前余额和版本,以确保数据是最新的。
  • 验证余额:检查当前余额是否足够覆盖扣减金额,然后继续操作。
  • 构建乐观锁定过滤器:将账户 ID 和版本结合到过滤器中,确保账户自检索后未被其他操作修改。
  • 原子更新:使用 FindOneAndUpdate 和原子操作扣减余额、递增版本,并更新时间戳。
  • 处理冲突:如果由于版本不匹配导致更新失败,则表明存在并发修改,API 返回冲突响应。
  • 返回更新后的余额:如果操作成功,将更新后的余额返回给调用方,反映操作结果。

测试代码

并发5个线程,在同一个账户中消费:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public async Task RunAsync(Func<string, decimal, Task<TransResult>> func)
{
    var tasks = new List<Task>();

    string accountId = Guid.NewGuid().ToString();
    var account = await CreateAccountAsync(accountId, 100m);

    for (int i = 0; i < 5; i++)
    {
        var cur = i;
        tasks.Add(Task.Run(async () =>
        {
            var r = await func(accountId, 60);
            Console.WriteLine($"Task {cur}: code: {r.Code.ToString()}");
        }));
    }

    await Task.WhenAll(tasks);

    var updatedAccount = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();

    Console.WriteLine($"Account {accountId} Final Balance: {updatedAccount.Balance}");
}

static void Main(string[] args)
{
    var model = new DoubleSpendDemo();

    Console.WriteLine("Transaction Approach:");
    model.RunAsync(model.DecreaseBalanceAsync).Wait();

    Console.WriteLine("----------------------------------------------");
    Console.WriteLine("Versioning Approach:");
    model.RunAsync(model.DecreaseBalanceWithVersionAsync).Wait();

    Console.WriteLine("Done");
}

执行结果:

Transaction Approach: Task 4: code: Success Task 3: code: InsufficientBalance Task 0: code: InsufficientBalance Task 1: code: InsufficientBalance Task 2: code: InsufficientBalance Account 41806960-abf8-4765-9834-36af015c2fa9 Final Balance: 40

Versioning Approach: Task 4: code: ConcurrentUpdateFailure Task 1: code: ConcurrentUpdateFailure Task 3: code: ConcurrentUpdateFailure Task 2: code: Success Task 0: code: ConcurrentUpdateFailure Account 72820bfc-d9b8-4a66-8c7c-3eb5c2efe340 Final Balance: 40 Done

可见,二者解决问题的方式不同:使用事务的方案由于加锁,同时只能有一个Task介入执行,所以其他Task返回值是InsufficientBalance;而使用版本的方案是所有Task都可以介入,但在最终更新时仅有一个Task成功,所以其他Task返回值是ConcurrentUpdateFailure.

结论

两种方法的对比:

特性基于事务基于版本
并发控制在Session中锁定多个文档使用乐观锁控制
开发效率高,直接使用事务实现相对复杂,容易出错
性能事务加锁,导致性能略差在高并发场景下性能更快

事务和版本控制都是解决双花问题的有效工具:

  • 基于事务的方案保证强一致性,开发效率高,但会带来性能开销。
  • 基于版本的方案较为轻量,适用于高并发和简单数据依赖的场景。如果需要进一步提升性能,可以引入异步消息处理等方案,但同时也提升了系统复杂度。

根据需求选择合适的方法,开发者可以确保交易系统的一致性、可靠性和可扩展性。

完整代码

填充ConnectionString

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
using MongoDB.Bson.Serialization.Conventions;
using MongoDB.Driver;
using System;

namespace MongoTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var model = new DoubleSpendDemo();

            Console.WriteLine("Transaction Approach:");
            model.RunAsync(model.DecreaseBalanceAsync).Wait();

            Console.WriteLine("----------------------------------------------");
            Console.WriteLine("Versioning Approach:");
            model.RunAsync(model.DecreaseBalanceWithVersionAsync).Wait();

            Console.WriteLine("Done");
        }
    }

    public class AccountEntry
    {
        public string Uuid { get; set; }
        public decimal Balance { get; set; }
        public int Version { get; set; }
        public DateTime CreateTime { get; set; }
        public DateTime UpdateTime { get; set; }
    }

    public class TransResult
    {
        public decimal NewBalance { get; set; } = 0;
        public TransCode Code { get; set; }
    }

    public enum TransCode
    {
        Success = 0,
        ConcurrentUpdateFailure = 1,
        InsufficientBalance = 2
    }

    public class DoubleSpendDemo
    {
        private readonly IMongoClient _mongoClient;
        private readonly IMongoCollection<AccountEntry> _accountCollection;

        private const string DatabaseName = "Transaction";
        private const string AccountCollectionName = "Account";
        public const string ConnectionString = "";

        public DoubleSpendDemo()
        {
            _mongoClient = new MongoClient(ConnectionString);
            var database = _mongoClient.GetDatabase(DatabaseName);

            var ignoreExtraElementsConvention = new ConventionPack { new IgnoreExtraElementsConvention(true) };
            ConventionRegistry.Register("IgnoreExtraElements", ignoreExtraElementsConvention, type => true);
            _accountCollection = database.GetCollection<AccountEntry>(AccountCollectionName);
        }

        public static string GenerateAccountUuid() => $"Acc_{Guid.NewGuid().ToString("N").ToUpper()}";

        public async Task<AccountEntry> CreateAccountAsync(string uuid, decimal initBalance)
        {
            using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
            {
                var newAccount = new AccountEntry
                {
                    Uuid = uuid,
                    Balance = initBalance,
                    CreateTime = DateTime.UtcNow,
                    UpdateTime = DateTime.UtcNow
                };

                await _accountCollection.InsertOneAsync(newAccount, cancellationToken: cts.Token);

                return newAccount;
            }
        }

        public async Task<TransResult> DecreaseBalanceAsync(string accountId, decimal amount)
        {
            if (amount <= 0)
            {
                throw new ArgumentException("Amount should > 0");
            }

            using (var session = await _mongoClient.StartSessionAsync())
            using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
            {
                return await session.WithTransactionAsync(
                    async (s, ct) =>
                    {
                        var accountEntry = await _accountCollection.Find(s, x => x.Uuid == accountId).FirstOrDefaultAsync(ct);
                        if (accountEntry.Balance < amount)
                        {
                            return new TransResult { Code = TransCode.InsufficientBalance };
                        }

                        var filter = Builders<AccountEntry>.Filter.And(
                            Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
                            Builders<AccountEntry>.Filter.Gte(x => x.Balance, amount));
                        var update = Builders<AccountEntry>.Update
                            .Inc(x => x.Balance, -amount)
                            .Set(x => x.UpdateTime, DateTime.UtcNow);
                        var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
                        var result =  await _accountCollection.FindOneAndUpdateAsync(s, filter, update, options, cancellationToken: ct);
                        return new TransResult { NewBalance = result.Balance };
                    }, cancellationToken: cts.Token);
            }
        }

        public async Task<TransResult> DecreaseBalanceWithVersionAsync(string accountId, decimal amount)
        {
            if (amount <= 0)
            {
                throw new ArgumentException("Amount should > 0");
            }

            var accountEntry = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();
            if (accountEntry.Balance < amount)
            {
                return new TransResult { Code = TransCode.InsufficientBalance };
            }

            var filter = Builders<AccountEntry>.Filter.And(
                Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
                Builders<AccountEntry>.Filter.Eq(x => x.Version, accountEntry.Version) // Match the current version
            );

            var update = Builders<AccountEntry>.Update
                .Inc(x => x.Balance, -amount)
                .Inc(x => x.Version, 1)    // Increment version
                .Set(x => x.UpdateTime, DateTime.UtcNow);

            var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
            var result = await _accountCollection.FindOneAndUpdateAsync(filter, update, options);
            if (result == null)
            {
                return new TransResult { Code = TransCode.ConcurrentUpdateFailure };
            }

            return new TransResult { NewBalance = result.Balance };
        }

        public async Task RunAsync(Func<string, decimal, Task<TransResult>> func)
        {
            var tasks = new List<Task>();

            string accountId = Guid.NewGuid().ToString();
            var account = await CreateAccountAsync(accountId, 100m);

            for (int i = 0; i < 5; i++)
            {
                var cur = i;
                tasks.Add(Task.Run(async () =>
                {
                    var r = await func(accountId, 60);
                    Console.WriteLine($"Task {cur}: code: {r.Code.ToString()}");
                }));
            }

            await Task.WhenAll(tasks);

            var updatedAccount = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();

            Console.WriteLine($"Account {accountId} Final Balance: {updatedAccount.Balance}");
        }
    }
}