Transactions - Amazon DocumentDB
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Transactions

Amazon DocumentDB(与 MongoDB 兼容)现在支持 MongoDB 4.0 兼容,包括事务。您可以跨多个文档、报表、集合和数据库执行事务处理。事务通过使您能够跨 Amazon DocumentDB 集群中的一个或多个文档执行原子、一致、隔离和持久的 (ACID) 操作,从而简化了应用程序开发。交易的常见使用案例包括财务处理、履行和管理订单以及构建多人游戏。

事务不会产生额外费用。您只需为作为交易一部分使用的读写 iOS 付费。

Requirements

要使用事务功能,您需要满足以下要求:

  • 您必须使用 Amazon DocumentDB 4.0 引擎。

  • 您必须使用与 MongoDB 4.0 或更高版本兼容的驱动程序。

最佳实践

以下是一些最佳实践,以便您能够充分利用 Amazon DocumentDB 的交易。

  • 始终在事务完成后提交或中止事务。使事务处于不完整状态会关联数据库资源,并可能导致写入冲突。

  • 建议将事务保持在所需的最少命令数量。如果您有多个语句的事务可以划分为多个较小的事务,建议这样做以减少超时的可能性。始终旨在创建短事务,而不是长时间运行的读取。

Limitations

  • Amazon DocumentDB 不支持交易中的游标。

  • Amazon DocumentDB 无法在事务中创建新的集合,也无法针对不存在的集合进行查询/更新。

  • 文档级写入锁定受到 1 分钟的超时限制,用户无法配置此操作。

  • 不支持可重试写入、可重试提交和可重试中止。

  • 每个 Amazon DocumentDB 实例对同时在实例上打开的并发事务数量都有上限。有关限制,请参阅。实例限制

  • 对于给定事务,事务日志大小必须小于 32MB。

  • Amazon DocumentDB 支持count(),但并非所有驱动程序都支持此功能。另一种方法是使用countDocuments()API,它将计数查询转换为客户端的聚合查询。

  • 事务的执行限制为 1 分钟,会话的超时时间为 30 分钟。如果事务超时,它将被中止,并且在会话中为现有事务发出的任何后续命令将产生以下错误:

    WriteCommandError({ "ok" : 0, "operationTime" : Timestamp(1603491424, 627726), "code" : 251, "errmsg" : "Given transaction number 0 does not match any in-progress transactions." })

监控和诊断

借助 Amazon DocumentDB 4.0 中对交易的支持,添加了其他 CloudWatch 指标,以帮助您监控您的交易。

新的 CloudWatch 指标

  • DatabaseTransactions:在一分钟期间执行的未结交易记录数。

  • DatabaseTransactionsAborted:在一分钟内执行的已中止事务处理的数量。

  • DatabaseTransactionsMax:一分钟内未结交易的最大数量。

  • TransactionsAborted:一分钟内在实例上中止的事务处理数。

  • TransactionsCommitted:一分钟内在实例上提交的事务处理数。

  • TransactionsOpen:在一分钟时间段内执行的实例上打开的事务处理数。

  • TransactionsOpenMax:一分钟内在实例上打开的最大事务数。

  • TransactionsStarted:一分钟内在实例上启动的事务处理数。

注意

有关 Amazon DocumentDB 的更多 CloudWatch 指标,请转到使用 CloudWatch 监控 Amazon DocumentDB dWatch

此外,新字段添加到currentOp lsidtransactionThreadId,以及一个新状态”idle transaction” 和serverStatus事务:currentActivecurrentInactivecurrentOpentotalAbortedtotalCommitted, 和totalStarted

事务隔离级别

启动事务时,您可以指定readConcernwriteConcern如以下示例所示:

mySession.startTransaction({readConcern: {level: 'snapshot'}, writeConcern: {w: 'majority'}});

适用于readConcern,默认情况下,Amazon DocumentDB 支持快照隔离。如果readConcern指定了本地、可用或多数,则 Amazon DocumentDB 将升级readConcern级别到快照。Amazon DocumentDB 不支持可线性化readConcern并指定这样的读取问题将导致错误。

适用于writeConcern,Amazon DocumentDB 默认情况下支持多数,并且在三个可用区之间保留四个数据副本时实现写仲裁。如果较低writeConcern,Amazon DocumentDB 将升级writeConcern至多数. 此外,所有 Amazon DocumentDB 写入都会记录,并且不能禁用日志记录。

使用案例

在本节中,我们将介绍两个事务使用案例:多语句和多收集。

多语句事务

Amazon DocumentDB 事务是多语句,这意味着您可以通过显式提交或回退编写跨多个语句的事务。您可以将insertupdatedelete, 和findAndModify操作作为单个原子操作。

多结算交易的常见使用案例是借方-贷方交易。例如:你欠一个朋友钱的衣服。因此,您需要从您的账户中扣除(提取)500 美元,并将 500 美元(存款)存入您朋友的账户。要执行该操作,您可以在单个事务处理中同时执行负债和贷项操作,以确保原子性。这样做可以防止从您的账户中扣除 500 美元但未记入您朋友的账户的情况。以下是这个用例的样子:

// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; // add $500 to Bob's account var bobBalance = accountColl.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountColl.find({"name": "Bob"}).next().balance; session.commitTransaction(); accountColl.find(); // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; session.abortTransaction();

多收集事务处理

我们的交易也是多个集合,这意味着它们可以用于在单个交易中和跨多个集合执行多个操作。这样可以提供一致的数据视图,并维护数据的完整性。当您将命令作为单个<>,则事务是全部或全无执行-因为它们将全部成功或全部失败。

以下是多收集事务的示例,使用相同的方案和示例中的数据来处理多语句事务。

// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var amountToTransfer = 500; var collectionName = "account"; var session = db.getMongo().startSession({causalConsistency: false}); var accountCollInBankA = session.getDatabase("bankA")[collectionName]; var accountCollInBankB = session.getDatabase("bankB")[collectionName]; accountCollInBankA.drop(); accountCollInBankB.drop(); accountCollInBankA.insert({name: "Alice", balance: 1000}); accountCollInBankB.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; // add $500 to Bob's account var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; session.commitTransaction(); accountCollInBankA.find(); // Alice holds $500 in bankA accountCollInBankB.find(); // Bob holds $1500 in bankB // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var accountCollInBankA = session.getDatabase("bankA")[collectionName]; var accountCollInBankB = session.getDatabase("bankB")[collectionName]; accountCollInBankA.drop(); accountCollInBankB.drop(); accountCollInBankA.insert({name: "Alice", balance: 1000}); accountCollInBankB.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; // add $500 to Bob's account var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; session.abortTransaction(); accountCollInBankA.find(); // Alice holds $1000 in bankA accountCollInBankB.find(); // Bob holds $1000 in bankB

回调 API 的事务 API 示例

回调 API 仅适用于 4.2 以上的驱动程序。

Javascript

以下代码说明了如何在 Javascript 的情况下使用 Amazon DocumentDB 事务 API。

// *** Transfer $500 from Alice to Bob inside a transaction: Success *** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert.eq(newAliceBalance, findAliceBalance); // add $500 to Bob's account var bobBalance = accountColl.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountColl.find({"name": "Bob"}).next().balance; assert.eq(newBobBalance, findBobBalance); session.commitTransaction(); accountColl.find();
Node.js

以下代码说明了如何在 Node.js 的情况下使用 Amazon DocumentDB 事务 API。

// Node.js callback API: const bankDB = await mongoclient.db("bank"); var accountColl = await bankDB.createCollection("account"); var amountToTransfer = 500; const session = mongoclient.startSession({causalConsistency: false}); await accountColl.drop(); await accountColl.insertOne({name: "Alice", balance: 1000}, { session }); await accountColl.insertOne({name: "Bob", balance: 1000}, { session }); const transactionOptions = { readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' } }; // deduct $500 from Alice's account var aliceBalance = await accountColl.findOne({name: "Alice"}, {session}); assert(aliceBalance.balance >= amountToTransfer); var newAliceBalance = aliceBalance - amountToTransfer; session.startTransaction(transactionOptions); await accountColl.updateOne({name: "Alice"}, {$set: {balance: newAliceBalance}}, {session }); await session.commitTransaction(); aliceBalance = await accountColl.findOne({name: "Alice"}, {session}); assert(newAliceBalance == aliceBalance.balance); // add $500 to Bob's account var bobBalance = await accountColl.findOne({name: "Bob"}, {session}); var newBobBalance = bobBalance.balance + amountToTransfer; session.startTransaction(transactionOptions); await accountColl.updateOne({name: "Bob"}, {$set: {balance: newBobBalance}}, {session }); await session.commitTransaction(); bobBalance = await accountColl.findOne({name: "Bob"}, {session}); assert(newBobBalance == bobBalance.balance);
C#

以下代码说明了如何在 C# 中使用 Amazon DocumentDB 事务 API。

// C# Callback API var dbName = "bank"; var collName = "account"; var amountToTransfer = 500; using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false})) { var bankDB = client.GetDatabase(dbName); var accountColl = bankDB.GetCollection<BsonDocument>(collName); bankDB.DropCollection(collName); accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } }); accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } }); // start transaction var transactionOptions = new TransactionOptions( readConcern: ReadConcern.Snapshot, writeConcern: WriteConcern.WMajority); var result = session.WithTransaction( (sess, cancellationtoken) => { // deduct $500 from Alice's account var aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer; accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice"), Builders<BsonDocument>.Update.Set("balance", newAliceBalance)); aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance == newAliceBalance); // add $500 from Bob's account var bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); var newBobBalance = bobBalance.AsInt32 + amountToTransfer; accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob"), Builders<BsonDocument>.Update.Set("balance", newBobBalance)); bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(bobBalance == newBobBalance); return "Transaction committed"; }, transactionOptions); // check values outside of transaction var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceNewBalance == 500); Debug.Assert(bobNewBalance == 1500); }
Ruby

以下代码说明了如何在 Ruby 中使用 Amazon DocumentDB 事务 API。

