

# 捕获 Amazon MSK 和自托管式 Apache Kafka 事件源的丢弃批次
<a name="kafka-on-failure"></a>

要保留失败的事件源映射调用的记录，请在函数的事件源映射中添加一个目标。发送到目标的每条记录都是一个 JSON 文档，其中包含有关失败调用的元数据。对于 Amazon S3 目标，Lambda 还会发送整个调用记录以及元数据。您可以将任何 Amazon SNS 主题、Amazon SQS 队列、Amazon S3 存储桶或 Kafka 配置为目标。

借助 Amazon S3 目标，您可以使用 [Amazon S3 事件通知](https://docs.amazonaws.cn/)功能在对象上传到目标 S3 存储桶时接收通知。您还可以将 S3 事件通知配置为调用另一个 Lambda 函数来对失败的批次执行自动处理。

您的执行角色必须具有目标的权限：
+ **对于 SQS 目标：**[sqs:SendMessage](https://docs.amazonaws.cn/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
+ **对于 SNS 目标：**[sns:Publish](https://docs.amazonaws.cn/sns/latest/api/API_Publish.html)
+ **对于 S3 目标：**[ s3:PutObject](https://docs.amazonaws.cn/AmazonS3/latest/API/API_PutObject.html) 和 [s3:ListBucket](https://docs.amazonaws.cn/AmazonS3/latest/API/ListObjectsV2.html)
+ **对于 Kafka 目标：**[kafka-cluster:WriteData](https://docs.amazonaws.cn/msk/latest/developerguide/kafka-actions.html)

您可以将 Kafka 主题配置为 Kafka 事件源映射失败时的目标。当 Lambda 在重试次数用尽后仍无法处理记录，或者当记录的保存时间超过最大期限时，Lambda 会将这些失败的记录发送至指定的 Kafka 主题，以便后续进行处理。请参考[使用 Kafka 主题作为失败时的目标](kafka-on-failure-destination.md)。

您必须在 Kafka 集群 VPC 中为故障目标服务部署 VPC 端点。

此外，如果您在目标上配置了 KMS 密钥，则根据具体目标类型，Lambda 需要以下权限：
+ 如果您已使用自己的 KMS 密钥为 S3 目标启用加密，则需要 [kms:GenerateDataKey](https://docs.amazonaws.cn/kms/latest/APIReference/API_GenerateDataKey.html)。如果 KMS 密钥和 S3 存储桶目标与您的 Lambda 函数和执行角色位于不同的账户中，请将 KMS 密钥配置为信任执行角色以允许 kms:GenerateDataKey。
+ 如果您已使用自己的 KMS 密钥为 SQS 目标启用加密，则需要 [kms:Decrypt](https://docs.amazonaws.cn/kms/latest/APIReference/API_Decrypt.html) 和 [kms:GenerateDataKey](https://docs.amazonaws.cn/kms/latest/APIReference/API_GenerateDataKey.html)。如果 KMS 密钥和 SQS 队列目标与您的 Lambda 函数和执行角色位于不同的账户中，请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、[kms:DescribeKey](https://docs.amazonaws.cn/kms/latest/APIReference/API_DescribeKey.html) 和 [kms:ReEncrypt](https://docs.amazonaws.cn/kms/latest/APIReference/API_ReEncrypt.html)。
+ 如果您已使用自己的 KMS 密钥为 SNS 目标启用加密，则需要 [kms:Decrypt](https://docs.amazonaws.cn/kms/latest/APIReference/API_Decrypt.html) 和 [kms:GenerateDataKey](https://docs.amazonaws.cn/kms/latest/APIReference/API_GenerateDataKey.html)。如果 KMS 密钥和 SNS 主题目标与您的 Lambda 函数和执行角色位于不同的账户中，请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、[kms:DescribeKey](https://docs.amazonaws.cn/kms/latest/APIReference/API_DescribeKey.html) 和 [kms:ReEncrypt](https://docs.amazonaws.cn/kms/latest/APIReference/API_ReEncrypt.html)。

## 为 Kafka 事件源映射配置失败时的目标
<a name="kafka-onfailure-destination"></a>

要使用控制台配置失败时的目标，请执行以下步骤：

1. 打开 Lamba 控制台的[函数](https://console.amazonaws.cn/lambda/home#/functions)页面。

1. 选择函数。

1. 在 **Function overview (函数概览)** 下，选择 **Add destination (添加目标)**。

1. 对于**源**，请选择**事件源映射调用**。

1. 对于**事件源映射**，请选择为此函数配置的事件源。

1. 在**条件**中，选择**失败时**。对于事件源映射调用，这是唯一可接受的条件。

1. 对于**目标类型**，请选择 Lambda 要发送调用记录的目标类型。

1. 对于 **Destination (目标)**，请选择一个资源。

1. 选择**保存**。

您还可以使用 Amazon CLI 配置失败时的目标。例如，以 [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) 命令将带有 SQS 失败时目标的事件源映射添加到 `MyFunction`：

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

以下 [update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html) 命令将 S3 失败时目标添加到与输入 `uuid` 关联的事件源：

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
```

要移除目标，请提供一个空字符串作为 `destination-config` 参数的实际参数：

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Amazon S3 目标的安全最佳实践
<a name="kafka-s3-destination-security"></a>

如果删除配置为目标的 S3 存储桶而不将目标从函数配置中删除，则可能会造成安全风险。如果其他用户知道您的目标存储桶的名称，则他们可以在其 Amazon Web Services 账户中重新创建存储桶。调用失败的记录将发送到存储桶，这可能会暴露您函数中的数据。

**警告**  
为确保您的函数中的调用记录不会发送到另一个 Amazon Web Services 账户中的 S3 存储桶，请向函数的执行角色添加条件，以将 `s3:PutObject` 权限限制为您账户中的存储桶。

以下示例显示了一个 IAM 策略，该策略将您函数的 `s3:PutObject` 权限限制为您账户中的存储桶。该策略还为 Lambda 提供了使用 S3 存储桶作为目标所需的 `s3:ListBucket` 权限。

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

要使用 Amazon Web Services 管理控制台 或 Amazon CLI 向函数的执行角色添加权限策略，请参阅以下程序中的说明：

------
#### [ Console ]

**向函数的执行角色添加权限策略（控制台）**

1. 打开 Lamba 控制台的[函数](https://console.amazonaws.cn/lambda/home#/functions)页面。

1. 选择想要修改其执行角色的 Lambda 函数。

1. 在**配置**选项卡中，选择**权限**。

1. 在**执行角色**选项卡中，选择您的函数的**角色名称**,以打开该角色的 IAM 控制台页面。

1. 通过执行以下操作，向角色添加权限策略：

   1. 在**权限策略**窗格中，选择**添加权限**，然后选择**创建内联策略**。

   1. 在**策略编辑器**中，选择 **JSON**。

   1. 将要添加的策略粘贴到编辑器中（替换现有 JSON），然后选择**下一步**。

   1. 在**策略详细信息**下，输入**策略名称**。

   1. 选择**创建策略**。

------
#### [ Amazon CLI ]

**向函数的执行角色添加权限策略（CLI）**

1. 创建具有所需权限的 JSON 策略文档并将其保存在本地目录中。

1. 使用 IAM `put-role-policy` CLI 命令向您函数的执行角色添加权限。在您保存 JSON 策略文档的目录中运行以下命令，并用您自己的值替换角色名称、策略名称和策略文档。

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### SNS 和 SQS 示例调用记录
<a name="kafka-sns-sqs-destinations"></a>

以下示例显示了 Lambda 在 Kafka 事件源调用失败时向 SNS 主题或 SQS 队列目标发送的内容。`recordsInfo` 下面的每个密钥都包含 Kafka 主题和分区，用连字符分隔。例如，对于密钥 `"Topic-0"`，`Topic` 是 Kafka 主题，`0` 是分区。对于每个主题和分区，可以使用偏移量和时间戳数据来查找原始调用记录。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
        "approximateInvokeCount": 1
    },
    "responseContext": { // null if record is MaximumPayloadSizeExceeded
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KafkaBatchInfo": {
        "batchSize": 500,
        "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
        "bootstrapServers": "...",
        "payloadSize": 2039086, // In bytes
        "recordsInfo": {
            "Topic-0": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            },
            "Topic-1": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            }
        }
    }
}
```

### S3 目标示例调用记录
<a name="kafka-s3-destinations"></a>

对于 S3 目标，Lambda 会将整个调用记录以及元数据发送到目标。以下示例显示了 Lambda 因调用 Kafka 事件源失败而向 S3 存储桶目标发送消息。除了针对 SQS 和 SNS 目标的上一示例中的所有字段外，`payload` 字段还包含作为转义 JSON 字符串的原始调用记录。

```
{
    "requestContext": {
        "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
        "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
        "approximateInvokeCount": 1
    },
    "responseContext": { // null if record is MaximumPayloadSizeExceeded
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KafkaBatchInfo": {
        "batchSize": 500,
        "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
        "bootstrapServers": "...",
        "payloadSize": 2039086, // In bytes
        "recordsInfo": {
            "Topic-0": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            },
            "Topic-1": {
                "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
                "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
                "firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
                "lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
            }
        }
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

**提示**  
我们建议在目标存储桶上启用 S3 版本控制。