亚马逊 EventBridge 管道过滤 - Amazon EventBridge
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

亚马逊 EventBridge 管道过滤

使用 Pip EventBridge es,您可以过滤给定源的事件并仅处理其中的一部分。这种筛选的工作方式与使用事件模式对 EventBridge 事件总线或 Lambda 事件源映射进行筛选的方式相同。有关事件模式的更多信息,请参阅 亚马逊 EventBridge 事件模式

筛选标准 FilterCriteria 对象是由筛选条件 Filters 列表组成的结构。每个筛选条件都是用于定义筛选模式 (Pattern) 的结构。Pattern 是 JSON 筛选条件规则的字符串表示。FilterCriteria 对象与以下示例类似:

{ "Filters": [ {"Pattern": "{ \"Metadata1\": [ rule1 ], \"data\": { \"Data1\": [ rule2 ] }}" } ] }

为了更清楚起见,以下是在纯 JSON 中展开的筛选条件 Pattern 的值。

{ "Metadata1": [ pattern1 ], "data": {"Data1": [ pattern2 ]} }

FilterCriteria 对象的主要部分是元数据属性和数据属性。

  • 元数据属性是指事件对象的字段。在示例中,FilterCriteria.Metadata1 表示元数据属性。

  • 数据属性是指事件主体的字段。在示例中,FilterCriteria.Data1 表示数据属性。

例如,假设您的 Kinesis 流包含这样的事件:

{ "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": {"City": "Seattle", "State": "WA", "Temperature": "46", "Month": "December" }, "approximateArrivalTimestamp": 1545084650.987 }

当事件流经您的管道时,采用 base64 编码 data 字段将如下所示:

{ "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" },

Kinesis 事件的元数据属性data 对象之外的任何字段,例如 partitionKeysequenceNumber

Kinesis 事件的数据属性data 对象中的任何字段,例如 CityTemperature

当您进行筛选以匹配此事件时,可以针对解码后的字段使用筛选条件。例如,要筛选 partitionKeyCity,需要使用以下筛选条件:

{ "partitionKey": [ "1" ], "data": { "City": [ "Seattle" ] }

创建事件过滤器时,Pi EventBridge pes 可以访问事件内容。这些内容要么是 JSON 转义的,例如 Amazon SQS body 字段,要么是 base64 编码的,例如 Kinesis data 字段。如果您的数据是有效的 JSON,则您的输入模板或目标参数的 JSON 路径可以直接引用内容。例如,如果 Kinesis 事件源是有效的 JSON,您可以使用 <$.data.someKey> 引用变量。

创建事件模式时,您可以根据源 API 发送的字段进行筛选,而不是根据轮询操作添加的字段进行筛选。以下字段不能用于事件模式:

  • awsRegion

  • eventSource

  • eventSourceARN

  • eventVersion

  • eventID

  • eventName

  • invokeIdentityArn

  • eventSourceKey

消息和数据字段

每个 EventBridge Pipe 源都包含一个包含核心消息或数据的字段。我们将它们称为消息 字段或数据 字段。这些字段之所以特别,是因为它们可能是 JSON 转义或 base64 编码的,但当它们是有效 JSON 时,可以使用 JSON 模式对其进行筛选,就像正文没有被转义一样。这些字段的内容也可以无缝地用于输入转换器

正确筛选 Amazon SQS 消息

如果 Amazon SQS 消息不符合您的筛选标准,则 EventBridge 会自动将该消息从队列中删除。您无需在 Amazon SQS 中手动删除这些消息。

对于 Amazon SQS,消息 body 可以是任何字符串。但如果您的 FilterCriteria 期望 body 为有效的 JSON 格式,则可能会导致问题。反之亦然,如果传入的消息 body 为有效的 JSON 格式,但您的筛选标准期望 body 为纯字符串,会导致出现意外行为。

要避免此问题,请确保 FilterCriteriabody 的格式与您从队列中收到的消息中的 body 的期望格式一致。在筛选您的邮件之前, EventBridge 会自动评估传入消息的格式body和您的过滤器模式。body如果不匹配,则 EventBridge 丢弃该消息。下表汇总了此评估:

传入消息 body 格式 筛选条件模式 body 格式 导致的操作

纯字符串

纯字符串

EventBridge 根据您的筛选条件进行筛选。

纯字符串

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限于其他元数据属性)。

