

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Amazon DocumentDB 和 Java 进行事件驱动型编程
事件驱动型编程

Amazon DocumentDB 环境中的事件驱动型编程代表了一种强大的架构模式，其中数据库变更充当主要事件生成器，用于触发后续业务逻辑和进程。在 DocumentDB 集合中插入、更新或删除记录时，这些变更充当事件，用于自动启动各种下游进程、通知或数据同步任务。该模式在现代分布式系统中尤为重要，在这些系统中，多个应用程序或服务需要对数据变更做出实时反应。在 DocumentDB 中实现事件驱动型编程的主要机制是通过变更流。

**注意**  
本指南假设您已在使用的集合上启用了变更流。要了解如何在集合上启用变更流，请参阅 [将变更流与 Amazon DocumentDB 结合使用](change_streams.md)。

**通过 Java 应用程序处理变更流**

MongoDB 的 Java 驱动程序中的 `watch()` 方法是监控 Amazon DocumentDB 中实时数据变更的主要机制。`watch()` 方法可以通过 [https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoClient.html](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoClient.html)、[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoDatabase.html#watch(com.mongodb.client.ClientSession,java.lang.Class)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoDatabase.html#watch(com.mongodb.client.ClientSession,java.lang.Class)) 和 [https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoCollection.html#watch()](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoCollection.html#watch()) 对象进行调用。

`watch()` 方法将返回支持各种配置选项的 [https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)) 的实例，包括用于更新的完整文档查找、提供恢复令牌和时间戳以确保可靠性，以及用于筛选变更的管道聚合阶段。

[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)) 将实现核心 Java 接口 `Iterable`，可以与 `forEach()` 结合使用。要使用 `forEach()` 捕获事件，请将回调函数传递给处理变更事件的 `forEach()`。以下代码片段显示了如何在集合上打开变更流以启动变更事件监控：

```
ChangeStreamIterable < Document > iterator = collection.watch();
iterator.forEach(event - > {
    System.out.println("Received a change: " + event);
});
```

遍历所有变更事件的另一种方法是打开光标，该光标保持与集群的连接，并在发生新的变更事件时持续接收这些事件。要获取变更流光标，请使用 [https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)) 对象的 `cursor()` 方法。以下代码示例显示了如何使用光标监控变更事件：

```
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) {
    System.out.println(cursor.tryNext());
}
```

最佳做法是，要么在 try-with-resource语句[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoChangeStreamCursor.html#getResumeToken()](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoChangeStreamCursor.html#getResumeToken())中创建，要么手动关闭游标。在 [https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)) 上调用 `cursor()` 方法将返回通过 [https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html) 对象创建的 `MongoChangeStreamCursor`。

[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html) 类是表示流中各个变更事件的关键组件。包含有关每项修改的详细信息，包括操作类型（插入、更新、删除、替换）、文档键、命名空间信息以及完整的文档内容（如果有）。该类提供了访问变更事件各个方面的方法，例如 `getOperationType()` 用于确定变更类型，`getFullDocument()` 用于访问完整文档状态，以及 `getDocumentKey()` 用于识别被修改的文档。

[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html) 对象提供两条重要信息，即恢复令牌和变更事件发生时间。

DocumentDB 变更流中的恢复令牌和基于时间的操作为保持连续性和管理历史数据访问提供了关键机制。恢复令牌是为每个变更事件生成的唯一标识符，用作书签，允许应用程序在断开连接或发生故障后从特定点重新启动变更流处理。创建变更流光标时，可以通过 `resumeAfter()` 选项使用先前存储的恢复令牌，使流能够从中断处继续，而非从头开始或丢失事件。

变更流中基于时间的操作提供了不同的方法来管理变更事件监控的起点。`startAtOperationTime()` 选项允许您开始监视在特定时间戳或之后发生的变更。这些基于时间的功能在需要历史数据处理、 point-in-time恢复或系统间同步的场景中特别有价值。

以下代码示例检索与插入文档关联的事件，捕获其恢复令牌，然后提供该令牌以开始监控插入事件之后的事件。该事件与更新事件关联，然后获取更新发生时的集群时间，并使用该时间戳作为进一步处理的起点。

```
BsonDocument resumeToken;
BsonTimestamp resumeTime;

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) {
    System.out.println("****************** Insert Document *******************");
    ChangeStreamDocument < Document > insertChange = cursor.tryNext();
    resumeToken = insertChange.getResumeToken();
    printJson(cursor.tryNext());
}
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch()
    .resumeAfter(resumeToken)
    .cursor()) {
    System.out.println("****************** Update Document *******************");
    ChangeStreamDocument < Document > insertChange = cursor.tryNext();
    resumeTime = insertChange.getClusterTime();
    printJson(cursor.tryNext());
}
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch()
    .startAtOperationTime(resumeTime)
    .cursor()) {
    System.out.println("****************** Delete Document *******************");
    printJson(cursor.tryNext());
  }
```

默认情况下，更新变更事件不包含完整文档，仅包含所做变更。如果您需要访问已更新的完整文档，则可以在 [https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)) 对象上调用 `fullDocument()` 方法。请记住，当您请求返回更新事件的完整文档时，会返回调用变更流时存在的文档。

此方法采用 [https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/FullDocument.html](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/FullDocument.html) 枚举作为参数。目前，Amazon DocumentDB 仅支持 DEFAULT 和 `UPDATE_LOOKUP` 值。以下代码片段显示了在开始监视变更时如何请求提供更新事件的完整文档：

```
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())
```