用MongoDB解决双花问题: 基于事务还是基于版本

双花问题是交易系统中的一个关键挑战,尤其是在管理账户余额或资金时。当系统允许由于并发操作或竞争条件导致同一笔资金被多次使用时,就会发生双花问题。本文将探讨使用 MongoDB 解决这一问题的两种方法:基于事务的处理基于版本的处理

本文是用MongoDB构建交易系统关于双花问题的深入讨论。

问题概述

考虑一个用户可以发起交易(如购买)的系统。如果两个请求试图同时使用相同的资金,系统可能错误地处理两个请求,从而导致状态无效。例如:

  • 用户 A 的账户余额为 $100。
  • 两个并发的操作各试图扣除 $60。
  • 如果没有保护机制,系统可能同时处理两次扣款,导致余额变为 $-20。

MongoDB 提供了一些工具来防止此类问题,确保在并发环境中的一致性和正确性。

方法一:使用事务

MongoDB 的多文档事务支持原子性,确保一系列操作要么全部完成,要么完全回滚。这对于跨多个操作的资金管理特别有用。

之前的文章 使用了这种方法来防止双花问题。

AccountEntry 定义如下:

public class AccountEntry
{
    public string Uuid { get; set; }
    public decimal Balance { get; set; }
    public DateTime CreateTime { get; set; }
    public DateTime UpdateTime { get; set; }
}

实现

下面是一个使用事务防止双花问题的DecreaseBalance 实现:

public async Task<TransResult> DecreaseBalanceAsync(string accountId, decimal amount)
{
    if (amount <= 0)
    {
        throw new ArgumentException("Amount should > 0");
    }

    using (var session = await _mongoClient.StartSessionAsync())
    using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
    {
        return await session.WithTransactionAsync(
            async (s, ct) =>
            {
                var accountEntry = await _accountCollection.Find(s, x => x.Uuid == accountId).FirstOrDefaultAsync(ct);
                if (accountEntry.Balance < amount)
                {
                    return new TransResult { Code = TransCode.InsufficientBalance };
                }

                var filter = Builders<AccountEntry>.Filter.And(
                    Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
                    Builders<AccountEntry>.Filter.Gte(x => x.Balance, amount));
                var update = Builders<AccountEntry>.Update
                    .Inc(x => x.Balance, -amount)
                    .Set(x => x.UpdateTime, DateTime.UtcNow);
                var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
                var result =  await _accountCollection.FindOneAndUpdateAsync(s, filter, update, options, cancellationToken: ct);
                return new TransResult { NewBalance = result.Balance };
            }, cancellationToken: cts.Token);
    }
}

工作原理

  • 余额检查:在事务中检索账户的当前余额,确保余额足够进行扣减。
  • 条件更新:通过过滤器确保账户存在且余额大于等于请求金额。Inc 操作符以原子方式扣减余额,同时更新 UpdateTime 字段以用于审计。
  • 并发控制:事务确保余额更新安全,不会受到竞争条件和双花问题的影响。

方法二:使用版本控制

版本控制采用一种乐观锁控制机制,每个文档包含一个版本字段,在每次更新时递增。并发更新时会检查版本,以确保没有其他进程修改过文档,从而避免冲突。

AccountEntry 中添加版本字段:

public class AccountEntry
{
    public string Uuid { get; set; }
    public decimal Balance { get; set; }
    public int Version { get; set; }
    public DateTime CreateTime { get; set; }
    public DateTime UpdateTime { get; set; }
}

实现

以下是一个使用乐观锁机制的 DecreaseBalance API 实现:

public async Task<TransResult> DecreaseBalanceWithVersionAsync(string accountId, decimal amount)
{
    if (amount <= 0)
    {
        throw new ArgumentException("Amount should > 0");
    }

    var accountEntry = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();
    if (accountEntry.Balance < amount)
    {
        return new TransResult { Code = TransCode.InsufficientBalance };
    }

    var filter = Builders<AccountEntry>.Filter.And(
        Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
        Builders<AccountEntry>.Filter.Eq(x => x.Version, accountEntry.Version) // Match the current version
    );

    var update = Builders<AccountEntry>.Update
        .Inc(x => x.Balance, -amount)
        .Inc(x => x.Version, 1)    // Increment version
        .Set(x => x.UpdateTime, DateTime.UtcNow);

    var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
    var result = await _accountCollection.FindOneAndUpdateAsync(filter, update, options);
    if (result == null)
    {
        return new TransResult { Code = TransCode.ConcurrentUpdateFailure };
    }

    return new TransResult { NewBalance = result.Balance };
}

