Capturing discarded batches for a self-managed Apache Kafka event source
To retain records of failed event source mapping invocations, add a destination to your function's event source mapping. Each record sent to the destination is a JSON document with metadata about the failed invocation. You can configure any Amazon SNS topic, Amazon SQS queue, or S3 bucket as a destination. Your execution role must have permissions for the destination:
- For SQS destinations: sqs:SendMessage
- For SNS destinations: sns:Publish
- For S3 bucket destinations: s3:PutObject and s3:ListBucket
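As a sketch, an execution role policy statement granting the SQS destination permission might look like the following. The queue ARN matches the example queue used later in this section; substitute your own destination ARN.

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "sqs:SendMessage",
            "Resource": "arn:aws-cn:sqs:us-east-1:123456789012:dest-queue"
        }
    ]
}
```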
You must deploy a VPC endpoint for your on-failure destination service inside your Apache Kafka cluster VPC.
Additionally, if you configured a KMS key on your destination, Lambda needs the following permissions depending on the destination type:
- If you've enabled encryption with your own KMS key for an S3 destination, kms:GenerateDataKey is required. If the KMS key and S3 bucket destination are in a different account from your Lambda function and execution role, configure the KMS key to trust the execution role to allow kms:GenerateDataKey.
- If you've enabled encryption with your own KMS key for an SQS destination, kms:Decrypt and kms:GenerateDataKey are required. If the KMS key and SQS queue destination are in a different account from your Lambda function and execution role, configure the KMS key to trust the execution role to allow kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey, and kms:ReEncrypt.
- If you've enabled encryption with your own KMS key for an SNS destination, kms:Decrypt and kms:GenerateDataKey are required. If the KMS key and SNS topic destination are in a different account from your Lambda function and execution role, configure the KMS key to trust the execution role to allow kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey, and kms:ReEncrypt.
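For the cross-account case, a KMS key policy statement that trusts the execution role might look like the following sketch. The role ARN and account ID are placeholders, not values from this document.

```json
{
    "Effect": "Allow",
    "Principal": {
        "AWS": "arn:aws-cn:iam::111122223333:role/my-function-execution-role"
    },
    "Action": [
        "kms:Decrypt",
        "kms:GenerateDataKey",
        "kms:DescribeKey",
        "kms:ReEncrypt*"
    ],
    "Resource": "*"
}
```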
Configuring on-failure destinations for a self-managed Apache Kafka event source mapping
To configure an on-failure destination using the console, follow these steps:
- Open the Functions page of the Lambda console.
- Choose a function.
- Under Function overview, choose Add destination.
- For Source, choose Event source mapping invocation.
- For Event source mapping, choose an event source that's configured for this function.
- For Condition, select On failure. For event source mapping invocations, this is the only accepted condition.
- For Destination type, choose the destination type that Lambda sends invocation records to.
- For Destination, choose a resource.
- Choose Save.
You can also configure an on-failure destination using the Amazon CLI. For example, the following create-event-source-mapping command adds an SQS queue as an on-failure destination for MyFunction:
aws lambda create-event-source-mapping \
    --function-name "MyFunction" \
    --event-source-arn arn:aws-cn:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \
    --destination-config '{"OnFailure": {"Destination": "arn:aws-cn:sqs:us-east-1:123456789012:dest-queue"}}'
The following update-event-source-mapping command updates the event source mapping identified by its uuid to send invocation records to an S3 bucket:
aws lambda update-event-source-mapping \
    --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
    --destination-config '{"OnFailure": {"Destination": "arn:aws-cn:s3:::dest-bucket"}}'
To remove a destination, supply an empty string as the argument to the destination-config parameter:
aws lambda update-event-source-mapping \
    --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
    --destination-config '{"OnFailure": {"Destination": ""}}'
SNS and SQS example invocation record
The following example shows what Lambda sends to an SNS topic or SQS queue destination for a failed Kafka event source invocation. Each of the keys under recordsInfo contains both the Kafka topic and partition, separated by a hyphen. For example, for the key "Topic-0", Topic is the Kafka topic and 0 is the partition. For each topic and partition, you can use the offsets and timestamp data to find the original invocation records.
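Because Kafka topic names may themselves contain hyphens, only the final hyphen in a recordsInfo key separates the topic from the partition number. A minimal Python sketch of splitting those keys (the parse_records_info helper and the sample data are illustrative, not part of any AWS SDK):

```python
def parse_records_info(records_info):
    """Split each 'topic-partition' key on its last hyphen.

    Topic names may contain hyphens, so rpartition is used to
    split only on the final one, which precedes the partition.
    """
    parsed = {}
    for key, offsets in records_info.items():
        topic, _, partition = key.rpartition("-")
        parsed[(topic, int(partition))] = offsets
    return parsed


# Sample data shaped like the recordsInfo object in the example record.
records_info = {
    "Topic-0": {"firstRecordOffset": "100", "lastRecordOffset": "199"},
    "my-topic-1": {"firstRecordOffset": "5", "lastRecordOffset": "42"},
}
print(parse_records_info(records_info))
```

With this split, each (topic, partition) pair maps to its offset and timestamp range, which you can use to locate the original records in your cluster.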
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws-cn:lambda:us-east-1:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
        "approximateInvokeCount": 1
    },
    "responseContext": { // null if record is MaximumPayloadSizeExceeded
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KafkaBatchInfo": {
        "batchSize": 500,
        "eventSourceArn": "arn:aws-cn:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
        "bootstrapServers": "...",
        "payloadSize": 2039086, // In bytes
        "recordsInfo": {
            "Topic-0": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z"
            },
            "Topic-1": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z"
            }
        }
    }
}
S3 destination example invocation record
For S3 destinations, Lambda sends the entire invocation record along with the metadata to the destination. The following example shows what Lambda sends to an S3 bucket destination for a failed Kafka event source invocation. In addition to all of the fields from the previous example for SQS and SNS destinations, the payload field contains the original invocation record as an escaped JSON string.
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws-cn:lambda:us-east-1:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
        "approximateInvokeCount": 1
    },
    "responseContext": { // null if record is MaximumPayloadSizeExceeded
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KafkaBatchInfo": {
        "batchSize": 500,
        "eventSourceArn": "arn:aws-cn:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
        "bootstrapServers": "...",
        "payloadSize": 2039086, // In bytes
        "recordsInfo": {
            "Topic-0": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z"
            },
            "Topic-1": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z"
            }
        }
    },
    "payload": "<Whole Event>" // Only available in S3
}
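Because the payload field holds the original invocation record as an escaped JSON string, reading it back requires two decoding passes: one for the outer record and one for the payload itself. A minimal Python sketch, assuming the S3 object body has already been read into s3_object_body (the sample body below is a simplified stand-in, not a full invocation record):

```python
import json

# Simplified stand-in for an S3 destination object body. The payload
# value is itself a JSON document, escaped inside the outer string.
s3_object_body = (
    '{"version": "1.0", '
    '"payload": "{\\"eventSource\\": \\"SelfManagedKafka\\"}"}'
)

# First pass: decode the outer invocation record.
record = json.loads(s3_object_body)

# Second pass: the payload field is an escaped JSON string, so decode again.
original_event = json.loads(record["payload"])
print(original_event["eventSource"])  # SelfManagedKafka
```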
Tip
We recommend enabling S3 versioning on your destination bucket.