亚马逊 EventBridge 管道筛选 - 亚马逊 EventBridge
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

亚马逊 EventBridge 管道筛选

使用 EventBridge Pipes,您可以过滤给定源的事件并仅处理其中的一部分。这种筛选的工作方式与使用事件模式对 EventBridge 事件总线或 Lambda 事件源映射进行筛选的方式相同。更多有关事件模式的信息,请参阅Amazon EventBridge 事件模式

筛选条件FilterCriteria对象是一个由筛选条件列表组成的结构。Filters每个筛选条件是一个用于定义筛选模式的结构 (Pattern)。Pattern 是 JSON 筛选条件规则的字符串表示。FilterCriteria 对象与以下示例类似:

{ "Filters": [ {"Pattern": "{ \"Metadata1\": [ rule1 ], \"data\": { \"Data1\": [ rule2 ] }}" } ] }

为了更清楚起见,以下是在纯 JSON 中展开的筛选条件 Pattern 的值。

{ "Metadata1": [ pattern1 ], "data": {"Data1": [ pattern2 ]} }

FilterCriteria对象的主要部分是元数据属性和数据属性。

  • 元数据属性是指事件对象的字段。在示例中,FilterCriteria.Metadata1表示一个元数据属性。

  • 数据属性是指事件主体的字段。在示例中,FilterCriteria.Data1表示一个数据属性。

例如,假设你的 Kinesis 直播包含这样的事件:

{ "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": {"City": "Seattle", "State": "WA", "Temperature": "46", "Month": "December" }, "approximateArrivalTimestamp": 1545084650.987 }

当事件流经您的管道时,它将如下所示,data字段采用 base64 编码:

{ "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" },

Kinesis 事件的元数据属性data对象以外的任何字段,例如partitionKeysequenceNumber

Kinesis 事件的数据属性data对象内部的字段,例如CityTemperature

当您进行筛选以匹配此事件时,可以在解码后的字段上使用过滤器。例如,要进行筛选partitionKeyCity您可以使用以下过滤器:

{ "partitionKey": [ "1" ], "data": { "City": [ "Seattle" ] }

当你创建事件过滤器时, EventBridge Pipes 可以访问事件内容。此内容要么采用 JSON 转义(例如 Amazon SQSbody 字段),要么采用 base64 编码,例如 Kinesisdata 字段。如果您的数据是有效的 JSON,则您的输入模板或目标参数的 JSON 路径可以直接引用内容。例如,如果 Kinesis 事件源是有效的 JSON,则可以使用引用变量<$.data.someKey>

创建事件模式时,您可以根据源 API 发送的字段进行筛选,而不是根据轮询操作添加的字段进行筛选。以下字段不能在事件模式中使用:

  • awsRegion

  • eventSource

  • eventSourceARN

  • eventVersion

  • eventID

  • eventName

  • invokeIdentityArn

  • eventSourceKey

消息和数据字段

每个 EventBridge Pipe 源都包含一个包含核心消息或数据的字段。我们将它们称为消息字段或数据字段。这些字段很特别,因为它们可能采用 JSON 转义或 base64 编码,但是当它们是有效的 JSON 时,可以使用 JSON 模式对其进行过滤,就好像正文没有被转义一样。这些字段的内容也可以在输入变压器中无缝使用。

正确筛选 Amazon SQS 消息

如果 Amazon SQS 消息不符合您的筛选条件, EventBridge 会自动从队列中删除该消息。您无需在 Amazon SQS 中手动删除这些消息。

对于 Amazon SQS,消息 body 可以是任何字符串。但如果您的 FilterCriteria 期望 body 为有效的 JSON 格式,则可能会导致问题。反之亦然 — 如果传入的消息body为有效的 JSON 格式,但您的筛选条件期望body为纯字符串,则会导致出现意外行为。

要避免此问题,请确保您的格式与您从队列body中收到的 in 消息的期望格式FilterCriteria一致。body在筛选消息之前, EventBridge 会自动评估传入消息的格式body以及 body 的格式body。如果不匹配,将会 EventBridge 丢弃此消息。下表汇总了此评估:

传入消息 body 格式 筛选条件模式 body 格式 导致的操作

纯字符串

纯字符串

EventBridge 根据您的筛选条件进行筛选。

纯字符串

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。

纯字符串

有效 JSON

EventBridge 将丢弃消息。

有效 JSON

纯字符串

EventBridge 将丢弃消息。

有效 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。

有效 JSON

有效 JSON

EventBridge 根据您的筛选条件进行筛选。

如果您未body将其包括在内FilterCriteria,请 EventBridge 跳过此检查。

正确筛选 Kinesis 和 DynamoDB 消息

在筛选条件处理了 Kinesis 或 DynamoDB 记录后,流迭代器会跳过此记录。如果记录不符合筛选条件,您无需从事件源手动删除记录。保留期结束后,Kinesis 和 DynamoDB 会自动删除这些旧记录。如果您希望提前删除记录,请参阅更改数据保留期

要正确筛选流事件源中的事件,数据字段和数据字段的筛选条件都必须为有效的 JSON 格式。(对于 Kinesis,数据字段为 data。对于 DynamoDB,数据字段为 dynamodb。) 如果任一字段不为有效的 JSON 格式, EventBridge 将会丢弃消息或引发异常。下表汇总了具体行为:

传入数据格式(datadynamodb 数据属性中的筛选条件模式格式 导致的操作

有效 JSON

有效 JSON

EventBridge 根据您的筛选条件进行筛选。

有效 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。

有效 JSON

非 JSON

EventBridge 在管道或更新时引发异常。数据属性的筛选条件模式必须为有效的 JSON 格式。

非 JSON

有效 JSON

EventBridge 将丢弃记录。

非 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。

非 JSON

非 JSON

EventBridge 在管道创建或更新时引发异常。数据属性的筛选条件模式必须为有效的 JSON 格式。

正确筛选 Amazon Managed Streaming for Apache Kafka 自行管理的 Amazon Managed 和 Amazon MQ 消息。

注意

将筛选条件附加到带有 Amazon MQ 事件源的管道后,筛选规则应用到事件最多可能需要 15 分钟。

对于 Amazon MQ 来源,消息字段为 data。对于 A mazon MSK自行管理的 Amazon MSK,有两个消息字段:keyvalue

EventBridge 丢弃与筛选条件中包含的所有字段不匹配的消息。对于 Apache Kafka,在成功调用函数后为匹配和不匹配的消息 EventBridge 提交偏移。对于 Amazon MQ,在成功调用函数后 EventBridge 确认匹配的消息,并在筛选不匹配的消息时确认此类消息。

Amazon Managed 和 Amazon MQ 消息必须是 UTF-8 编码的字符串,可以是纯字符串或 JSON 格式。这是因为在应用筛选条件之前将 Amazon Manag EventBridge ed 和 Amazon MQ 字节数组解码为 UTF-8。如果您的消息使用另一种编码,例如 UTF-16 或 ASCII,或者消息格式与格式不匹配,则仅 EventBridge 处理元数据筛选条件。FilterCriteria下表汇总了具体行为:

传入消息格式(datakeyvalue 消息属性的筛选条件模式格式 导致的操作

纯字符串

纯字符串

EventBridge 根据您的筛选条件进行筛选。

纯字符串

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。

纯字符串

有效 JSON

EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。

有效 JSON

纯字符串

EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。

有效 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。

有效 JSON

有效 JSON

EventBridge 根据您的筛选条件进行筛选。

非 UTF-8 编码字符串

JSON、纯字符串或无模式

EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。

Lambda ESM 和 EventBridge 管道之间的区别

筛选事件时,Lambda ESM 和 Pip EventBridge es 的运行方式通常相同。主要区别在于 ESM 有效载荷中不存在磁eventSourceKey场。