工作原理

  • 检索账户:通过 Uuid 检索 AccountEntry,获取当前余额和版本,以确保数据是最新的。
  • 验证余额:检查当前余额是否足够覆盖扣减金额,然后继续操作。
  • 构建乐观锁定过滤器:将账户 ID 和版本结合到过滤器中,确保账户自检索后未被其他操作修改。
  • 原子更新:使用 FindOneAndUpdate 和原子操作扣减余额、递增版本,并更新时间戳。
  • 处理冲突:如果由于版本不匹配导致更新失败,则表明存在并发修改,API 返回冲突响应。
  • 返回更新后的余额:如果操作成功,将更新后的余额返回给调用方,反映操作结果。

测试代码

并发5个线程,在同一个账户中消费:

public async Task RunAsync(Func<string, decimal, Task<TransResult>> func)
{
    var tasks = new List<Task>();

    string accountId = Guid.NewGuid().ToString();
    var account = await CreateAccountAsync(accountId, 100m);

    for (int i = 0; i < 5; i++)
    {
        var cur = i;
        tasks.Add(Task.Run(async () =>
        {
            var r = await func(accountId, 60);
            Console.WriteLine($"Task {cur}: code: {r.Code.ToString()}");
        }));
    }

    await Task.WhenAll(tasks);

    var updatedAccount = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();

    Console.WriteLine($"Account {accountId} Final Balance: {updatedAccount.Balance}");
}

static void Main(string[] args)
{
    var model = new DoubleSpendDemo();

    Console.WriteLine("Transaction Approach:");
    model.RunAsync(model.DecreaseBalanceAsync).Wait();

    Console.WriteLine("----------------------------------------------");
    Console.WriteLine("Versioning Approach:");
    model.RunAsync(model.DecreaseBalanceWithVersionAsync).Wait();

    Console.WriteLine("Done");
}

执行结果:

Transaction Approach: Task 4: code: Success Task 3: code: InsufficientBalance Task 0: code: InsufficientBalance Task 1: code: InsufficientBalance Task 2: code: InsufficientBalance Account 41806960-abf8-4765-9834-36af015c2fa9 Final Balance: 40

Versioning Approach: Task 4: code: ConcurrentUpdateFailure Task 1: code: ConcurrentUpdateFailure Task 3: code: ConcurrentUpdateFailure Task 2: code: Success Task 0: code: ConcurrentUpdateFailure Account 72820bfc-d9b8-4a66-8c7c-3eb5c2efe340 Final Balance: 40 Done

可见,二者解决问题的方式不同:使用事务的方案由于加锁,同时只能有一个Task介入执行,所以其他Task返回值是InsufficientBalance;而使用版本的方案是所有Task都可以介入,但在最终更新时仅有一个Task成功,所以其他Task返回值是ConcurrentUpdateFailure.

结论

两种方法的对比:

特性 基于事务 基于版本
并发控制 在Session中锁定多个文档 使用乐观锁控制
开发效率 高,直接使用事务 实现相对复杂,容易出错
性能 事务加锁,导致性能略差 在高并发场景下性能更快

事务和版本控制都是解决双花问题的有效工具:

  • 基于事务的方案保证强一致性,开发效率高,但会带来性能开销。
  • 基于版本的方案较为轻量,适用于高并发和简单数据依赖的场景。如果需要进一步提升性能,可以引入异步消息处理等方案,但同时也提升了系统复杂度。

根据需求选择合适的方法,开发者可以确保交易系统的一致性、可靠性和可扩展性。

完整代码

填充ConnectionString

using MongoDB.Bson.Serialization.Conventions;
using MongoDB.Driver;
using System;

