在亚马逊 DocumentDB 中使用更改流 - Amazon DocumentDB
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在亚马逊 DocumentDB 中使用更改流

Amazon DocumentDB 中的更改流功能(兼容 MongoDB)提供了集群集合中发生的更改事件的时间顺序序列。您可以从变更流中读取事件来实现许多不同的用例,包括:

  • 更改通知

  • 使用亚马逊进行全文搜索 OpenSearch 服务 (OpenSearch 服务)

  • 使用亚马逊 Redshift 进行

应用程序可以使用变更流在各个集合中订阅数据变更。变更流事件在集群上发生时按顺序排列,并在记录事件之后存储 3 个小时(默认情况下)。使用以下方法可以将保留期延长至 7 天change_stream_log_retention_duration参数。要修改更改流保留期,请参阅修改更改流日志保留期限

支持的 操作

Amazon DocumentDB 支持以下更改流操作:

  • MongoDB 支持所有变更事件db.collection.watch()db.watch()client.watch()API。

  • 查找完整文档以获取更新。

  • 聚合阶段:$match$project$redact,以及$addFields$replaceRoot

  • 从简历令牌恢复更改流

  • 使用从时间戳恢复更改流startAtOperation(适用于亚马逊 DocumentDB v4.0+)

计费

默认情况下,Amazon DocumentDB 变更流功能处于禁用状态,并且在启用该功能之前不会产生任何额外费用。在集群中使用更改流会产生额外的读取和写入 IO 和存储成本。你可以使用modifyChangeStreams用于为您的集群启用此功能的 API 操作。有关定价的更多信息,请参阅亚马逊 DocumentD

限制

更改流在 Amazon DocumentDB 中有以下限制:

  • 只能通过与 Amazon DocumentDB 集群主实例的连接打开更改流。当前不支持从副本实例上的变更流中进行读取。在调用 watch() API 操作时,您必须指定 primary 读取首选项,以确保所有读取都定向到主实例(请参阅示例部分)。

  • 写入收藏变更流的事件最长可用 7 天(默认值为 3 小时)。变更流数据将在日志保留时段过后删除,即使没有发生新更改也是如此。

  • 对集合进行长时间运行的写入操作,例如updateMany要么deleteMany可以暂时停止更改流事件的写入,直到长时间运行的写入操作完成。

  • 亚马逊 DocumentDB 不支持 MongoDB 操作日志 (oplog)。

  • 使用 Amazon DocumentDB,您必须在给定集合上明确启用更改流。

  • 如果变更流事件的总大小(包括变更数据,在请求的情况下还包括完整文档)大于 16 MB,客户端将在变更流上遇到读取失败情况。

  • 使用时目前不支持 Ruby 驱动程序db.watch()client.watch()使用亚马逊 DocumentDB v3.6。

启用变更流

您可以为给定数据库中的所有馆藏启用 Amazon DocumentDB 更改流,也可以仅为选定的馆藏启用 Amazon Document 以下是如何使用 mongo shell 为不同用例启用更改流的示例。指定数据库和集合名称时,空字符串被视为通配符。

//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});

如果满足以下任意条件,则将为集合启用变更流:

  • 数据库和集合均已显式启用。

  • 包含该集合的数据库已启用。

  • 所有数据库均已启用。

如果父数据库也启用了更改流,或者集群中的所有数据库都已启用,则从数据库中删除集合不会禁用该集合的更改流。如果创建了与已删除收藏同名的新收藏集,则将为该集合启用更改流。

你可以使用列出集群中所有已启用的变更流$listChangeStreams聚合管道阶段。Amazon DocumentDB 支持的所有聚合阶段都可以在管道中用于额外处理。如果以前启用的某个集合被禁用,则该集合将不会显示在 $listChangeStreams 输出中。

//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));

示例:在 Python 中使用变更流

以下是在馆藏级别使用带有 Python 的亚马逊 DocumentDB 变更流的示例。

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-combined-ca-cn-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """

以下是在数据库级别使用 Amazon DocumentDB 变更流和 Python 的示例。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-combined-ca-cn-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """

完整文档查找

更新变更事件不包括完整文档;它只包括所做的更改。如果您的使用案例需要用到受更新影响的完整文档,则可以在打开流时启用完整文档查找。

更新变更流事件的 fullDocument 文档会指明文档查找时已更新文档的最新版本。如果在更新操作和更新操作之间发生了变化fullDocument查看,fullDocument文档可能无法代表更新时的文档状态。

#Create a stream object with update lookup enabled stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next() #Output: {'_id': {'_data': '015daf9b7c00000001010000000100009025'}, 'clusterTime': Timestamp(1571789692, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}

