Filtering events from Amazon MSK and self-managed Apache Kafka event sources
You can use event filtering to control which records from a stream or queue Lambda sends to your function. For general information about how event filtering works, see Control which events Lambda sends to your function.
Note
Amazon MSK and self-managed Apache Kafka event source mappings only support filtering on the value key.
Kafka event filtering basics
Suppose a producer is writing messages to a topic in your Kafka cluster, either in valid JSON format or as plain strings. An example record would look like the following, with the message converted to a Base64-encoded string in the value field.
{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }
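To see what the value field contains, the following minimal Python sketch decodes it. The event dictionary copies the sample record above. The helper name decode_values is hypothetical and is not part of any Lambda API, and trying JSON first and falling back to the plain string is an assumption made for illustration.

```python
import base64
import json

# Sample event, copied from the record shown above.
event = {
    "mytopic-0": [
        {
            "topic": "mytopic",
            "partition": 0,
            "offset": 15,
            "timestamp": 1545084650987,
            "timestampType": "CREATE_TIME",
            "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers": [],
        }
    ]
}


def decode_values(event):
    """Hypothetical helper: return the decoded message of every record."""
    decoded = []
    for records in event.values():
        for record in records:
            # The value field is Base64; the underlying bytes are assumed to be UTF-8.
            text = base64.b64decode(record["value"]).decode("utf-8")
            try:
                decoded.append(json.loads(text))  # JSON message
            except json.JSONDecodeError:
                decoded.append(text)  # plain-string message
    return decoded


messages = decode_values(event)
print(messages)
```

For the sample record, the decoded message is the plain string "Hello, this is a test."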
Suppose your Apache Kafka producer is writing messages to your topic in the following JSON format.
{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }
You can use the value key to filter records. Suppose you want Lambda to send your function only those records where device_ID begins with the letters AB. The FilterCriteria object would be as follows.
{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }
For added clarity, here is the value of the filter's Pattern expanded in plain JSON.
{ "value": { "device_ID": [ { "prefix": "AB" } ] } }
You can add your filter using the console, Amazon CLI or an Amazon SAM template.
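Because Pattern is a JSON document stored inside a JSON string, escaping it by hand is error-prone. The sketch below builds the same FilterCriteria object in Python and lets json.dumps do the escaping. The function device_id_has_prefix is a hypothetical local approximation of the prefix rule for quick testing; it is not how Lambda evaluates filters.

```python
import json

# The pattern in plain JSON form, as shown above.
pattern = {"value": {"device_ID": [{"prefix": "AB"}]}}

# Pattern must be a string, so serialize the inner document.
filter_criteria = {"Filters": [{"Pattern": json.dumps(pattern)}]}
print(json.dumps(filter_criteria, indent=2))


def device_id_has_prefix(message: dict, prefix: str) -> bool:
    """Hypothetical local check that approximates the prefix rule."""
    device_id = message.get("device_ID")
    return isinstance(device_id, str) and device_id.startswith(prefix)


sample = {"device_ID": "AB1234", "session": {"duration": 162}}
print(device_id_has_prefix(sample, "AB"))
```

If you use boto3, a dictionary of this shape can be passed as the FilterCriteria argument of the Lambda client's create_event_source_mapping or update_event_source_mapping call.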
With Kafka, you can also filter records where the message is a plain string. Suppose you want Lambda to skip messages where the string is exactly "error". The FilterCriteria object would look as follows.
{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }
For added clarity, here is the value of the filter's Pattern expanded in plain JSON.
{ "value": [ { "anything-but": [ "error" ] } ] }
You can add your filter using the console, Amazon CLI or an Amazon SAM template.
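The anything-but rule excludes exact values, so a longer string that merely contains "error" is still delivered. The following sketch illustrates that behavior locally. The helper passes_anything_but and the sample messages are hypothetical, and the check only approximates how Lambda evaluates the rule.

```python
import json

# The pattern in plain JSON form, as shown above.
pattern = {"value": [{"anything-but": ["error"]}]}
filter_criteria = {"Filters": [{"Pattern": json.dumps(pattern)}]}


def passes_anything_but(message: str, excluded: list) -> bool:
    """Hypothetical local check: keep the message unless it equals an excluded value."""
    return message not in excluded


excluded = pattern["value"][0]["anything-but"]
messages = ["error", "ok", "error: disk full"]  # illustrative plain-string messages
kept = [m for m in messages if passes_anything_but(m, excluded)]
print(kept)
```

Only the message that is exactly "error" is dropped in this sketch; the other two strings pass.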
Kafka messages must be UTF-8 encoded strings, either plain strings or in JSON format. That's because Lambda decodes Kafka byte arrays into UTF-8 before applying filter criteria. If your messages use another encoding, such as UTF-16, or if the message format doesn't match the FilterCriteria format, Lambda processes metadata filters only. The following table summarizes the specific behavior:
| Incoming message format | Filter pattern format for message properties | Resulting action |
|---|---|---|
| Plain string | Plain string | Lambda filters based on your filter criteria. |
| Plain string | No filter pattern for data properties | Lambda filters (on the other metadata properties only) based on your filter criteria. |
| Plain string | Valid JSON | Lambda filters (on the other metadata properties only) based on your filter criteria. |
| Valid JSON | Plain string | Lambda filters (on the other metadata properties only) based on your filter criteria. |
| Valid JSON | No filter pattern for data properties | Lambda filters (on the other metadata properties only) based on your filter criteria. |
| Valid JSON | Valid JSON | Lambda filters based on your filter criteria. |
| Non-UTF-8 encoded string | JSON, plain string, or no pattern | Lambda filters (on the other metadata properties only) based on your filter criteria. |
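The table can be read as a small decision rule: the filter on the message applies only when the message decodes as UTF-8 and its format matches the format of the filter pattern. The sketch below encodes that rule in Python. The names classify_message and data_filter_applies are hypothetical, and treating only JSON objects and arrays as "valid JSON" is an assumption made for illustration; it is not a statement of how Lambda classifies messages internally.

```python
import json
from typing import Optional


def classify_message(raw: bytes) -> str:
    """Hypothetical classifier for the incoming message format."""
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return "non-utf8"
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return "plain string"
    # Assumption: only objects and arrays count as JSON messages here.
    return "valid JSON" if isinstance(parsed, (dict, list)) else "plain string"


def data_filter_applies(message_format: str, pattern_format: Optional[str]) -> bool:
    """Return True when the table says the message filter is used.

    pattern_format is "plain string", "valid JSON", or None when there is
    no filter pattern for data properties.
    """
    if message_format == "non-utf8" or pattern_format is None:
        return False  # metadata filters only
    return message_format == pattern_format


print(data_filter_applies(classify_message(b'{"device_ID": "AB1234"}'), "valid JSON"))
print(data_filter_applies(classify_message(b"error"), "valid JSON"))
```

In every case where data_filter_applies returns False, the table indicates that Lambda still evaluates the other metadata properties in your filter criteria.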