Don't put a load balancer in front of mongos

When running transactions concurrently against a sharded MongoDB cluster, we hit a large number of MongoCommandException errors with code 251 (codename NoSuchTransaction):

Command find failed: cannot continue txnId 4 for session 38604515-2584-45a5-a17a-5eb5d34ea6c4 - = with txnId 5.
Command find failed: cannot continue txnId 4 for session 38604515-2584-45a5-a17a-5eb5d34ea6c4 - = with txnId 6.
Command insert failed: cannot continue txnId 31 for session 3ed7ea61-eae1-440f-8d95-b6e066b35b69 - = with txnId 34.

Analysis

A few experiments:

  • Running the transactions on a single thread: the errors disappear.
  • Giving each thread its own MongoClient: the errors disappear.

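This looked like a MongoDB driver bug: concurrent transactions seemed to conflict inside a shared MongoClient, so a single instance could not be reused. That contradicts the official documentation, which recommends reusing one MongoClient for the lifetime of the application, so I filed an issue: # Multi-thread Transaction Failure for Sharded Cluster.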

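It later turned out that the culprit was how we had deployed the sharded cluster: we had put a load balancer in front of the two mongos instances, so the operations of a single transaction could be executed on two different mongos instances, which produced the errors above. The driver only sees the load balancer's single address, so it cannot pin a session to a specific mongos; which mongos receives a command depends on which pooled connection that command uses, and the balancer routes each connection independently. This also fits the experiments: with a single thread, or one MongoClient per thread, a transaction's commands tend to go over the same connection. # Transactions issue on sharded cluster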

For reference, the Mongos Pinning section of the driver transactions specification:

Drivers MUST send all commands for a single transaction to the same mongos (excluding retries of commitTransaction and abortTransaction).

After the driver selects a mongos for the first command within a transaction, the driver MUST pin the ClientSession to the selected mongos. Drivers MUST send all subsequent commands that are part of the same transaction (excluding certain retries of commitTransaction and abortTransaction) to the same mongos.
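The toy model below (plain C#, no MongoDB involved) illustrates why a layer-4 load balancer breaks this rule. Everything in it (`FakeMongos`, `RoundRobinBalancer`, `PinningDemo`) is hypothetical, and the mongos check is a deliberate simplification: a simulated mongos only accepts follow-up commands for a transaction that was started through it, which is roughly the situation behind the NoSuchTransaction errors above. The balancer picks a backend once per connection, so two pooled connections land on different mongos; pinning the session to one mongos lets the same transaction succeed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical toy model, not MongoDB code: a mongos accepts follow-up commands
// only for transactions that were started through it.
public sealed class FakeMongos
{
    private readonly HashSet<(Guid Session, long TxnNumber)> _active =
        new HashSet<(Guid Session, long TxnNumber)>();

    public FakeMongos(string name) => Name = name;

    public string Name { get; }

    public void Execute(Guid session, long txnNumber, bool startTransaction)
    {
        var key = (session, txnNumber);
        if (startTransaction)
        {
            _active.Add(key); // first command of the transaction: remember it
        }
        else if (!_active.Contains(key))
        {
            // Simplified stand-in for the real NoSuchTransaction (code 251) error.
            throw new InvalidOperationException(
                $"NoSuchTransaction: {Name} has no transaction {txnNumber} for session {session}");
        }
    }
}

// A layer-4 balancer chooses the backend once per TCP connection, round-robin here.
public sealed class RoundRobinBalancer
{
    private readonly IReadOnlyList<FakeMongos> _backends;
    private int _next;

    public RoundRobinBalancer(IReadOnlyList<FakeMongos> backends) => _backends = backends;

    public FakeMongos OpenConnection() => _backends[_next++ % _backends.Count];
}

public static class PinningDemo
{
    // Sends every command of one transaction to whatever mongos `route` returns.
    public static bool RunTransaction(Func<FakeMongos> route, int commandCount)
    {
        var session = Guid.NewGuid();
        try
        {
            for (int i = 0; i < commandCount; i++)
            {
                route().Execute(session, txnNumber: 1, startTransaction: i == 0);
            }
            return true;
        }
        catch (InvalidOperationException)
        {
            return false;
        }
    }

    // Returns (succeeded through the balancer, succeeded when pinned).
    public static (bool ViaBalancer, bool Pinned) Compare()
    {
        var mongos = new[] { new FakeMongos("mongos-0"), new FakeMongos("mongos-1") };
        var balancer = new RoundRobinBalancer(mongos);

        // Assume concurrent work made the driver open two pooled connections to the
        // balancer's single address; the balancer sent them to different mongos.
        var pool = new[] { balancer.OpenConnection(), balancer.OpenConnection() };
        int checkout = 0;
        bool viaBalancer = RunTransaction(() => pool[checkout++ % pool.Length], 3);

        // What the driver does when it can see each mongos: pin the session to one.
        FakeMongos pinned = mongos[0];
        bool pinnedOk = RunTransaction(() => pinned, 3);

        return (viaBalancer, pinnedOk);
    }
}
```

`PinningDemo.Compare()` returns `(false, true)`: the transaction fails as soon as its second command arrives on the other mongos, and succeeds when every command goes to the same one.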

Expose each mongos instance individually

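Because a transaction must execute on a single mongos, every mongos instance has to be exposed to the MongoClient so that the driver can choose one itself. When deploying the cluster, each mongos therefore has to be reachable from the public network on its own. This finally explained why the official documentation exposes mongos through a NodePort rather than a Service: a single Service in front of all mongos pods would itself spread connections across them, just like our load balancer.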

Example mongos-service.yaml, which creates a separate Service for each of the two mongos instances; spec.selector explicitly names the pod each Service selects:

apiVersion: v1
kind: Service
metadata:
  name: mongos-svc-0
  namespace: mongodb
  labels:
    app: sharddb-svc
    controller: mongodb-enterprise-operator
spec:
  type: LoadBalancer
  ports:
    - protocol: TCP
      port: 27017
      targetPort: 27017
  selector:
    app: sharddb-svc
    controller: mongodb-enterprise-operator
    statefulset.kubernetes.io/pod-name: sharddb-mongos-0

---
apiVersion: v1
kind: Service
metadata:
  name: mongos-svc-1
  namespace: mongodb
  labels:
    app: sharddb-svc
    controller: mongodb-enterprise-operator
spec:
  type: LoadBalancer
  ports:
    - protocol: TCP
      port: 27017
      targetPort: 27017
  selector:
    app: sharddb-svc
    controller: mongodb-enterprise-operator
    statefulset.kubernetes.io/pod-name: sharddb-mongos-1

Reference: # Deploying a Sharded Cluster with MongoDB Ops Manager
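With one Service per mongos, the client should be given every mongos address instead of a single balancer address. A minimal sketch, assuming each Service's external address is known: the `MongosSeedList` helper and the `*.example.com` host names are hypothetical, and port 27017 matches the Services' `port` above. The resulting string can be passed to `MongoClientSettings.FromConnectionString`, as the reproduction code does; once the driver sees several mongos it selects one per transaction and pins the session to it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MongosSeedList
{
    // Builds a mongodb:// connection string that lists every mongos separately,
    // so the driver sees N mongos servers instead of one load-balancer address.
    // The host names are placeholders for each Service's external address.
    public static string Build(IEnumerable<string> mongosHosts, int port = 27017)
    {
        var hosts = mongosHosts.Select(host => $"{host}:{port}").ToList();
        if (hosts.Count == 0)
        {
            throw new ArgumentException("At least one mongos host is required.", nameof(mongosHosts));
        }
        return "mongodb://" + string.Join(",", hosts) + "/";
    }
}

// Usage (hypothetical host names):
// var uri = MongosSeedList.Build(new[] { "mongos-svc-0.example.com", "mongos-svc-1.example.com" });
// uri == "mongodb://mongos-svc-0.example.com:27017,mongos-svc-1.example.com:27017/"
// var settings = MongoClientSettings.FromConnectionString(uri);
```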

Code to reproduce the error

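using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

public class TransactionTest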
{
    private const string DatabaseName = "Test";
    private const string CollectionName = "Test";
    // Connection string omitted; in the failing setup it pointed at the load balancer.
    public const string ConnectionString = "";
    public MongoClient GetMongoClient(int timeout = 5)
    {
        var clientSettings = MongoClientSettings.FromConnectionString(ConnectionString);
        clientSettings.ConnectTimeout = TimeSpan.FromSeconds(5);
        clientSettings.ServerSelectionTimeout = TimeSpan.FromSeconds(timeout);
        clientSettings.AllowInsecureTls = true;
        var mongoClient = new MongoClient(clientSettings);
        return mongoClient;
    }
 
    public async Task TestTransactionAsync()
    {
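        // One MongoClient shared by all five concurrent tasks: behind the load balancer this triggers the errors.
        var client = GetMongoClient();
        var tasks = new List<Task>();
        for (int i = 0; i < 5; ++i)
        {
            // Workaround experiment: delete the shared client above and uncomment the
            // next line to give every task its own MongoClient; the errors then disappear.
            //var client = GetMongoClient(i + 5);
            tasks.Add(DoAsync(client));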
        }
        await Task.WhenAll(tasks);
    }
 
    private async Task DoAsync(IMongoClient mongoClient)
    {
        Console.WriteLine("Client hashcode: " + mongoClient.GetHashCode());
        var collection = mongoClient.GetDatabase(DatabaseName).GetCollection<BsonDocument>(CollectionName);
 
        // Keep running transactions until the process is stopped.
        while (true)
        {
            var uuid1 = Guid.NewGuid().ToString("N").Substring(24);
            var uuid2 = Guid.NewGuid().ToString("N").Substring(24);
            try
            {
                using (var session = await mongoClient.StartSessionAsync())
                {
                    session.StartTransaction();
                    await collection.InsertOneAsync(session, new BsonDocument("Uuid", uuid1));
                    await collection.InsertOneAsync(session, new BsonDocument("Uuid", uuid2));
 
                    await session.CommitTransactionAsync();
                }
                Console.WriteLine($"[{uuid1}] [{uuid2}]");
            }
            catch (Exception e)
            {
                Console.WriteLine("$$$ " + e.Message);
            }
        }
    }
}