本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
亚马逊 EventBridge 管道筛选
使用 EventBridge Pipes,您可以过滤给定源的事件并仅处理其中的一部分。这种筛选的工作方式与使用事件模式对 EventBridge 事件总线或 Lambda 事件源映射进行筛选的方式相同。更多有关事件模式的信息,请参阅Amazon EventBridge 事件模式。
筛选条件FilterCriteria
对象是一个由筛选条件列表组成的结构。Filters
每个筛选条件是一个用于定义筛选模式的结构 (Pattern
)。Pattern
是 JSON 筛选条件规则的字符串表示。FilterCriteria
对象与以下示例类似:
{ "Filters": [ {"Pattern": "{ \"Metadata1\": [ rule1 ], \"data\": { \"Data1\": [ rule2 ] }}" } ] }
为了更清楚起见,以下是在纯 JSON 中展开的筛选条件 Pattern
的值。
{ "Metadata1": [ pattern1 ], "data": {"Data1": [ pattern2 ]} }
FilterCriteria
对象的主要部分是元数据属性和数据属性。
元数据属性是指事件对象的字段。在示例中,
FilterCriteria.Metadata1
表示一个元数据属性。数据属性是指事件主体的字段。在示例中,
FilterCriteria.Data1
表示一个数据属性。
例如,假设你的 Kinesis 直播包含这样的事件:
{ "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": {"City": "Seattle", "State": "WA", "Temperature": "46", "Month": "December" }, "approximateArrivalTimestamp": 1545084650.987 }
当事件流经您的管道时,它将如下所示,data
字段采用 base64 编码:
{ "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" },
Kinesis 事件的元数据属性是data
对象以外的任何字段,例如partitionKey
或sequenceNumber
。
Kinesis 事件的数据属性是data
对象内部的字段,例如City
或Temperature
。
当您进行筛选以匹配此事件时,可以在解码后的字段上使用过滤器。例如,要进行筛选partitionKey
,City
您可以使用以下过滤器:
{ "partitionKey": [ "1" ], "data": { "City": [ "Seattle" ] }
当你创建事件过滤器时, EventBridge Pipes 可以访问事件内容。此内容要么采用 JSON 转义(例如 Amazon SQSbody
字段),要么采用 base64 编码,例如 Kinesisdata
字段。如果您的数据是有效的 JSON,则您的输入模板或目标参数的 JSON 路径可以直接引用内容。例如,如果 Kinesis 事件源是有效的 JSON,则可以使用引用变量<$.data.someKey>
。
创建事件模式时,您可以根据源 API 发送的字段进行筛选,而不是根据轮询操作添加的字段进行筛选。以下字段不能在事件模式中使用:
awsRegion
eventSource
eventSourceARN
eventVersion
eventID
eventName
invokeIdentityArn
eventSourceKey
消息和数据字段
每个 EventBridge Pipe 源都包含一个包含核心消息或数据的字段。我们将它们称为消息字段或数据字段。这些字段很特别,因为它们可能采用 JSON 转义或 base64 编码,但是当它们是有效的 JSON 时,可以使用 JSON 模式对其进行过滤,就好像正文没有被转义一样。这些字段的内容也可以在输入变压器中无缝使用。
正确筛选 Amazon SQS 消息
如果 Amazon SQS 消息不符合您的筛选条件, EventBridge 会自动从队列中删除该消息。您无需在 Amazon SQS 中手动删除这些消息。
对于 Amazon SQS,消息 body
可以是任何字符串。但如果您的 FilterCriteria
期望 body
为有效的 JSON 格式,则可能会导致问题。反之亦然 — 如果传入的消息body
为有效的 JSON 格式,但您的筛选条件期望body
为纯字符串,则会导致出现意外行为。
要避免此问题,请确保您的格式与您从队列body
中收到的 in 消息的期望格式FilterCriteria
一致。body
在筛选消息之前, EventBridge 会自动评估传入消息的格式body
以及 body 的格式body
。如果不匹配,将会 EventBridge 丢弃此消息。下表汇总了此评估:
传入消息 body 格式 |
筛选条件模式 body 格式 |
导致的操作 |
---|---|---|
纯字符串 |
纯字符串 |
EventBridge 根据您的筛选条件进行筛选。 |
纯字符串 |
数据属性中没有筛选条件模式 |
EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
纯字符串 |
有效 JSON |
EventBridge 将丢弃消息。 |
有效 JSON |
纯字符串 |
EventBridge 将丢弃消息。 |
有效 JSON |
数据属性中没有筛选条件模式 |
EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
有效 JSON |
有效 JSON |
EventBridge 根据您的筛选条件进行筛选。 |
如果您未body
将其包括在内FilterCriteria
,请 EventBridge 跳过此检查。
正确筛选 Kinesis 和 DynamoDB 消息
在筛选条件处理了 Kinesis 或 DynamoDB 记录后,流迭代器会跳过此记录。如果记录不符合筛选条件,您无需从事件源手动删除记录。保留期结束后,Kinesis 和 DynamoDB 会自动删除这些旧记录。如果您希望提前删除记录,请参阅更改数据保留期。
要正确筛选流事件源中的事件,数据字段和数据字段的筛选条件都必须为有效的 JSON 格式。(对于 Kinesis,数据字段为 data
。对于 DynamoDB,数据字段为 dynamodb
。) 如果任一字段不为有效的 JSON 格式, EventBridge 将会丢弃消息或引发异常。下表汇总了具体行为:
传入数据格式(data 或 dynamodb ) |
数据属性中的筛选条件模式格式 | 导致的操作 |
---|---|---|
有效 JSON |
有效 JSON |
EventBridge 根据您的筛选条件进行筛选。 |
有效 JSON |
数据属性中没有筛选条件模式 |
EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
有效 JSON |
非 JSON |
EventBridge 在管道或更新时引发异常。数据属性的筛选条件模式必须为有效的 JSON 格式。 |
非 JSON |
有效 JSON |
EventBridge 将丢弃记录。 |
非 JSON |
数据属性中没有筛选条件模式 |
EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
非 JSON |
非 JSON |
EventBridge 在管道创建或更新时引发异常。数据属性的筛选条件模式必须为有效的 JSON 格式。 |
正确筛选 Amazon Managed Streaming for Apache Kafka 自行管理的 Amazon Managed 和 Amazon MQ 消息。
注意
将筛选条件附加到带有 Amazon MQ 事件源的管道后,筛选规则应用到事件最多可能需要 15 分钟。
对于 Amazon MQ 来源,消息字段为 data
。对于 A mazon MSK 和自行管理的 Amazon MSK,有两个消息字段:key
和value
。
EventBridge 丢弃与筛选条件中包含的所有字段不匹配的消息。对于 Apache Kafka,在成功调用函数后为匹配和不匹配的消息 EventBridge 提交偏移。对于 Amazon MQ,在成功调用函数后 EventBridge 确认匹配的消息,并在筛选不匹配的消息时确认此类消息。
Amazon Managed 和 Amazon MQ 消息必须是 UTF-8 编码的字符串,可以是纯字符串或 JSON 格式。这是因为在应用筛选条件之前将 Amazon Manag EventBridge ed 和 Amazon MQ 字节数组解码为 UTF-8。如果您的消息使用另一种编码,例如 UTF-16 或 ASCII,或者消息格式与格式不匹配,则仅 EventBridge 处理元数据筛选条件。FilterCriteria
下表汇总了具体行为:
传入消息格式(data 或 key 和 value ) |
消息属性的筛选条件模式格式 | 导致的操作 |
---|---|---|
纯字符串 |
纯字符串 |
EventBridge 根据您的筛选条件进行筛选。 |
纯字符串 |
数据属性中没有筛选条件模式 |
EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
纯字符串 |
有效 JSON |
EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
有效 JSON |
纯字符串 |
EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
有效 JSON |
数据属性中没有筛选条件模式 |
EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
有效 JSON |
有效 JSON |
EventBridge 根据您的筛选条件进行筛选。 |
非 UTF-8 编码字符串 |
JSON、纯字符串或无模式 |
EventBridge 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
Lambda ESM 和 EventBridge 管道之间的区别
筛选事件时,Lambda ESM 和 Pip EventBridge es 的运行方式通常相同。主要区别在于 ESM 有效载荷中不存在磁eventSourceKey
场。