// Ruby Callback API dbName = "bank" collName = "account" amountToTransfer = 500 session = client.start_session(:causal_consistency=> false) bankDB = Mongo::Database.new(client, dbName) accountColl = bankDB[collName] accountColl.drop() accountColl.insert_one({"name"=>"Alice", "balance"=>1000}) accountColl.insert_one({"name"=>"Bob", "balance"=>1000}) # start transaction session.with_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) do # deduct $500 from Alice's account aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance'] assert aliceBalance >= amountToTransfer newAliceBalance = aliceBalance - amountToTransfer accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session) aliceBalance = accountColl.find({"name"=>>"Alice"}, :session=> session).first['balance'] assert_equal(newAliceBalance, aliceBalance) # add $500 from Bob's account bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] newBobBalance = bobBalance + amountToTransfer accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session) bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] assert_equal(newBobBalance, bobBalance) end # check results outside of transaction aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance'] bobBalance = accountColl.find({"name"=>"Bob"}).first['balance'] assert_equal(aliceBalance, 500) assert_equal(bobBalance, 1500) session.end_session
Go

以下代码说明了如何在 Go 的情况下使用 Amazon DocumentDB 事务 API。

// Go - Callback API type Account struct { Name string Balance int } ctx := context.TODO() dbName := "bank" collName := "account" amountToTransfer := 500 session, err := client.StartSession(options.Session().SetCausalConsistency(false)) assert.NilError(t, err) defer session.EndSession(ctx) bankDB := client.Database(dbName) accountColl := bankDB.Collection(collName) accountColl.Drop(ctx) _, err = accountColl.InsertOne(ctx, bson.M{"name" : "Alice", "balance":1000}) _, err = accountColl.InsertOne(ctx, bson.M{"name" : "Bob", "balance":1000}) transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.New(writeconcern.WMajority())) _, err = session.WithTransaction(ctx, func(sessionCtx mongo.SessionContext) (interface{}, error) { var result Account // deduct $500 from Alice's account err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result) aliceBalance := result.Balance newAliceBalance := aliceBalance - amountToTransfer _, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}}) err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result) aliceBalance = result.Balance assert.Equal(t, aliceBalance, newAliceBalance) // add $500 to Bob's account err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result) bobBalance := result.Balance newBobBalance := bobBalance + amountToTransfer _, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}}) err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result) bobBalance = result.Balance assert.Equal(t, bobBalance, newBobBalance) if err != nil { return nil, err } return "transaction committed", err }, transactionOptions) // check results outside of transaction var result Account err = accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result) aliceNewBalance := result.Balance err = accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result) bobNewBalance := result.Balance assert.Equal(t, aliceNewBalance, 500) assert.Equal(t, bobNewBalance, 1500) // Go - Core API type Account struct { Name string Balance int } func transferMoneyWithRetry(sessionContext mongo.SessionContext, accountColl *mongo.Collection, t *testing.T) error { amountToTransfer := 500 transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.New(writeconcern.WMajority())) if err := sessionContext.StartTransaction(transactionOptions); err != nil { panic(err) } var result Account // deduct $500 from Alice's account err := accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result) aliceBalance := result.Balance newAliceBalance := aliceBalance - amountToTransfer _, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}}) if err != nil { sessionContext.AbortTransaction(sessionContext) } err = accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result) aliceBalance = result.Balance assert.Equal(t, aliceBalance, newAliceBalance) // add $500 to Bob's account err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result) bobBalance := result.Balance newBobBalance := bobBalance + amountToTransfer _, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}}) if err != nil { sessionContext.AbortTransaction(sessionContext) } err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result) bobBalance = result.Balance assert.Equal(t, bobBalance, newBobBalance) err = sessionContext.CommitTransaction(sessionContext) return err } func doTransactionWithRetry(t *testing.T) { ctx := context.TODO() dbName := "bank" collName := "account" bankDB := client.Database(dbName) accountColl := bankDB.Collection(collName) client.UseSessionWithOptions(ctx, options.Session().SetCausalConsistency(false), func(sessionContext mongo.SessionContext) error { accountColl.Drop(ctx) accountColl.InsertOne(sessionContext, bson.M{"name" : "Alice", "balance":1000}) accountColl.InsertOne(sessionContext, bson.M{"name" : "Bob", "balance":1000}) for { err := transferMoneyWithRetry(sessionContext, accountColl, t) if err == nil { println("transaction committed") return nil } if mongoErr := err.(mongo.CommandError); mongoErr.HasErrorLabel("TransientTransactionError") { continue } println("transaction failed") return err } }) // check results outside of transaction var result Account accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&esult) aliceBalance := result.Balance assert.Equal(t, aliceBalance, 500) accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result) bobBalance := result.Balance assert.Equal(t, bobBalance, 1500) }
Java

以下代码说明了如何在 Java 的情况下使用 Amazon DocumentDB 事务 API。

