Amazon Kinesis 流作为源 - Amazon EventBridge
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon Kinesis 流作为源

您可以使用 EventBridge Pipes 接收 Kinesis 数据流中的记录。然后,您可以选择筛选或增强这些记录,然后再将它们发送到可用的目标之一进行处理。在设置管道时,可以选择特定于 Kinesis 的设置。将数据发送到目标时,EventBridge Pipes 会保持数据流中记录的顺序。

Kinesis 数据流是一组分区。每个分片包含一系列数据记录。使用者 是一种处理 Kinesis 数据流中的数据的应用程序。您可以将 EventBridge Pipe 映射到共享吞吐量使用者(标准迭代器)或具有增强扇出功能的专用吞吐量使用者。

对于标准迭代器,EventBridge 使用 HTTP 协议轮询 Kinesis 流中的每个分片以查找记录。管道与分片的其他使用者共享读取吞吐量。

为了最大限度地减少延迟并最大限度地提高读取吞吐量,您可以创建具有增强扇出功能的数据流使用者。流使用者将获得与每个分片的专用连接,这不会影响从流中读取信息的其他应用程序。如果您有许多应用程序读取相同的数据,或者您正在重新处理具有大记录的流,则专用吞吐量可以提供帮助。Kinesis 通过 HTTP/2 向 EventBridge 推送记录。有关 Kinesis 数据流的信息,请参阅读取 Amazon Kinesis Data Streams 中的数据

示例事件

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 亚马逊 EventBridge 管道过滤

[ { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ]

轮询和批处理流

EventBridge 以每秒四次的基本频率轮询 Kinesis 流中的分片来获取记录。如果有可用记录,EventBridge 会处理事件并等待结果。如果处理成功,EventBridge 将恢复轮询,直到收到更多记录。

默认情况下,EventBridge 会在记录可用时立即调用您的管道。如果 EventBridge 从源中读取的批次只有一条记录,则只会处理一个事件。为避免处理数量较少的记录,您可以配置批处理时段,让管道缓冲最多五分钟的记录。处理事件前,EventBridge 会持续从源中读取记录,直到收集完整批次、批处理时段到期或批次达到 6MB 的有效负载时为止。

您还可以通过将来自各个分区的多个批次并行处理来增加并发性。EventBridge 可以同时处理每个分片中的多达 10 个批次。如果您增加每个分片的并发批次数,EventBridge 仍然可以确保分区密钥级别的顺序处理。

配置 ParallelizationFactor 设置,同时处理 Kinesis 或 DynamoDB 数据流一个分片中的多个管道执行。您可以指定 EventBridge 通过从 1(默认值)到 10 的并行化因子从分片中轮询的并发批次数。例如,假设您将 ParallelizationFactor 设置为 2,则最多可以有 200 个并发 EventBridge 管道执行来处理 100 个 Kinesis 数据分片。这有助于在数据量不稳定并且 IteratorAge 较高时纵向扩展处理吞吐量。请注意,如果使用 Kinesis 聚合,并行化因子将不起作用。

轮询和流的起始位置

请注意,管道创建和更新期间的流源轮询最终将是一致的。

  • 在管道创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在管道更新源轮询配置期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

这意味着,如果您指定 LATEST 作为流的起始位置,在创建或更新管道期间,管道可能会错过发送的事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZONAT_TIMESTAMP

报告批处理项目失败

在 EventBridge 使用和处理来自源的流式数据时,默认情况下,仅在批处理完全成功时,才会在批次的最高序列号处设置检查点。为避免重新处理失败批次中已成功处理的消息,您可以配置富集或目标,返回对象来指示哪些消息处理成功、哪些失败。这称为部分批处理响应。

有关更多信息,请参阅 部分批处理故障

成功和失败的条件

如果返回以下任意一项,EventBridge 会将此批次视为完全成功:

  • 空的 batchItemFailure 列表

  • Null batchItemFailure 列表

  • 空的 EventResponse

  • Null EventResponse

如果返回以下任意一项,EventBridge 会将此批次视为完全失败:

  • 空字符串 itemIdentifier

  • Null itemIdentifier

  • 包含错误密钥名的 itemIdentifier

EventBridge 会根据您的重试策略在失败时重试。