使用 Kafka 主题作为失败时的目标 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

使用 Kafka 主题作为失败时的目标

您可以将 Kafka 主题配置为 Kafka 事件源映射失败时的目标。当 Lambda 在重试次数用尽后仍无法处理记录,或者当记录的保存时间超过最大期限时,Lambda 会将这些失败的记录发送至指定的 Kafka 主题,以便后续进行处理。

Kafka 失败时的目标的工作原理

当您将 Kafka 主题配置为失败时的目标时,Lambda 将充当 Kafka 生产者,并将失败的记录写入目标主题。这会在您的 Kafka 基础设施中创建死信主题(DLT)模式。

  • 相同的集群要求:目标主题必须与源主题位于同一个 Kafka 集群中。

  • 实际记录内容:Kafka 目标会收到实际的失败记录以及故障元数据。

  • 递归防护:Lambda 通过阻止源主题和目标主题相同的配置来避免无限循环。

配置 Kafka 失败时的目标

在创建或更新 Kafka 事件源映射时,您可以将 Kafka 主题配置为失败时的目标。

配置 Kafka 目标(控制台)

要将 Kafka 主题配置为失败时的目标(控制台)
  1. 打开 Lamba 控制台的函数页面。

  2. 选择您的函数名称。

  3. 请执行以下操作之一:

    • 要添加新的 Kafka 触发器,请在函数概述下选择添加触发器

    • 要修改现有的 Kafka 触发器,请选择该触发器,然后选择编辑

  4. 其他设置下,对于失败时的目标,选择 Kafka 主题

  5. 对于主题名称,输入要将失败记录发送至的 Kafka 主题的名称。

  6. 选择添加保存

配置 Kafka 目标(Amazon CLI)

使用 kafka:// 前缀将 Kafka 主题指定为失败时的目标。

使用 Kafka 目标创建事件源映射

以下示例将 Kafka 主题作为失败时的目标创建了一个 Amazon MSK 事件源映射:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

对于自行管理的 Kafka,请使用相同的语法:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

更新 Kafka 目标

使用 update-event-source-mapping 命令添加或修改 Kafka 目标:

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Kafka 目标的记录格式

当 Lambda 向 Kafka 主题发送失败的记录时,每条消息都包含有关失败的元数据和实际的记录内容。

失败元数据

元数据包含记录失败的原因以及原始批次的详细信息:

{ "requestContext": { "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST", "condition": "RetriesExhausted", "approximateInvokeCount": 3 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T18:16:05.568Z", "KafkaBatchInfo": { "batchSize": 1, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "payloadSize": 1162, "recordInfo": { "offset": "49601189658422359378836298521827638475320189012309704722", "timestamp": "2019-11-14T18:16:04.835Z" } }, "payload": { "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "records": { "my-topic-0": [ { "headers": [], "key": "dGVzdC1rZXk=", "offset": 100, "partition": 0, "timestamp": 1749116692330, "timestampType": "CREATE_TIME", "topic": "my-topic", "value": "dGVzdC12YWx1ZQ==" } ] } } }

分区键行为

Lambda 在将数据发送至目标主题时,会使用原始记录中的相同分区键。如果原始记录中没有键,则 Lambda 会使用 Kafka 的默认轮询分区机制,在目标主题的所有可用分区中进行分区。

要求和限制

  • 需要预置模式:Kafka 失败时的目标仅适用于启用了预置模式的事件源映射。

  • 仅限同一集群:目标主题必须与源主题位于同一个 Kafka 集群中。

  • 主题权限:您的事件源映射必须对目标主题具有写入权限。示例:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • 无递归:目标主题名称不能与您的任何源主题名称相同。