// Java (sync) - Callback API MongoDatabase bankDB = mongoClient.getDatabase("bank"); MongoCollection accountColl = bankDB.getCollection("account"); accountColl.drop(); int amountToTransfer = 500; // add sample data accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)); accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)); TransactionOptions txnOptions = TransactionOptions.builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build(); ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build(); try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) { clientSession.withTransaction(new TransactionBody<Void>() { @Override public Void execute() { // deduct $500 from Alice's account List<Document> documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int aliceBalance = (int) documentList.get(0).get("balance"); int newAliceBalance = aliceBalance - amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))); // check Alice's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newAliceBalance); // add $500 to Bob's account documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); int bobBalance = (int) documentList.get(0).get("balance"); int newBobBalance = bobBalance + amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))); // check Bob's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newBobBalance); return null; } }, txnOptions); }
C

以下代码说明了如何在 C 中使用 Amazon DocumentDB 事务 API。

// Sample Code for C with Callback #include <bson.h> #include <mongoc.h> #include <stdio.h> #include <string.h> #include <assert.h> typedef struct { int64_t balance; bson_t *account; bson_t *opts; mongoc_collection_t *collection; } ctx_t; bool callback_session (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error) { bool r = true; ctx_t *data = (ctx_t *) ctx; bson_t local_reply; bson_t *selector = data->account; bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (data->balance), "}"); mongoc_collection_update_one (data->collection, selector, update, data->opts, &local_reply, error); *reply = bson_copy (&local_reply); bson_destroy (&local_reply); bson_destroy (update); return r; } void test_callback_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){ bson_t reply; bool r = true; const bson_t *doc; bson_iter_t iter; ctx_t alice_ctx; ctx_t bob_ctx; bson_error_t error; // find query bson_t *alice_query = bson_new (); BSON_APPEND_UTF8(alice_query, "name", "Alice"); bson_t *bob_query = bson_new (); BSON_APPEND_UTF8(bob_query, "name", "Bob"); // create session // set causal consistency to false mongoc_session_opt_t *session_opts = mongoc_session_opts_new (); mongoc_session_opts_set_causal_consistency (session_opts, false); // start the session mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error); // add session to options bson_t *opts = bson_new(); mongoc_client_session_append (client_session, opts, &error); // deduct 500 from Alice // find account balance of Alice mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance >= amount_to_transfer); int64_t new_alice_balance = alice_balance - amount_to_transfer; // set variables which will be used by callback function alice_ctx.collection = collection; alice_ctx.opts = opts; alice_ctx.balance = new_alice_balance; alice_ctx.account = alice_query; // callback r = mongoc_client_session_with_transaction (client_session, &callback_session, NULL, &alice_ctx, &reply, &error); assert(r); // find account balance of Alice after transaction cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance == new_alice_balance); assert(alice_balance == 500); // add 500 to bob's balance // find account balance of Bob cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64; int64_t new_bob_balance = bob_balance + amount_to_transfer; bob_ctx.collection = collection; bob_ctx.opts = opts; bob_ctx.balance = new_bob_balance; bob_ctx.account = bob_query; // set read & write concern mongoc_read_concern_t *read_concern = mongoc_read_concern_new (); mongoc_write_concern_t *write_concern = mongoc_write_concern_new (); mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new (); mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY); mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT); mongoc_transaction_opts_set_write_concern (txn_opts, write_concern); mongoc_transaction_opts_set_read_concern (txn_opts, read_concern); // callback r = mongoc_client_session_with_transaction (client_session, &callback_session, txn_opts, &bob_ctx, &reply, &error); assert(r); // find account balance of Bob after transaction cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); bob_balance = (bson_iter_value (&iter))->value.v_int64; assert(bob_balance == new_bob_balance); assert(bob_balance == 1500); // cleanup bson_destroy(alice_query); bson_destroy(bob_query); mongoc_client_session_destroy(client_session); bson_destroy(opts); mongoc_transaction_opts_destroy(txn_opts); mongoc_read_concern_destroy(read_concern); mongoc_write_concern_destroy(write_concern); mongoc_cursor_destroy(cursor); bson_destroy(doc); } int main(int argc, char* argv[]) { mongoc_init (); mongoc_client_t* client = mongoc_client_new (<connection uri>); bson_error_t error; // connect to bank db mongoc_database_t *database = mongoc_client_get_database (client, "bank"); // access account collection mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account"); // set amount to transfer int64_t amount_to_transfer = 500; // delete the collection if already existing mongoc_collection_drop(collection, &error); // open Alice account bson_t *alice_account = bson_new (); BSON_APPEND_UTF8(alice_account, "name", "Alice"); BSON_APPEND_INT64(alice_account, "balance", 1000); // open Bob account bson_t *bob_account = bson_new (); BSON_APPEND_UTF8(bob_account, "name", "Bob"); BSON_APPEND_INT64(bob_account, "balance", 1000); bool r = true; r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} test_callback_money_transfer(client, collection, amount_to_transfer); }
Python

以下代码说明了如何在 Python 中使用 Amazon DocumentDB 事务 API。