纯字符串

有效 JSON

EventBridge 丢弃消息。

有效 JSON

纯字符串

EventBridge 丢弃消息。

有效 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限于其他元数据属性)。

有效 JSON

有效 JSON

EventBridge 根据您的筛选条件进行筛选。

如果您未body将其包含在中FilterCriteria,则 EventBridge 跳过此检查。

正确筛选 Kinesis 和 DynamoDB 消息

在筛选标准处理了 Kinesis 或 DynamoDB 记录后,流迭代器会跳过此记录。如果记录不符合筛选标准,您无需从事件源手动删除记录。保留期结束后,Kinesis 和 DynamoDB 会自动删除这些旧记录。如果您希望提前删除记录,请参阅更改数据保留期

要正确筛选流事件源中的事件,数据字段和数据字段的筛选条件都必须为有效的 JSON 格式。(对于 Kinesis,数据字段为 data。对于 DynamoDB,数据字段为 dynamodb。) 如果任一字段都不是有效的 JSON 格式,则 EventBridge 会丢弃该消息或引发异常。下表汇总了具体行为:

传入数据格式(datadynamodb 数据属性中的筛选条件模式格式 导致的操作

有效 JSON

有效 JSON

EventBridge 根据您的筛选条件进行筛选。

有效 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限于其他元数据属性)。

有效 JSON

非 JSON

EventBridge 在管道或更新时抛出异常。数据属性的筛选条件模式必须为有效的 JSON 格式。

非 JSON

有效 JSON

EventBridge 丢掉了记录。

非 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限于其他元数据属性)。

非 JSON

非 JSON

EventBridge 在创建或更新管道时会引发异常。数据属性的筛选条件模式必须为有效的 JSON 格式。

正确筛选 Amazon Managed Streaming for Apache Kafka、自托管 Apache Kafka 和 Amazon MQ 消息。

对于 Amazon MQ 来源,消息字段为 data。对于 Apache Kafka 源(Amazon MSK自托管 Apache Kafka),有两个消息字段:keyvalue

EventBridge 丢弃与过滤器中包含的所有字段不匹配的邮件。对于 Apache Kafka,成功调用该函数后,为匹配和不匹配的消息 EventBridge 提交偏移量。对于 Amazon MQ,在成功调用该函数后 EventBridge 确认匹配的消息,并在筛选不匹配的消息时确认这些消息。

Apache Kafka 和 Amazon MQ 消息必须是 UTF-8 编码的字符串,可以是纯字符串或 JSON 格式。那是因为在应用筛选条件之前,将 Apache Kafka 和 Amazon MQ 字节数组 EventBridge 解码为 UTF-8。如果您的消息使用其他编码(例如 UTF-16 或 ASCII),或者消息格式与格式不匹配,则仅 EventBridge 处理元数据过滤器。FilterCriteria下表汇总了具体行为:

传入消息格式(datakeyvalue 消息属性的筛选条件模式格式 导致的操作

纯字符串

纯字符串

EventBridge 根据您的筛选条件进行筛选。

纯字符串

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限于其他元数据属性)。

纯字符串

有效 JSON

EventBridge 根据您的筛选条件进行筛选(仅限于其他元数据属性)。

有效 JSON

纯字符串

EventBridge 根据您的筛选条件进行筛选(仅限于其他元数据属性)。

有效 JSON

数据属性中没有筛选条件模式

EventBridge 根据您的筛选条件进行筛选(仅限于其他元数据属性)。

有效 JSON

有效 JSON

EventBridge 根据您的筛选条件进行筛选。

非 UTF-8 编码字符串

JSON、纯字符串或无模式

EventBridge 根据您的筛选条件进行筛选(仅限于其他元数据属性)。

Lambda ESM 和 Pipes 之间的区别 EventBridge

筛选事件时,Lambda ESM 和 Pip EventBridge es 的操作方式通常相同。主要区别在于 ESM 有效负载中不存在 eventSourceKey 字段。