用MongoDB解决双花问题: 基于事务还是基于版本
双花问题是交易系统中的一个关键挑战,尤其是在管理账户余额或资金时。当系统允许由于并发操作或竞争条件导致同一笔资金被多次使用时,就会发生双花问题。本文将探讨使用 MongoDB 解决这一问题的两种方法:基于事务的处理和基于版本的处理。
本文是用MongoDB构建交易系统关于双花问题的深入讨论。
问题概述
考虑一个用户可以发起交易(如购买)的系统。如果两个请求试图同时使用相同的资金,系统可能错误地处理两个请求,从而导致状态无效。例如:
- 用户 A 的账户余额为 $100。
- 两个并发的操作各试图扣除 $60。
- 如果没有保护机制,系统可能同时处理两次扣款,导致余额变为 $-20。
MongoDB 提供了一些工具来防止此类问题,确保在并发环境中的一致性和正确性。
方法一:使用事务
MongoDB 的多文档事务支持原子性,确保一系列操作要么全部完成,要么完全回滚。这对于跨多个操作的资金管理特别有用。
之前的文章 使用了这种方法来防止双花问题。
AccountEntry
定义如下:
public class AccountEntry
{
public string Uuid { get; set; }
public decimal Balance { get; set; }
public DateTime CreateTime { get; set; }
public DateTime UpdateTime { get; set; }
}
实现
下面是一个使用事务防止双花问题的DecreaseBalance
实现:
public async Task<TransResult> DecreaseBalanceAsync(string accountId, decimal amount)
{
if (amount <= 0)
{
throw new ArgumentException("Amount should > 0");
}
using (var session = await _mongoClient.StartSessionAsync())
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
{
return await session.WithTransactionAsync(
async (s, ct) =>
{
var accountEntry = await _accountCollection.Find(s, x => x.Uuid == accountId).FirstOrDefaultAsync(ct);
if (accountEntry.Balance < amount)
{
return new TransResult { Code = TransCode.InsufficientBalance };
}
var filter = Builders<AccountEntry>.Filter.And(
Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
Builders<AccountEntry>.Filter.Gte(x => x.Balance, amount));
var update = Builders<AccountEntry>.Update
.Inc(x => x.Balance, -amount)
.Set(x => x.UpdateTime, DateTime.UtcNow);
var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
var result = await _accountCollection.FindOneAndUpdateAsync(s, filter, update, options, cancellationToken: ct);
return new TransResult { NewBalance = result.Balance };
}, cancellationToken: cts.Token);
}
}
工作原理
- 余额检查:在事务中检索账户的当前余额,确保余额足够进行扣减。
- 条件更新:通过过滤器确保账户存在且余额大于等于请求金额。
Inc
操作符以原子方式扣减余额,同时更新UpdateTime
字段以用于审计。 - 并发控制:事务确保余额更新安全,不会受到竞争条件和双花问题的影响。
方法二:使用版本控制
版本控制采用一种乐观锁控制机制,每个文档包含一个版本字段,在每次更新时递增。并发更新时会检查版本,以确保没有其他进程修改过文档,从而避免冲突。
在 AccountEntry
中添加版本字段:
public class AccountEntry
{
public string Uuid { get; set; }
public decimal Balance { get; set; }
public int Version { get; set; }
public DateTime CreateTime { get; set; }
public DateTime UpdateTime { get; set; }
}
实现
以下是一个使用乐观锁机制的 DecreaseBalance
API
实现:
public async Task<TransResult> DecreaseBalanceWithVersionAsync(string accountId, decimal amount)
{
if (amount <= 0)
{
throw new ArgumentException("Amount should > 0");
}
var accountEntry = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();
if (accountEntry.Balance < amount)
{
return new TransResult { Code = TransCode.InsufficientBalance };
}
var filter = Builders<AccountEntry>.Filter.And(
Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
Builders<AccountEntry>.Filter.Eq(x => x.Version, accountEntry.Version) // Match the current version
);
var update = Builders<AccountEntry>.Update
.Inc(x => x.Balance, -amount)
.Inc(x => x.Version, 1) // Increment version
.Set(x => x.UpdateTime, DateTime.UtcNow);
var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
var result = await _accountCollection.FindOneAndUpdateAsync(filter, update, options);
if (result == null)
{
return new TransResult { Code = TransCode.ConcurrentUpdateFailure };
}
return new TransResult { NewBalance = result.Balance };
}
工作原理
- 检索账户:通过
Uuid
检索AccountEntry
,获取当前余额和版本,以确保数据是最新的。 - 验证余额:检查当前余额是否足够覆盖扣减金额,然后继续操作。
- 构建乐观锁定过滤器:将账户 ID 和版本结合到过滤器中,确保账户自检索后未被其他操作修改。
- 原子更新:使用
FindOneAndUpdate
和原子操作扣减余额、递增版本,并更新时间戳。 - 处理冲突:如果由于版本不匹配导致更新失败,则表明存在并发修改,API 返回冲突响应。
- 返回更新后的余额:如果操作成功,将更新后的余额返回给调用方,反映操作结果。
测试代码
并发5个线程,在同一个账户中消费:
public async Task RunAsync(Func<string, decimal, Task<TransResult>> func)
{
var tasks = new List<Task>();
string accountId = Guid.NewGuid().ToString();
var account = await CreateAccountAsync(accountId, 100m);
for (int i = 0; i < 5; i++)
{
var cur = i;
tasks.Add(Task.Run(async () =>
{
var r = await func(accountId, 60);
Console.WriteLine($"Task {cur}: code: {r.Code.ToString()}");
}));
}
await Task.WhenAll(tasks);
var updatedAccount = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();
Console.WriteLine($"Account {accountId} Final Balance: {updatedAccount.Balance}");
}
static void Main(string[] args)
{
var model = new DoubleSpendDemo();
Console.WriteLine("Transaction Approach:");
model.RunAsync(model.DecreaseBalanceAsync).Wait();
Console.WriteLine("----------------------------------------------");
Console.WriteLine("Versioning Approach:");
model.RunAsync(model.DecreaseBalanceWithVersionAsync).Wait();
Console.WriteLine("Done");
}
执行结果:
Transaction Approach: Task 4: code: Success Task 3: code: InsufficientBalance Task 0: code: InsufficientBalance Task 1: code: InsufficientBalance Task 2: code: InsufficientBalance Account 41806960-abf8-4765-9834-36af015c2fa9 Final Balance: 40
Versioning Approach: Task 4: code: ConcurrentUpdateFailure Task 1: code: ConcurrentUpdateFailure Task 3: code: ConcurrentUpdateFailure Task 2: code: Success Task 0: code: ConcurrentUpdateFailure Account 72820bfc-d9b8-4a66-8c7c-3eb5c2efe340 Final Balance: 40 Done
可见,二者解决问题的方式不同:使用事务的方案由于加锁,同时只能有一个Task介入执行,所以其他Task返回值是InsufficientBalance
;而使用版本的方案是所有Task都可以介入,但在最终更新时仅有一个Task成功,所以其他Task返回值是ConcurrentUpdateFailure
.
结论
两种方法的对比:
特性 | 基于事务 | 基于版本 |
---|---|---|
并发控制 | 在Session中锁定多个文档 | 使用乐观锁控制 |
开发效率 | 高,直接使用事务 | 实现相对复杂,容易出错 |
性能 | 事务加锁,导致性能略差 | 在高并发场景下性能更快 |
事务和版本控制都是解决双花问题的有效工具:
- 基于事务的方案保证强一致性,开发效率高,但会带来性能开销。
- 基于版本的方案较为轻量,适用于高并发和简单数据依赖的场景。如果需要进一步提升性能,可以引入异步消息处理等方案,但同时也提升了系统复杂度。
根据需求选择合适的方法,开发者可以确保交易系统的一致性、可靠性和可扩展性。
完整代码
填充ConnectionString
:
using MongoDB.Bson.Serialization.Conventions;
using MongoDB.Driver;
using System;
namespace MongoTest
{
class Program
{
static void Main(string[] args)
{
var model = new DoubleSpendDemo();
Console.WriteLine("Transaction Approach:");
model.RunAsync(model.DecreaseBalanceAsync).Wait();
Console.WriteLine("----------------------------------------------");
Console.WriteLine("Versioning Approach:");
model.RunAsync(model.DecreaseBalanceWithVersionAsync).Wait();
Console.WriteLine("Done");
}
}
public class AccountEntry
{
public string Uuid { get; set; }
public decimal Balance { get; set; }
public int Version { get; set; }
public DateTime CreateTime { get; set; }
public DateTime UpdateTime { get; set; }
}
public class TransResult
{
public decimal NewBalance { get; set; } = 0;
public TransCode Code { get; set; }
}
public enum TransCode
{
Success = 0,
ConcurrentUpdateFailure = 1,
InsufficientBalance = 2
}
public class DoubleSpendDemo
{
private readonly IMongoClient _mongoClient;
private readonly IMongoCollection<AccountEntry> _accountCollection;
private const string DatabaseName = "Transaction";
private const string AccountCollectionName = "Account";
public const string ConnectionString = "";
public DoubleSpendDemo()
{
_mongoClient = new MongoClient(ConnectionString);
var database = _mongoClient.GetDatabase(DatabaseName);
var ignoreExtraElementsConvention = new ConventionPack { new IgnoreExtraElementsConvention(true) };
ConventionRegistry.Register("IgnoreExtraElements", ignoreExtraElementsConvention, type => true);
_accountCollection = database.GetCollection<AccountEntry>(AccountCollectionName);
}
public static string GenerateAccountUuid() => $"Acc_{Guid.NewGuid().ToString("N").ToUpper()}";
public async Task<AccountEntry> CreateAccountAsync(string uuid, decimal initBalance)
{
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
{
var newAccount = new AccountEntry
{
Uuid = uuid,
Balance = initBalance,
CreateTime = DateTime.UtcNow,
UpdateTime = DateTime.UtcNow
};
await _accountCollection.InsertOneAsync(newAccount, cancellationToken: cts.Token);
return newAccount;
}
}
public async Task<TransResult> DecreaseBalanceAsync(string accountId, decimal amount)
{
if (amount <= 0)
{
throw new ArgumentException("Amount should > 0");
}
using (var session = await _mongoClient.StartSessionAsync())
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
{
return await session.WithTransactionAsync(
async (s, ct) =>
{
var accountEntry = await _accountCollection.Find(s, x => x.Uuid == accountId).FirstOrDefaultAsync(ct);
if (accountEntry.Balance < amount)
{
return new TransResult { Code = TransCode.InsufficientBalance };
}
var filter = Builders<AccountEntry>.Filter.And(
Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
Builders<AccountEntry>.Filter.Gte(x => x.Balance, amount));
var update = Builders<AccountEntry>.Update
.Inc(x => x.Balance, -amount)
.Set(x => x.UpdateTime, DateTime.UtcNow);
var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
var result = await _accountCollection.FindOneAndUpdateAsync(s, filter, update, options, cancellationToken: ct);
return new TransResult { NewBalance = result.Balance };
}, cancellationToken: cts.Token);
}
}
public async Task<TransResult> DecreaseBalanceWithVersionAsync(string accountId, decimal amount)
{
if (amount <= 0)
{
throw new ArgumentException("Amount should > 0");
}
var accountEntry = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();
if (accountEntry.Balance < amount)
{
return new TransResult { Code = TransCode.InsufficientBalance };
}
var filter = Builders<AccountEntry>.Filter.And(
Builders<AccountEntry>.Filter.Eq(x => x.Uuid, accountId),
Builders<AccountEntry>.Filter.Eq(x => x.Version, accountEntry.Version) // Match the current version
);
var update = Builders<AccountEntry>.Update
.Inc(x => x.Balance, -amount)
.Inc(x => x.Version, 1) // Increment version
.Set(x => x.UpdateTime, DateTime.UtcNow);
var options = new FindOneAndUpdateOptions<AccountEntry> { ReturnDocument = ReturnDocument.After };
var result = await _accountCollection.FindOneAndUpdateAsync(filter, update, options);
if (result == null)
{
return new TransResult { Code = TransCode.ConcurrentUpdateFailure };
}
return new TransResult { NewBalance = result.Balance };
}
public async Task RunAsync(Func<string, decimal, Task<TransResult>> func)
{
var tasks = new List<Task>();
string accountId = Guid.NewGuid().ToString();
var account = await CreateAccountAsync(accountId, 100m);
for (int i = 0; i < 5; i++)
{
var cur = i;
tasks.Add(Task.Run(async () =>
{
var r = await func(accountId, 60);
Console.WriteLine($"Task {cur}: code: {r.Code.ToString()}");
}));
}
await Task.WhenAll(tasks);
var updatedAccount = await _accountCollection.Find(x => x.Uuid == accountId).FirstOrDefaultAsync();
Console.WriteLine($"Account {accountId} Final Balance: {updatedAccount.Balance}");
}
}
}