// Sample Python code with callback api import pymongo def callback(session, balance, query): collection.update_one(query, {'$set': {"balance": balance}}, session=session) client = pymongo.MongoClient(<connection uri>) rc_snapshot = pymongo.read_concern.ReadConcern('snapshot') wc_majority = pymongo.write_concern.WriteConcern('majority') # To start, drop and create an account collection and insert balances for both Alice and Bob collection = client.get_database("bank").get_collection("account") collection.drop() collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000}) collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000}) amount_to_transfer = 500 # deduct 500 from Alice's account alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert alice_balance >= amount_to_transfer new_alice_balance = alice_balance - amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.with_transaction(lambda s: callback(s, new_alice_balance, {"name": "Alice"}), read_concern=rc_snapshot, write_concern=wc_majority) updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert updated_alice_balance == new_alice_balance # add 500 to Bob's account bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert bob_balance >= amount_to_transfer new_bob_balance = bob_balance + amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.with_transaction(lambda s: callback(s, new_bob_balance, {"name": "Bob"}), read_concern=rc_snapshot, write_concern=wc_majority) updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert updated_bob_balance == new_bob_balance Sample Python code with Core api import pymongo client = pymongo.MongoClient(<connection_string>) rc_snapshot = pymongo.read_concern.ReadConcern('snapshot') wc_majority = pymongo.write_concern.WriteConcern('majority') # To start, drop and create an account collection and insert balances for both Alice and Bob collection = client.get_database("bank").get_collection("account") collection.drop() collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000}) collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000}) amount_to_transfer = 500 # deduct 500 from Alice's account alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert alice_balance >= amount_to_transfer new_alice_balance = alice_balance - amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority) collection.update_one({"name": "Alice"}, {'$set': {"balance": new_alice_balance}}, session=session) session.commit_transaction() updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert updated_alice_balance == new_alice_balance # add 500 to Bob's account bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert bob_balance >= amount_to_transfer new_bob_balance = bob_balance + amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority) collection.update_one({"name": "Bob"}, {'$set': {"balance": new_bob_balance}}, session=session) session.commit_transaction() updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert updated_bob_balance == new_bob_balance

核心 API 的事务 API 示例

Javascript

以下代码说明了如何在 Javascript 的情况下使用 Amazon DocumentDB 事务 API。

// *** Transfer $500 from Alice to Bob inside a transaction: Success *** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert.eq(newAliceBalance, findAliceBalance); // add $500 to Bob's account var bobBalance = accountColl.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountColl.find({"name": "Bob"}).next().balance; assert.eq(newBobBalance, findBobBalance); session.commitTransaction(); accountColl.find();
C#

以下代码说明了如何在 C# 中使用 Amazon DocumentDB 事务 API。

// C# Core API public void TransferMoneyWithRetry(IMongoCollection<bSondocument> accountColl, IClientSessionHandle session) { var amountToTransfer = 500; // start transaction var transactionOptions = new TransactionOptions( readConcern: ReadConcern.Snapshot, writeConcern: WriteConcern.WMajority); session.StartTransaction(transactionOptions); try { // deduct $500 from Alice's account var aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer; accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Alice"), Builders<bSondocument>.Update.Set("balance", newAliceBalance)); aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance == newAliceBalance); // add $500 from Bob's account var bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); var newBobBalance = bobBalance.AsInt32 + amountToTransfer; accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Bob"), Builders<bSondocument>.Update.Set("balance", newBobBalance)); bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(bobBalance == newBobBalance); } catch (Exception e) { session.AbortTransaction(); throw; } session.CommitTransaction(); } } public void DoTransactionWithRetry(MongoClient client) { var dbName = "bank"; var collName = "account"; using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false})) { try { var bankDB = client.GetDatabase(dbName); var accountColl = bankDB.GetCollection<bSondocument>(collName); bankDB.DropCollection(collName); accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } }); accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } }); while(true) { try { TransferMoneyWithRetry(accountColl, session); break; } catch (MongoException e) { if(e.HasErrorLabel("TransientTransactionError")) { continue; } else { throw; } } } // check values outside of transaction var aliceNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); var bobNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceNewBalance == 500); Debug.Assert(bobNewBalance == 1500); } catch (Exception e) { Console.WriteLine("Error running transaction: " + e.Message); } } }
Ruby

以下代码说明了如何在 Ruby 中使用 Amazon DocumentDB 事务 API。

# Ruby Core API def transfer_money_w_retry(session, accountColl) amountToTransfer = 500 session.start_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) # deduct $500 from Alice's account aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance'] assert aliceBalance >= amountToTransfer newAliceBalance = aliceBalance - amountToTransfer accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session) aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance'] assert_equal(newAliceBalance, aliceBalance) # add $500 to Bob's account bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] newBobBalance = bobBalance + amountToTransfer accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session) bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] assert_equal(newBobBalance, bobBalance) session.commit_transaction end def do_txn_w_retry(client) dbName = "bank" collName = "account" session = client.start_session(:causal_consistency=> false) bankDB = Mongo::Database.new(client, dbName) accountColl = bankDB[collName] accountColl.drop() accountColl.insert_one({"name"=>"Alice", "balance"=>1000}) accountColl.insert_one({"name"=>"Bob", "balance"=>1000}) begin transferMoneyWithRetry(session, accountColl) puts "transaction committed" rescue Mongo::Error => e if e.label?('TransientTransactionError') retry else puts "transaction failed" raise end end # check results outside of transaction aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance'] bobBalance = accountColl.find({"name"=>"Bob"}).first['balance'] assert_equal(aliceBalance, 500) assert_equal(bobBalance, 1500) end
Java