恢复变更流

您可以在以后通过使用恢复令牌来恢复变更流,该令牌相当于上次检索的变更事件文档的 _id 字段。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-combined-ca-cn-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """

使用以下命令恢复变更流startAtOperationTime

您可以稍后使用以下命令从特定时间戳恢复更改流startAtOperationTime

注意

使用能力startAtOperationTime在亚马逊 DocumentDB 4.0+ 版本中可用。使用时startAtOperationTime,则更改流光标将仅返回在指定时间戳或之后发生的更改。的startAtOperationTimeresumeAfter命令是互斥的,因此不能一起使用。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """

变更流中的交易

更改流事件将不包含来自未提交和/或已中止事务的事件。例如,如果您使用一个开始交易INSERT操作和一个UPDATE操作和。如果你的INSERT操作成功了,但是UPDATE操作失败,事务将被回滚。由于此交易已回滚,因此您的更改流将不包含此交易的任何事件。

修改更改流日志保留期限

您可以使用,将更改流日志的保留期限修改为 1 小时到 7 天之间Amazon Web Services Management Console或者Amazon CLI。

Using the Amazon Web Services Management Console
修改变更流日志保留期限的步骤
  1. 登录Amazon Web Services Management Console,然后打开亚马逊 DocumentDB 控制台https://console.aws.amazon.com/docdb

  2. 在导航窗格中,选择参数组

    提示

    如果您在屏幕左侧没有看到导航窗格,请在页面左上角选择菜单图标 ()。

  3. 参数组窗格中,选择与您的集群关联的集群参数组。要识别与您的集群关联的集群参数组,请参阅确定亚马逊 DocumentDB 集群的参数组

  4. 结果页面显示您的集群参数组的参数及其相应详细信息。选择 change_stream_log_retention_duration 参数。

  5. 在页面右上角,选择编辑以更改参数的值。的change_stream_log_retention_duration可以将参数修改为 1 小时到 7 天之间。

  6. 进行更改,然后选择修改集群参数保存更改。要放弃您的更改,请选择取消

Using the Amazon CLI

修改您的集群参数组change_stream_log_retention_duration参数,使用modify-db-cluster-parameter-group使用以下参数进行操作:

  • --db-cluster-parameter-group-name – 必需。您正在修改的集群参数组的名称。要识别与您的集群关联的集群参数组,请参阅确定亚马逊 DocumentDB 集群的参数组

  • --parameters – 必需。您正在修改的参数。每个参数条目必须包含以下内容:

    • ParameterName-您正在修改的参数的名称。在本例中,它是change_stream_log_retention_duration

    • ParameterValue— 此参数的新值。

    • ApplyMethod— 您希望如何应用对此参数的更改。允许的值为 immediatepending-reboot

      注意

      staticApplyType 参数必须具有 pending-rebootApplyMethod

  1. 要更改参数的值change_stream_log_retention_duration,运行以下命令并替换parameter-value使用您要将参数修改为的值。

    对于 Linux、macOS 或 Unix:

    aws docdb modify-db-cluster-parameter-group \ --db-cluster-parameter-group-name sample-parameter-group \ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    对于 Windows:

    aws docdb modify-db-cluster-parameter-group ^ --db-cluster-parameter-group-name sample-parameter-group ^ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    此操作的输出将类似于下文(JSON 格式)。

    { "DBClusterParameterGroupName": "sample-parameter-group" }
  2. 至少等待 5 分钟。

  3. 列出参数值sample-parameter-group以确保您的更改已完成。

    对于 Linux、macOS 或 Unix:

    aws docdb describe-db-cluster-parameters \ --db-cluster-parameter-group-name sample-parameter-group

    对于 Windows:

    aws docdb describe-db-cluster-parameters ^ --db-cluster-parameter-group-name sample-parameter-group

    此操作的输出将类似于下文(JSON 格式)。

    { "Parameters": [ { "ParameterName": "audit_logs", "ParameterValue": "disabled", "Description": "Enables auditing on cluster.", "Source": "system", "ApplyType": "dynamic", "DataType": "string", "AllowedValues": "enabled,disabled", "IsModifiable": true, "ApplyMethod": "pending-reboot" }, { "ParameterName": "change_stream_log_retention_duration", "ParameterValue": "12345", "Description": "Duration of time in seconds that the change stream log is retained and can be consumed.", "Source": "user", "ApplyType": "dynamic", "DataType": "integer", "AllowedValues": "3600-86400", "IsModifiable": true, "ApplyMethod": "immediate" } ] }
注意

更改流日志保留期不会删除早于配置的日志change_stream_log_retention_duration在日志大小大于 (>) 51,200MB 之前的值。