对自行管理的 Apache Kafka 事件源使用事件筛选
您可以使用事件筛选,控制 Lambda 将流或队列中的哪些记录发送给函数。有关事件筛选工作原理的一般信息,请参阅 控制 Lambda 向您的函数发送的事件。
本节重点介绍自行管理的 Apache Kafka 事件源的事件筛选。
注意
自托管式 Apache Kafka 事件源映射仅支持对 value
键进行筛选。
自行管理的 Apache Kafka 事件筛选基础知识
假设创建者以有效的 JSON 格式或纯字符串的形式将消息写入自行管理的 Apache Kafka 集群中的主题。示例记录将如下所示,value
字段中的消息会转换为 Base64 编码字符串。
{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }
假设 Apache Kafka 创建器以如下 JSON 格式将消息写入主题。
{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }
您可以使用 value
键筛选记录。假设您只想筛选 device_ID
以字母 AB 开头的记录。FilterCriteria
对象将如下所示。
{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }
为了更清楚起见,以下是在纯 JSON 中展开的筛选条件 Pattern
的值。
{ "value": { "device_ID": [ { "prefix": "AB" } ] } }
您可以使用控制台、Amazon CLI 或 Amazon SAM 模板添加筛选条件。
通过自行管理的 Apache Kafka,您还可以筛选消息为纯字符串的记录。假设您想忽略字符串为“错误”的消息。FilterCriteria
对象将如下所示。
{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }
为了更清楚起见,以下是在纯 JSON 中展开的筛选条件 Pattern
的值。
{ "value": [ { "anything-but": [ "error" ] } ] }
您可以使用控制台、Amazon CLI 或 Amazon SAM 模板添加筛选条件。
自行管理的 Apache Kafka 消息必须是 UTF-8 编码的字符串,可以是纯字符串或 JSON 格式。这是因为 Lambda 在应用筛选条件之前将 Kafka 字节数组解码为 UTF-8。如果您的消息使用另一种编码,例如 UTF-16 或 ASCII,或者消息格式与 FilterCriteria
格式不匹配,则 Lambda 仅处理元数据筛选条件。下表汇总了具体行为:
传入消息格式 | 消息属性的筛选条件模式格式 | 导致的操作 |
---|---|---|
纯字符串 |
纯字符串 |
Lambda 根据您的筛选条件进行筛选。 |
纯字符串 |
数据属性中没有筛选条件模式 |
Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
纯字符串 |
有效 JSON |
Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
有效 JSON |
纯字符串 |
Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
有效 JSON |
数据属性中没有筛选条件模式 |
Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |
有效 JSON |
有效 JSON |
Lambda 根据您的筛选条件进行筛选。 |
非 UTF-8 编码字符串 |
JSON、纯字符串或无模式 |
Lambda 根据您的筛选条件进行筛选(仅限其他元数据属性)。 |