以下代码说明了如何在 Java 的情况下使用 Amazon DocumentDB 事务 API。

// Java (sync) - Core API public void transferMoneyWithRetry() { // connect to server MongoClientURI mongoURI = new MongoClientURI(uri); MongoClient mongoClient = new MongoClient(mongoURI); MongoDatabase bankDB = mongoClient.getDatabase("bank"); MongoCollection accountColl = bankDB.getCollection("account"); accountColl.drop(); // insert some sample data accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)); accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)); while (true) { try { doTransferMoneyWithRetry(accountColl, mongoClient); break; } catch (MongoException e) { if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) { continue; } else { throw e; } } } } public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) { int amountToTransfer = 500; TransactionOptions txnOptions = TransactionOptions.builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build(); ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build(); try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) { clientSession.startTransaction(txnOptions); // deduct $500 from Alice's account List<Document> documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int aliceBalance = (int) documentList.get(0).get("balance"); Assert.assertTrue(aliceBalance >= amountToTransfer); int newAliceBalance = aliceBalance - amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))); // check Alice's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newAliceBalance); // add $500 to Bob's account documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); int bobBalance = (int) documentList.get(0).get("balance"); int newBobBalance = bobBalance + amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))); // check Bob's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newBobBalance); // commit transaction clientSession.commitTransaction(); } } // Java (async) -- Core API public void transferMoneyWithRetry() { // connect to the server MongoClient mongoClient = MongoClients.create(uri); MongoDatabase bankDB = mongoClient.getDatabase("bank"); MongoCollection accountColl = bankDB.getCollection("account"); SubscriberLatchWrapper<Void> dropCallback = new SubscriberLatchWrapper<>(); mongoClient.getDatabase("bank").drop().subscribe(dropCallback); dropCallback.await(); // insert some sample data SubscriberLatchWrapper<InsertOneResult> insertionCallback = new SubscriberLatchWrapper<>(); accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)).subscribe(insertionCallback); insertionCallback.await(); insertionCallback = new SubscriberLatchWrapper<>(); accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)).subscribe(insertionCallback);; insertionCallback.await(); while (true) { try { doTransferMoneyWithRetry(accountColl, mongoClient); break; } catch (MongoException e) { if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) { continue; } else { throw e; } } } } public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) { int amountToTransfer = 500; // start the transaction TransactionOptions txnOptions = TransactionOptions.builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build(); ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build(); SubscriberLatchWrapper<ClientSession> sessionCallback = new SubscriberLatchWrapper<>(); mongoClient.startSession(sessionOptions).subscribe(sessionCallback); ClientSession session = sessionCallback.get().get(0); session.startTransaction(txnOptions); // deduct $500 from Alice's account SubscriberLatchWrapper<Document> findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback); Document documentFound = findCallback.get().get(0); int aliceBalance = (int) documentFound.get("balance"); int newAliceBalance = aliceBalance - amountToTransfer; SubscriberLatchWrapper<UpdateResult> updateCallback = new SubscriberLatchWrapper<>(); accountColl.updateOne(session, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))).subscribe(updateCallback); updateCallback.await(); // check Alice's new balance findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback); documentFound = findCallback.get().get(0); int updatedBalance = (int) documentFound.get("balance"); Assert.assertEquals(updatedBalance, newAliceBalance); // add $500 to Bob's account findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback); documentFound = findCallback.get().get(0); int bobBalance = (int) documentFound.get("balance"); int newBobBalance = bobBalance + amountToTransfer; updateCallback = new SubscriberLatchWrapper<>(); accountColl.updateOne(session, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))).subscribe(updateCallback); updateCallback.await(); // check Bob's new balance findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback); documentFound = findCallback.get().get(0); updatedBalance = (int) documentFound.get("balance"); Assert.assertEquals(updatedBalance, newBobBalance); // commit the transaction SubscriberLatchWrapper<Void> transactionCallback = new SubscriberLatchWrapper<>(); session.commitTransaction().subscribe(transactionCallback); transactionCallback.await(); } public class SubscriberLatchWrapper<T> implements Subscriber<T> { /** * A Subscriber that stores the publishers results and provides a latch so can block on completion. * * @param <T> The publishers result type */ private final List<T> received; private final List<RuntimeException> errors; private final CountDownLatch latch; private volatile Subscription subscription; private volatile boolean completed; /** * Construct an instance */ public SubscriberLatchWrapper() { this.received = new ArrayList<>(); this.errors = new ArrayList<>(); this.latch = new CountDownLatch(1); } @Override public void onSubscribe(final Subscription s) { subscription = s; subscription.request(Integer.MAX_VALUE); } @Override public void onNext(final T t) { received.add(t); } @Override public void onError(final Throwable t) { if (t instanceof RuntimeException) { errors.add((RuntimeException) t); } else { errors.add(new RuntimeException("Unexpected exception", t)); } onComplete(); } @Override public void onComplete() { completed = true; subscription.cancel(); latch.countDown(); } /** * Get received elements * * @return the list of received elements */ public List<T> getReceived() { return received; } /** * Get received elements. * * @return the list of receive elements */ public List<T> get() { return await().getReceived(); } /** * Await completion or error * * @return this */ public SubscriberLatchWrapper<T> await() { subscription.request(Integer.MAX_VALUE); try { if (!latch.await(300, TimeUnit.SECONDS)) { throw new MongoTimeoutException("Publisher onComplete timed out for 300 seconds"); } } catch (InterruptedException e) { throw new MongoInterruptedException("Interrupted waiting for observeration", e); } if (!errors.isEmpty()) { throw errors.get(0); } return this; } public boolean getCompleted() { return this.completed; } public void close() { subscription.cancel(); received.clear(); } }
C

