亚马逊 DynamoDB 直播作为 Pipes 的来源 EventBridge - Amazon EventBridge
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

亚马逊 DynamoDB 直播作为 Pipes 的来源 EventBridge

您可以使用 Pip EventBridge es 接收 DynamoDB 流中的记录。然后,您可以选择筛选或增强这些记录,然后再将它们发送到目标进行处理。在设置管道时,您可以选择特定于 Amazon DynamoDB Streams 的设置。 EventBridge 当将数据发送到目标时,管道会保持数据流中记录的顺序。

重要

禁用作为管道源的 DynamoDB 流会导致该管道变得不可用,即使您随后重新启用流该管道也不再可用。这是因为:

  • 您无法停止、启动或更新源已禁用的管道。

  • 创建管道后,您无法使用新源更新此管道。当您重新启用 DynamoDB 流时,系统会为该流分配一个新的 Amazon 资源名称 ARN (),并且不再与您的管道关联。

如果您确实重新启用 DynamoDB 流,则需要使用该流的新管道创建新的管道。ARN

示例事件

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 Amazon P EventBridge ipes 中的事件筛选

[ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable", "eventSource": "aws:dynamodb" } ]

轮询和批处理流

EventBridge 以每秒四次的基本速率轮询您的 DynamoDB 流中的分片以获取记录。当有记录可用时, EventBridge 处理事件并等待结果。如果处理成功,则 EventBridge 恢复轮询直到收到更多记录。

默认情况下,只要有记录,就会 EventBridge 调用你的管道。如果从源 EventBridge 读取的批次中只有一条记录,则只处理一个事件。为避免处理数量较少的记录,您可以配置批处理时段,让管道缓冲最多五分钟的记录。在处理事件之前,会 EventBridge 继续从源读取记录,直到它收集了完整批次、批处理窗口到期或批处理达到 6 MB 的有效载荷限制。

您还可以通过将来自各个分区的多个批次并行处理来增加并发性。 EventBridge 每个分片中最多可以同时处理 10 个批次。如果增加每个分片的并发批次数, EventBridge 仍可确保分区键级别的按顺序处理。

配置 ParallelizationFactor 设置,同时处理 Kinesis 或 DynamoDB 数据流一个分片中的多个管道执行。您可以通过从 1(默认)到 10 的并行化系数指定从分片 EventBridge 轮询的并发批次数。例如,当您设置为 ParallelizationFactor 2 时,最多可以有 200 个并发 EventBridge Pipe 执行来处理 100 个 Kinesis 数据分片。这有助于在数据量不稳定并且 IteratorAge 较高时纵向扩展处理吞吐量。请注意,如果使用 Kinesis 聚合,并行化因子将不起作用。

轮询和流的起始位置

请注意,管道创建和更新期间的流源轮询最终将是一致的。

  • 在管道创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在管道更新源轮询配置期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

这意味着,如果您指定 LATEST 作为流的起始位置,在创建或更新管道期间,管道可能会错过发送的事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON

报告批处理项目失败

在 EventBridge 使用和处理来自源的流数据时,默认情况下,它会检查批次的最高序号,但前提是批处理完全成功时。为避免重新处理失败批次中已成功处理的消息,您可以配置富集或目标,返回对象来指示哪些消息处理成功、哪些失败。这称为部分批处理响应。

有关更多信息,请参阅 部分批处理故障

成功和失败的条件

如果您返回以下任意一项,则将批次 EventBridge 视为完全成功:

  • 空的 batchItemFailure 列表

  • Null batchItemFailure 列表

  • 空的 EventResponse

  • Null EventResponse

如果您返回以下任何内容,则会将批次 EventBridge 视为完全失败:

  • 空字符串 itemIdentifier

  • Null itemIdentifier

  • 包含错误密钥名的 itemIdentifier

EventBridge 根据您的重试策略重试失败。