将 Amazon Simple Queue Service 定义为 EventBridge 管道的源 - Amazon EventBridge
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

将 Amazon Simple Queue Service 定义为 EventBridge 管道的源

您可使用 EventBridge Pipes 接收来自 Amazon SQS 队列的记录。然后,您可以选择筛选或增强这些记录,然后再将它们发送到可用的目标进行处理。

您可以使用管道处理 Amazon Simple Queue Service (Amazon SQS) 队列中的消息。EventBridge Pipes 支持标准队列先进先出 (FIFO) 队列。在 Amazon SQS 中,您可以通过将来自一个应用程序组件的任务发送到一个队列中并异步处理它们来进行分载。

EventBridge 轮询队列并同步调用您的管道,其中有包含队列消息的事件。EventBridge 按批次读取消息,并为每个批次调用一次管道。当您的管道成功处理一个批次后,EventBridge 就会将其消息从队列中删除。

默认情况下,EventBridge 将同时轮询队列中的最多 10 条消息,并将该批次发送到管道。为避免在记录数量较少的情况下调用管道,您可以配置批处理时间窗,让事件源缓冲最多 5 分钟的记录。在调用管道之前,EventBridge 会继续轮询来自 Amazon SQS 标准队列的消息,直到出现以下情况之一:

  • 批处理时间窗过期。

  • 已达到调用负载大小配额。

  • 已达到配置的批次大小上限。

注意

如果您使用批处理时间窗,并且 Amazon SQS 队列包含低流量,EventBridge 可能会等待最多 20 秒钟,然后再调用您的管道。即使您将批处理时间窗设置为少于 20 秒,情况依然如此。对于 FIFO 队列,记录包含与重复数据消除和顺序相关的其他属性。

当 EventBridge 读取批次时,消息将保留在队列中,但会在队列可见性超时的长度内隐藏。如果您的管道成功处理一个批次,EventBridge 会将消息从队列中删除。默认情况下,如果您的管道在处理某个批次时遇到错误,则该批次中的所有消息都会在队列中重新可见。因此,管道代码必须能够多次处理同一条消息,而不会产生意外的副作用。您可以在管道响应中包括批处理项目失败次数,来修改此再处理行为。以下示例显示了包含两条消息的批次事件。

示例事件

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 Amazon EventBridge 管道中的事件筛选

标准队列

[ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ]

FIFO 队列

[ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ]

扩展和处理

对于标准队列,EventBridge 使用长轮询来轮询一个队列,直到它变为活动状态。当消息可用时,EventBridge 最多可读取五个批次并将其发送到您的管道。如果仍有可用消息,EventBridge 会增加读取批次的进程数,最多每分钟增加 300 个实例。管道可以同时处理的最大批次数量为 1,000。

对于 FIFO 队列,EventBridge 按照接收消息的顺序向您的管道发送消息。向 FIFO 队列发送消息时,需要指定消息组 ID。Amazon SQS 有助于按顺序将同一组消息传送到 EventBridge。EventBridge 将消息分组,每次只为一组发送一个批次。如果管道返回错误,管道会对受影响的消息尝试所有重试次数,然后 EventBridge 才会接收来自同一组的其他消息。

配置队列,与 EventBridge Pipes 配合使用

创建一个 Amazon SQS 队列,用作管道的源。然后配置此队列,使您的管道有时间处理每批事件,并使 EventBridge 在扩展时出现节流错误时能够重试。

为使您的管道有时间处理每批记录,请将源队列的可见性超时至少设置为管道富集和目标组件合并运行时的六倍。在您的管道处理上一批次期间遇到限流时,这段额外的时间可供 EventBridge 进行重试。

如果您的管道多次都未能处理某条消息,Amazon SQS 可以将其发送到某个死信队列。当您的管道返回错误时,EventBridge 会将其留在队列中。发生可见性超时之后,EventBridge 会再次接收此消息。要在多次接收之后将消息发送到第二个队列,请在源队列上配置死信队列。

注意

确保在源队列上配置死信队列,而不是在管道上配置。您在管道上配置的死信队列用于管道的异步调用队列,而不是用于源队列。

如果您的管道返回错误,或者由于达到并发上限而无法调用,则处理可能会成功,但需要额外尝试。要在将消息发送到死信队列之前给予更多处理机会,请将源队列重新驱动策略的 maxReceiveCount 至少设置为 5

报告批处理项目失败

在 EventBridge 使用和处理来自源的流式数据时,默认情况下,仅在批处理完全成功时,才会在批次的最高序列号处设置检查点。为避免重新处理失败批次中已成功处理的消息,您可以配置富集或目标,返回对象来指示哪些消息处理成功、哪些失败。这称为部分批处理响应。

有关更多信息,请参阅 部分批处理故障

成功和失败的条件

如果返回以下任意一项,EventBridge 会将此批次视为完全成功:

  • 空的 batchItemFailure 列表

  • Null batchItemFailure 列表

  • 空的 EventResponse

  • Null EventResponse

如果返回以下任意一项,EventBridge 会将此批次视为完全失败:

  • 空字符串 itemIdentifier

  • Null itemIdentifier

  • 包含错误密钥名的 itemIdentifier

EventBridge 会根据您的重试策略在失败时重试。