以下代码说明了如何在 C 中使用 Amazon DocumentDB 事务 API。

// Sample C code with core session bool core_session(mongoc_client_session_t *client_session, mongoc_collection_t* collection, bson_t *selector, int64_t balance){ bool r = true; bson_error_t error; bson_t *opts = bson_new(); bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (balance), "}"); // set read & write concern mongoc_read_concern_t *read_concern = mongoc_read_concern_new (); mongoc_write_concern_t *write_concern = mongoc_write_concern_new (); mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new (); mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY); mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT); mongoc_transaction_opts_set_write_concern (txn_opts, write_concern); mongoc_transaction_opts_set_read_concern (txn_opts, read_concern); mongoc_client_session_start_transaction (client_session, txn_opts, &error); mongoc_client_session_append (client_session, opts, &error); r = mongoc_collection_update_one (collection, selector, update, opts, NULL, &error); mongoc_client_session_commit_transaction (client_session, NULL, &error); bson_destroy (opts); mongoc_transaction_opts_destroy(txn_opts); mongoc_read_concern_destroy(read_concern); mongoc_write_concern_destroy(write_concern); bson_destroy (update); return r; } void test_core_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){ bson_t reply; bool r = true; const bson_t *doc; bson_iter_t iter; bson_error_t error; // find query bson_t *alice_query = bson_new (); BSON_APPEND_UTF8(alice_query, "name", "Alice"); bson_t *bob_query = bson_new (); BSON_APPEND_UTF8(bob_query, "name", "Bob"); // create session // set causal consistency to false mongoc_session_opt_t *session_opts = mongoc_session_opts_new (); mongoc_session_opts_set_causal_consistency (session_opts, false); // start the session mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error); // add session to options bson_t *opts = bson_new(); mongoc_client_session_append (client_session, opts, &error); // deduct 500 from Alice // find account balance of Alice mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance >= amount_to_transfer); int64_t new_alice_balance = alice_balance - amount_to_transfer; // core r = core_session (client_session, collection, alice_query, new_alice_balance); assert(r); // find account balance of Alice after transaction cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance == new_alice_balance); assert(alice_balance == 500); // add 500 to Bob's balance // find account balance of Bob cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64; int64_t new_bob_balance = bob_balance + amount_to_transfer; //core r = core_session (client_session, collection, bob_query, new_bob_balance); assert(r); // find account balance of Bob after transaction cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); bob_balance = (bson_iter_value (&iter))->value.v_int64; assert(bob_balance == new_bob_balance); assert(bob_balance == 1500); // cleanup bson_destroy(alice_query); bson_destroy(bob_query); mongoc_client_session_destroy(client_session); bson_destroy(opts); mongoc_cursor_destroy(cursor); bson_destroy(doc); } int main(int argc, char* argv[]) { mongoc_init (); mongoc_client_t* client = mongoc_client_new (<connection uri>); bson_error_t error; // connect to bank db mongoc_database_t *database = mongoc_client_get_database (client, "bank"); // access account collection mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account"); // set amount to transfer int64_t amount_to_transfer = 500; // delete the collection if already existing mongoc_collection_drop(collection, &error); // open Alice account bson_t *alice_account = bson_new (); BSON_APPEND_UTF8(alice_account, "name", "Alice"); BSON_APPEND_INT64(alice_account, "balance", 1000); // open Bob account bson_t *bob_account = bson_new (); BSON_APPEND_UTF8(bob_account, "name", "Bob"); BSON_APPEND_INT64(bob_account, "balance", 1000); bool r = true; r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} test_core_money_transfer(client, collection, amount_to_transfer); }
Scala

以下代码说明了如何在 Scala 的情况下使用 Amazon DocumentDB 事务 API。