namespace MongoTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var model = new DoubleSpendDemo();

            Console.WriteLine("Transaction Approach:");
            model.RunAsync(model.DecreaseBalanceAsync).Wait();

            Console.WriteLine("----------------------------------------------");
            Console.WriteLine("Versioning Approach:");
            model.RunAsync(model.DecreaseBalanceWithVersionAsync).Wait();

            Console.WriteLine("Done");
        }
    }

    public class AccountEntry
    {
        public string Uuid { get; set; }
        public decimal Balance { get; set; }
        public int Version { get; set; }
        public DateTime CreateTime { get; set; }
        public DateTime UpdateTime { get; set; }
    }

    public class TransResult
    {
        public decimal NewBalance { get; set; } = 0;
        public TransCode Code { get; set; }
    }

    public enum TransCode
    {
        Success = 0,
        ConcurrentUpdateFailure = 1,
        InsufficientBalance = 2
    }

    public class DoubleSpendDemo
    {
        private readonly IMongoClient _mongoClient;
        private readonly IMongoCollection<AccountEntry> _accountCollection;

        private const string DatabaseName = "Transaction";
        private const string AccountCollectionName = "Account";
        public const string ConnectionString = "";

        public DoubleSpendDemo()
        {
            _mongoClient = new MongoClient(ConnectionString);
            var database = _mongoClient.GetDatabase(DatabaseName);

            var ignoreExtraElementsConvention = new ConventionPack { new IgnoreExtraElementsConvention(true) };
            ConventionRegistry.Register("IgnoreExtraElements", ignoreExtraElementsConvention, type => true);
            _accountCollection = database.GetCollection<AccountEntry>(AccountCollectionName);
        }

        public static string GenerateAccountUuid() => $"Acc_{Guid.NewGuid().ToString("N").ToUpper()}";

        public async Task<AccountEntry> CreateAccountAsync(string uuid, decimal initBalance)
        {
            using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
            {
                var newAccount = new AccountEntry
                {
                    Uuid = uuid,
                    Balance = initBalance,
                    CreateTime = DateTime.UtcNow,
                    UpdateTime = DateTime.UtcNow
                };

                await _accountCollection.InsertOneAsync(newAccount, cancellationToken: cts.Token);

                return newAccount;
            }
        }

        public async Task<TransResult> DecreaseBalanceAsync(string accountId, decimal amount)
        {
            if (amount <= 0)
            {
                throw new ArgumentException("Amount should > 0");
            }

            using (var session = await _mongoClient.StartSessionAsync())
            using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
            {
                return await session.WithTransactionAsync(
                    async (s, ct) =>
                    {
                        var accountEntry = await _accountCollection.Find(s, x => x.Uuid == accountId).FirstOrDefaultAsync(ct);
                        if (accountEntry.Balance < amount)
                        {
                            return new TransResult { Code = TransCode.InsufficientBalance };
                        }

                        var filter = Builders<AccountEntry>.Filter.And(
                            Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
                            Builders<AccountEntry>.Filter.Gte(x => x.Balance, amount));
                        var update = Builders<AccountEntry>.Update
                            .Inc(x => x.Balance, -amount)
                            .Set(x => x.UpdateTime, DateTime.UtcNow);
                        var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
                        var result =  await _accountCollection.FindOneAndUpdateAsync(s, filter, update, options, cancellationToken: ct);
                        return new TransResult { NewBalance = result.Balance };
                    }, cancellationToken: cts.Token);
            }
        }

        public async Task<TransResult> DecreaseBalanceWithVersionAsync(string accountId, decimal amount)
        {
            if (amount <= 0)
            {
                throw new ArgumentException("Amount should > 0");
            }

            var accountEntry = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();
            if (accountEntry.Balance < amount)
            {
                return new TransResult { Code = TransCode.InsufficientBalance };
            }

            var filter = Builders<AccountEntry>.Filter.And(
                Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
                Builders<AccountEntry>.Filter.Eq(x => x.Version, accountEntry.Version) // Match the current version
            );

            var update = Builders<AccountEntry>.Update
                .Inc(x => x.Balance, -amount)
                .Inc(x => x.Version, 1)    // Increment version
                .Set(x => x.UpdateTime, DateTime.UtcNow);

            var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
            var result = await _accountCollection.FindOneAndUpdateAsync(filter, update, options);
            if (result == null)
            {
                return new TransResult { Code = TransCode.ConcurrentUpdateFailure };
            }

            return new TransResult { NewBalance = result.Balance };
        }

        public async Task RunAsync(Func<string, decimal, Task<TransResult>> func)
        {
            var tasks = new List<Task>();

            string accountId = Guid.NewGuid().ToString();
            var account = await CreateAccountAsync(accountId, 100m);

            for (int i = 0; i < 5; i++)
            {
                var cur = i;
                tasks.Add(Task.Run(async () =>
                {
                    var r = await func(accountId, 60);
                    Console.WriteLine($"Task {cur}: code: {r.Code.ToString()}");
                }));
            }

            await Task.WhenAll(tasks);

            var updatedAccount = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();

            Console.WriteLine($"Account {accountId} Final Balance: {updatedAccount.Balance}");
        }
    }
}