// Scala Core API def transferMoneyWithRetry(sessionObservable: SingleObservable[ClientSession] , database: MongoDatabase ): Unit = { val accountColl = database.getCollection("account") var amountToTransfer = 500 var transactionObservable: Observable[ClientSession] = sessionObservable.map(clientSession => { clientSession.startTransaction() // deduct $500 from Alice's account var aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance") assert(aliceBalance >= amountToTransfer) var newAliceBalance = aliceBalance - amountToTransfer accountColl.updateOne(clientSession, Document("name" -> "Alice"), Document("$set" -> Document("balance" -> newAliceBalance))).await() aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance") assert(aliceBalance == newAliceBalance) // add $500 to Bob's account var bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance") var newBobBalance = bobBalance + amountToTransfer accountColl.updateOne(clientSession, Document("name" -> "Bob"), Document("$set" -> Document("balance" -> newBobBalance))).await() bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance") assert(bobBalance == newBobBalance) clientSession }) transactionObservable.flatMap(clientSession => clientSession.commitTransaction()).await() } def doTransactionWithRetry(): Unit = { val client: MongoClient = MongoClientWrapper.getMongoClient() val database: MongoDatabase = client.getDatabase("bank") val accountColl = database.getCollection("account") accountColl.drop().await() val sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build() var sessionObservable: SingleObservable[ClientSession] = client.startSession(sessionOptions) accountColl.insertOne(Document("name" -> "Alice", "balance" -> 1000)).await() accountColl.insertOne(Document("name" -> "Bob", "balance" -> 1000)).await() var retry = true while (retry) { try { transferMoneyWithRetry(sessionObservable, database) println("transaction committed") retry = false } catch { case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => { println("retrying transaction") } case other: Throwable => { println("transaction failed") retry = false throw other } } } // check results outside of transaction assert(accountColl.find(Document("name" -> "Alice")).results().head.getInteger("balance") == 500) assert(accountColl.find(Document("name" -> "Bob")).results().head.getInteger("balance") == 1500) accountColl.drop().await() }

支持的 命令

命令 支持

abortTransaction

commitTransaction

endSessions

killSession

killAllSession

killAllSessionsByPattern

refreshSessions

startSession

不支持的功能

Methods 阶段或命令

db.collection.aggregate()

$collStats

$currentOp

$indexStats

$listSessions

$out

db.collection.count()

db.collection.countDocuments()

$where

$near

$nearSphere

db.collection.insert()

insert如果它不针对现有集合运行,则不支持。如果此方法以预先存在的集合为目标,则支持此方法。

Sessions

MongoDB 会话是一个框架,用于支持可重试写入、因果一致性、事务以及跨数据库管理操作。创建会话时,客户端将生成逻辑会话标识符 (lsid),用于在向服务器发送命令时标记该会话中的所有操作。

Amazon DocumentDB 支持使用会话来启用事务,但不支持因果一致性或可重试写入。

在 Amazon DocumentDB 中使用事务时,将在会话中使用session.startTransaction()API 和会话一次支持单个事务。同样,事务也可以使用提交(session.commitTransaction())或中止(session.abortTransaction()) API。

因果一致性

因果一致性保证在单个客户端会话中,客户端将遵守写后读取一致性、单原子读/写操作,并且写入将跟随读取,这些保证适用于集群中的所有实例,而不仅仅是主实例。Amazon DocumentDB 不支持因果一致性,以下语句将导致错误。

var mySession = db.getMongo().startSession(); var mySessionObject = mySession.getDatabase('test').getCollection('account'); mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}}); //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 } mySessionObject.find() //Error: error: { // "ok" : 0, // "code" : 303, // "errmsg" : "Feature not supported: 'causal consistency'", // "operationTime" : Timestamp(1603461817, 493214) //} mySession.endSession()

您可以禁用会话中的因果一致性。请注意,这样做将使您能够利用会话框架,但不会为读取提供因果一致性保证。使用 Amazon DocumentDB 时,从主数据库的读取将保持写后读取一致性,并且从副本实例读取最终保持一致性。事务是利用会话的主要用例。

var mySession = db.getMongo().startSession({causalConsistency: false}); var mySessionObject = mySession.getDatabase('test').getCollection('account'); mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}}); //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 } mySessionObject.find() //{ "_id" : 1, "name" : "Bob", "balance" : 100 } //{ "_id" : 2, "name" : "Alice", "balance" : 1700 }

可重试写入

可重试写入是一种功能,在发生网络错误或客户端无法找到主操作时,客户端将尝试重试写入操作一次。在 Amazon DocumentDB 中,不支持可重试写入,必须禁用。您可以使用命令(retryWrites=false) 连接字符串。下面是示例:

mongodb://chimera:<insertYourPassword>@docdb-2019-01-29-02-57-28.cluster-ccuszbx3pn5e.us-east-1.docdb.amazonaws.com:27017/?ssl=true&ssl_ca_certs=rds-combined-ca-cn-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false

事务错误

使用事务时,有些情况可能会发出一个错误,指出事务编号与任何正在进行的事务不匹配。

该错误可以在至少两种不同的情况下生成:

  • After the one-minute transaction timeout.
  • After an instance restart (due to patching, crash recovery, etc.), it is possible to receive this error even in cases where the transaction successfully committed. During an instance restart, the database can't tell the difference between a transaction that successfully completed versus a transaction that aborted. In other words, the transaction completion state is ambiguous.

处理此错误的最佳方法是使事务性更新幂等-例如,通过使用$set变异器而不是增量/递减操作。请参阅下面的:

{ "ok" : 0, "operationTime" : Timestamp(1603938167, 1), "code" : 251, "errmsg" : "Given transaction number 1 does not match any in-progress transactions." }