将 Lambda 与自行管理的 Apache Kafka 结合使用 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Lambda 与自行管理的 Apache Kafka 结合使用

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

Lambda 支持将 Apache Kafka 作为事件源。Apache Kafka 是一个开源事件流平台,支持数据管道和流分析等工作负载。

您可以使用Amazon托管的 Kafka 服务 Amazon Managed Streaming for Apache Kafka(Amazon MSK),简称自行管理的 Kafka 集群。有关结合 Amazon MSK 使用 Lambda 的详细信息,请查阅 结合 Amazon MSK 使用 Lambda

本主题介绍了如何将 Lambda 与自行管理的 Kafka 集群结合使用。在Amazon术语中,自行管理的群集包括非Amazon托管 Kafka 集群。例如,可以通过 Confluent Cloud 等云提供程序来托管 Kafka 集群。

Apache Kafka 作为事件源,运行方式与使用 Amazon Simple Queue Service (Amazon SQS) 或 Amazon Kinesis 相似。Lambda 在内部轮询来自事件源的新消息,然后同步调用目标 Lambda 函数。Lambda 批量读取消息,并将这些消息作为事件有效负载提供给您的函数。最大批处理大小可配置。(默认值为 100 个消息。)

对于基于 Kafka 的事件源,Lambda 支持处理控制参数,例如批处理时段和批处理大小。有关更多信息,请参阅批处理行为

有关如何使用自行管理的 Kafka 作为事件源的示例,请参阅Amazon计算博客上的使用自行托管的 Apache Kafka 作为 Amazon Lambda 事件源

示例事件

当 Lambda 调用 Lambda 函数时,它会在事件参数中发送一批消息。事件负载包含一个消息数组。每个数组项目都包含 Kafka 主题和 Kafka 分区标识符的详细信息,以及时间戳和 base64 编码的消息。

{ "eventSource": "SelfManagedKafka", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Kafka 集群身份验证

Lambda 支持多种方法来使用自行管理的 Apache Kafka 集群进行身份验证。请确保将 Kafka 集群配置为使用支持的下列身份验证方法之一:有关 Kafka 安全的更多信息,请参阅 Kafka 文档的安全部分。

VPC 访问

如果仅 VPC 中的 Kafka 用户访问 Kafka 代理,则必须为 Amazon Virtual Private Cloud (Amazon VPC) 访问配置 Kafka 事件源。

SASL/SCRAM 身份验证

Lambda 支持使用传输层安全性协议(TLS)加密 (SASL_SSL) 进行简单身份验证和安全层/加盐质疑应答身份验证机制(SASL/SCRAM)身份验证。Lambda 发送已加密凭据以使用集群进行身份验证。Lambda 不支持使用明文 (SASL_PLAINTEXT) 的 SASL/SCRAM。有关 SASL/SCRAM 身份验证的更多信息,请参阅 RFC 5802

Lambda 还支持 SASL/PLAIN 身份验证。由于此机制使用明文凭证,因此与服务器的连接必须使用 TLS 加密以确保凭证受到保护。

为了 SASL 身份验证,需要将登录凭证作为密钥存储在 Amazon Secrets Manager 中。有关使用 Secrets Manager 的更多信息,请参阅 Amazon Secrets Manager 用户指南中的教程:创建和检索密钥

重要

要使用 Secrets Manager 进行身份验证,密钥必须存储在 Lambda 函数所在的同一 Amazon 区域中。

双向 TLS 身份验证

双向 TLS(mTLS)在客户端和服务器之间提供双向身份验证。客户端向服务器发送证书以便服务器验证客户端,而服务器又向客户端发送证书以便客户端验证服务器。

在自行托管的 Apache Kafka 中,Lambda 充当客户端。您可以配置客户端证书(作为 Secrets Manager 中的密钥),以使用 Kafka 代理对 Lambda 进行身份验证。客户端证书必须由服务器信任存储中的 CA 签名。

Kafka 集群向 Lambda 发送服务器证书,以便使用 Lambda 对 Kafka 代理进行身份验证。服务器证书可以是公有 CA 证书。也可以是私有 CA/自签名证书。公有 CA 证书必须由 Lambda 信任存储中的证书颁发机构(CA)签名。对于私有 CA /自签名证书,您可以配置服务器根 CA 证书(作为 Secrets Manager 中的密钥)。Lambda 使用根证书来验证 Kafka 代理。

有关 mTLS 的更多信息,请参阅为作为事件源的 Amazon MSK 引入双向 TLS 身份验证

配置客户端证书密钥

CLIENT_CERTIFICATE_TLS_AUTH 密钥需要证书字段和私有密钥字段。对于加密的私有密钥,密钥需要私有密钥密码。证书和私有密钥必须采用 PEM 格式。

注意

Lambda 支持 PBES1(而不是 PBES2)私有密钥加密算法。

证书字段必须包含证书列表,首先是客户端证书,然后是任何中间证书,最后是根证书。每个证书都必须按照以下结构在新行中启动:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager 支持最多包含 65536 字节的密钥,这为长证书链提供了充足的空间。

私有密钥必须采用 PKCS #8 格式,并具有以下结构:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

对于加密的私有密钥,请使用以下结构:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下示例显示使用加密私有密钥进行 mTLS 身份验证的密钥内容。对于加密的私有密钥,可以在密钥中包含私有密钥密码。

{"privateKeyPassword":"testpassword", "certificate":"-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey":"-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

配置服务器根 CA 证书密钥

如果您的 Kafka 代理使用 TLS 加密(具有由私有 CA 签名的证书),则创建此密钥。您可以将 TLS 加密用于 VPC、SASL/SCRAM、SASL/PLAIN 或 mTLS 身份验证。

服务器根 CA 证书密钥需要一个字段,其中包含 PEM 格式的 Kafka 代理的根 CA 证书。以下示例显示密钥的结构。

{"certificate":"-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----" }

管理 API 访问和权限

除了访问自行托管的 Kafka 集群外,您的 Lambda 函数还需要执行各种 API 操作的权限。您可以为函数的执行角色添加这些权限。如果您的用户需要访问任何 API 操作,请将所需权限添加到 Amazon Identity and Access Management(IAM)用户或角色的身份策略中。

所需的 Lambda 函数权限

要在 Amazon CloudWatch Logs 中创建日志并将日志存储到日志组,Lambda 函数必须在它的执行角色中具有以下权限:

可选的 Lambda 函数权限

您的 Lambda 函数还可能需要权限来:

  • 描述您的 Secrets Manager 密钥。

  • 访问 Amazon Key Management Service(Amazon KMS)客户管理的密钥。

  • 访问 Amazon VPC。

  • 将失败调用的记录发送到目标。

Secrets Manager 和 Amazon KMS 权限

根据您为 Kafka 代理配置的访问控制类型,Lambda 函数可能需要访问您的 Secrets Manager 密钥或解密 Amazon KMS 客户管理的密钥的权限。要连接到这些资源,函数的执行角色必须具有以下权限:

VPC 权限

如果只有 VPC 内的用户才能访问您自行托管的 Apache Kafka 集群,则 Lambda 函数必须具有访问 Amazon VPC 资源的权限。这些资源包括您的 VPC、子网、安全组和网络接口。要连接到这些资源,函数的执行角色必须具有以下权限:

将记录发送到目标

如果您想将失败调用的记录发送到故障目标,则您的 Lambda 函数必须具有发送这些记录的权限。对于 Kafka 事件源映射,您可以选择 Amazon SNS 主题、Amazon SQS 队列或 Amazon S3 存储桶作为目标。要将记录发送到 SNS 主题,您的函数的执行角色必须具有以下权限:

要将记录发送到 SQS 队列,您的函数的执行角色必须具有以下权限:

要将记录发送到 S3 存储桶,您的函数的执行角色必须具有以下权限:

此外,如果您在目标上配置了 KMS 密钥,则根据具体目标类型,Lambda 需要以下权限:

  • 如果您已使用自己的 KMS 密钥为 S3 目标启用加密,则需要 kms:GenerateDataKey。如果 KMS 密钥和 S3 存储桶目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:GenerateDataKey。

  • 如果您已使用自己的 KMS 密钥为 SQS 目标启用加密,则需要 kms:Decryptkms:GenerateDataKey。如果 KMS 密钥和 SQS 队列目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKeykms:ReEncrypt

  • 如果您已使用自己的 KMS 密钥为 SNS 目标启用加密,则需要 kms:Decryptkms:GenerateDataKey。如果 KMS 密钥和 SNS 主题目标与您的 Lambda 函数和执行角色位于不同的账户中,请将 KMS 密钥配置为信任执行角色以允许 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKeykms:ReEncrypt

向执行角色添加权限

要访问自行管理的 Apache Kafka 集群使用的其他Amazon服务,Lambda 需使用您在 Lambda 函数执行角色中定义的权限策略。

默认情况下,Lambda 无权为自行管理的 Apache Kafka 集群执行必需或可选操作。您必须在 IAM 信任策略中创建和定义这些操作,然后将策略附加到执行角色。此示例演示了如何创建允许 Lambda 访问您的 Amazon VPC 资源的策略。

{ "Version":"2012-10-17", "Statement":[ { "Effect":"Allow", "Action":[ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource":"*" } ] }

有关在 IAM 控制台上创建 JSON 策略文档的信息,请参阅 IAM 用户指南中的在 JSON 选项卡上创建策略

使用 IAM policy 授予用户访问权限

默认情况下,用户和角色无权执行事件源 API 操作。要向组织或账户中的用户授予访问权限,您可以创建或更新基于身份的策略。有关更多信息,请参阅 IAM 用户指南中的使用策略控制对Amazon资源的访问权限

身份验证和授权错误

如果缺少使用来自 Kafka 集群的数据所需的任何权限,Lambda 会在 LastProcessingResult 下的事件源映射中显示以下错误消息。

集群未能授权 Lambda

对于 SASL/SCRAM 或 mTLS,此错误表明提供的用户不具有以下所有必需的 Kafka 访问控制列表(ACL)权限:

  • DescribeConfigs 集群

  • 描述组

  • 读取组

  • 描述主题

  • 读取主题

当您使用所需的 kafka-cluster 权限创建 Kafka ACL 时,请将主题和组指定为资源。主题名称必须与事件源映射中的主题一致。组名称必须与事件源映射的 UUID 一致。

向执行角色添加所需的权限后,更改可能需要几分钟才会生效。

SASL 身份验证失败

对于 SASL/SCRAM 或 SASL/PLAIN,此错误表明提供的登录凭证无效。

服务器未能通过 Lambda 的身份验证

此错误表明 Kafka 代理未能对 Lambda 进行身份验证。出现此错误的可能原因如下:

  • 您没有为 mTLS 身份验证提供客户端证书。

  • 您提供了客户端证书,但未将 Kafka 代理配置为使用 mTLS 身份验证。

  • Kafka 代理不信任客户端证书。

Lambda 未能对服务器进行身份验证

此错误表明 Lambda 未能对 Kafka 代理进行身份验证。出现此错误的可能原因如下:

  • Kafka 代理使用自签名证书或私有 CA,但未提供服务器根 CA 证书。

  • 服务器根 CA 证书与签署代理证书的根 CA 不匹配。

  • 主机名验证失败,因为代理的证书未将该代理的 DNS 名称或 IP 地址用作主题替代名称。

提供的证书或私有密钥无效

此错误表明 Kafka 使用者无法使用提供的证书或私有密钥。确保证书和密钥使用 PEM 格式,并且私有密钥加密使用 PBES1 算法。

网络配置

如果您配置了对 Kafka 代理的 Amazon VPC 访问权限,则 Lambda 必须有权访问 Kafka 集群所在的 Amazon VPC 资源。建议您为 Lambda 和 Amazon Security Token Service(Amazon STS)部署 Amazon PrivateLink VPC 端点。如果代理使用身份验证,则还需要为 Secrets Manager 部署 VPC 端点。如果您配置了故障目标,则还要为该目标服务部署 VPC 端点。

或者,请确保与 Kafka 集群关联的 VPC 在每个公有子网中包含一个 NAT 网关。有关更多信息,请参阅VPC 连接函数的 Internet 和服务访问

如果您使用 VPC 端点,则还必须将其配置为启用私有 DNS 名称

使用以下规则配置您的 Amazon VPC 安全组(至少):

  • 入站规则 – 允许 Kafka 代理端口上为事件源指定的安全组的所有流量。预设情况下,Kafka 使用端口 9092。

  • 出站规则 – 允许所有目标的端口 443 上的所有流量传输。允许 Kafka 代理端口上为事件源指定的安全组的所有流量。预设情况下,Kafka 使用端口 9092。

  • 如果您使用的是 VPC 端点而不是 NAT 网关,则与 VPC 端点关联的安全组必须允许来自事件源安全组的端口 443 上的所有入站流量。

有关配置网络的更多信息,请参阅Amazon计算博客上的使用 Apache Kafka 集群在 VPC 中设置 Amazon Lambda

将 Kafka 集群添加为事件源

创建事件源映射,使用 Lambda 控制台、Amazon开发工具包,或 Amazon Command Line Interface (Amazon CLI) 将您的 Kafka 集群添加为 Lambda 函数触发器

本节介绍了如何使用 Lambda 控制台和 Amazon CLI 创建事件源映射。

注意

在更新、禁用或删除自行管理的 Apache Kafka 事件源映射后,更改最长可能需要 15 分钟才会生效。在此期间结束之前,您的事件源映射可能会继续处理事件并使用您之前的设置来调用您的函数。即使控制台中显示的事件源映射状态表明更改已经生效,也可能发生这种情况。

先决条件

  • 自行管理的 Apache Kafka 集群。Lambda 支持 Apache Kafka 版本 0.10.1.0 及更高版本。

  • 一个有权访问您自行管理的 Kafka 集群所用 Amazon 资源的执行角色

可自定义的使用者组 ID

将 Kafka 设置为事件源时,您可以指定使用者组 ID。此使用者组 ID 是您希望 Lambda 函数加入的 Kafka 使用者组的现有标识符。您可以使用此功能将任何正在进行的 Kafka 记录处理设置从其他使用者无缝迁移到 Lambda。

如果指定了使用者组 ID,并且该使用者组中还有其他活跃的轮询器,则 Kafka 会向所有使用者分发消息。换句话说,Lambda 不会收到 Kafka 主题的所有消息。如果希望 Lambda 处理主题中的所有消息,请关闭该使用者组中的任何其他轮询器。

此外,如果指定了使用者组 ID,而 Kafka 找到了具有相同 ID 的有效现有使用者组,则 Lambda 会忽略事件源映射的 StartingPosition 参数。相反,Lambda 开始根据使用者组的已提交偏移量处理记录。如果指定了使用者组 ID,而 Kafka 找不到现有使用者组,则 Lambda 会使用指定的 StartingPosition 配置事件源。

在所有 Kafka 事件源中,您指定的使用者组 ID 必须是唯一的。在使用指定的使用者组 ID 创建 Kafka 事件源映射后,无法更新此值。

失败时的目标

要保留来自 Kafka 事件源的调用失败或负载过大的记录,请为函数配置失败时的目标。当调用失败时,Lambda 会向目标发送包含调用详细信息的 JSON 记录。

您可以选择 Amazon SNS 主题、Amazon SQS 队列或 Amazon S3 存储桶作为您的目标。对于 SNS 主题或 SQS 队列目标,Lambda 会将记录元数据发送到目标。对于 S3 存储桶目标,Lambda 会将整个调用记录以及元数据发送到目标。

要让 Lambda 成功将记录发送到您选择的目标,请确保函数的执行角色包含相关权限。该表还描述了每种目标类型如何接收 JSON 调用记录。

目标类型 支持以下事件源 所需的权限 特定于目标的 JSON 格式

Amazon SQS 队列

  • Kinesis

  • DynamoDB

  • 自行管理的 Apache Kafka 和托管的 Apache Kafka

Lambda 将调用记录元数据作为 Message 传递到目标。

Amazon SNS 主题

  • Kinesis

  • DynamoDB

  • 自行管理的 Apache Kafka 和托管的 Apache Kafka

Lambda 将调用记录元数据作为 Message 传递到目标。

Amazon S3 存储桶

  • 自行管理的 Apache Kafka 和托管的 Apache Kafka

Lambda 将调用记录及其元数据存储在目标。

提示

作为最佳实践,请仅在执行角色中包含所需的最小权限。

SNS 和 SQS 目标

以下示例显示了 Lambda 在 Kafka 事件源调用失败时向 SNS 主题或 SQS 队列目标发送的内容。recordsInfo 下面的每个密钥都包含 Kafka 主题和分区,用连字符分隔。例如,对于密钥 "Topic-0"Topic 是 Kafka 主题,0 是分区。对于每个主题和分区,可以使用偏移量和时间戳数据来查找原始调用记录。

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

S3 目标

对于 S3 目标,Lambda 会将整个调用记录以及元数据发送到目标。以下示例显示了 Lambda 因调用 Kafka 事件源失败而向 S3 存储桶目标发送消息。除了针对 SQS 和 SNS 目标的上一示例中的所有字段外,payload 字段还包含作为转义 JSON 字符串的原始调用记录。

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
提示

我们建议在目标存储桶上启用 S3 版本控制。

配置失败时的目标

要使用控制台配置失败时的目标,请执行以下步骤:

  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择函数。

  3. Function overview (函数概览) 下,选择 Add destination (添加目标)

  4. 对于,请选择事件源映射调用

  5. 对于事件源映射,请选择为此函数配置的事件源。

  6. 条件中,选择失败时。对于事件源映射调用,这是唯一可接受的条件。

  7. 对于目标类型,请选择 Lambda 要发送调用记录的目标类型。

  8. 对于 Destination (目标),请选择一个资源。

  9. 选择保存

您还可以使用 Lambda API 配置失败时的目标。例如,以下 CreateEventSourceMapping CLI 命令将为 MyFunction 添加 SQS 失败时目标:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

以下 UpdateEventSourceMapping CLI 命令将 S3 失败时目标添加到与输入 uuid 关联的 Kafka 事件源:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

要移除目标,请提供一个空字符串作为 destination-config 参数的实际参数:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

添加自行管理的 Kafka 集群(控制台)

按照以下步骤将自行管理的 Apache Kafka 集群和 Kafka 主题添加为 Lambda 函数的触发器。

将 Apache Kafka 触发器添加到 Lambda 函数(控制台)
  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择 Lambda 函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. Trigger configuration(触发配置)下,执行以下操作:

    1. 选择 Apache Kafka 触发类型。

    2. 对于 Bootstrap servers(Bootstrap 引导服务器),输入集群中 Kafka 代理的主机和端口对地址,然后选择 Add(添加)。对集群中的每个 Kafka 代理重复此操作。

    3. 对于 Topic name(主题名称),输入用于在集群中存储记录的 Kafka 主题的名称。

    4. (可选)对于 Batch size(批处理大小),输入要在单个批次中接收的最大记录数。

    5. 对于 Batch window(批处理时段),输入 Lambda 在调用函数之前收集记录所花费的最大秒数。

    6. (可选)对于 Consumer group ID(使用者组 ID),输入要加入的 Kafka 使用者组的 ID。

    7. (可选)对于起始位置,选择最新即可从最新记录开始读取流,选择最早即可从最早的可用记录开始读取流,选择在时间戳处即可从指定的时间戳开始读取流。

    8. (可选)对于 VPC,请为您的 Kafka 集群选择 Amazon VPC。然后,选择 VPC subnets(VPC 子网)和 VPC security groups(VPC 安全组)。

      如果仅 VPC 内部的用户访问代理,则需要此设置。

    9. (可选)对于 Authentication(身份验证),请选择 Add(添加),然后执行以下操作:

      1. 选择集群中 Kafka 代理的访问权限或身份验证协议。

        • 如果 Kafka 代理使用 SASL/PLAIN 身份验证,请选择 BASIC_AUTH

        • 如果代理使用 SASL/SCRAM 身份验证,请选择其中一个 SASL_SCRAM 协议。

        • 如果要配置 mTLS 身份验证,请选择 CLIENT_CERTIFICATE_TLS_AUTH 协议。

      2. 对于 SASL/SCRAM 或 mTLS 身份验证,请选择包含 Kafka 集群凭据的 Secrets Manager 私有密钥。

    10. (可选)对于 Encryption(加密),如果您的 Kafka 代理使用由私有 CA 签名的证书,请选择包含根 CA 证书的 Secrets Manager 密钥,Kafka 代理使用该证进行 TLS 加密。

      此设置适用于 SASL/SCRAM 或 SASL/PLAIN 的 TLS 加密,以及 mTLS 身份验证。

    11. 要在禁用状态下创建触发器以进行测试(推荐),请清除 Enable trigger(启用触发器)。或者,要立即启用该触发器,请选择 Enable trigger(启用触发器)。

  5. 要创建触发器,请选择 Add(添加)。

添加自行管理的 Kafka 集群 (Amazon CLI)

使用以下示例 Amazon CLI 命令为 Lambda 函数创建和查看自行管理的 Apache Kafka 触发器。

使用 SASL/SCRAM

如果 Kafka 用户通过互联网访问您的 Kafka 代理,则需要指定为 SASL/SCRAM 身份验证创建的 Secrets Manager。以下示例使用 create-event-source-mapping Amazon CLI 命令将一个名为 my-kafka-function 的 Lambda 函数映射至一个名为 AWSKafkaTopic 的 Kafka 主题。

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

使用 VPC

如果仅 VPC 中的 Kafka 用户访问 Kafka 代理,则必须指定 VPC、子网和 VPC 安全组。以下示例使用 create-event-source-mapping Amazon CLI 命令将一个名为 my-kafka-function 的 Lambda 函数映射至一个名为 AWSKafkaTopic 的 Kafka 主题。

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

使用 Amazon CLI 查看状态

以下示例使用 get-event-source-mapping Amazon CLI 命令来描述您创建的事件源映射的状态。

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

将 Kafka 集群用作事件源

将 Apache Kafka 集群添加为 Lambda 函数的触发器时,集群将用作事件源

Lambda 根据您指定的 StartingPosition,从您在 CreateEventSourceMapping 请求中指定为 Topics 的 Kafka 主题读取事件数据。成功进行处理后,会将 Kafka 主题提交给 Kafka 集群。

如果您指定 StartingPosition 作为 LATEST,则 Lambda 开始读取主题下每个分区中的最新消息。由于在触发器配置后 Lambda 开始读取消息之前可能会有一些延迟,因此 Lambda 不会读取在此窗口中生成的任何消息。

Lambda 处理来自一个或多个指定 Kafka 主题分区的记录,并将 JSON 有效负载发送到您的函数。当有更多记录可用时,Lambda 根据您在 CreateEventSourceMapping 中指定的 BatchSize 值,继续对记录进行批处理,直到函数赶上主题的速度。

如果函数为批处理中的任何消息返回错误,Lambda 将重试整批消息,直到处理成功或消息过期为止。您可以将所有重试都失败的记录发送到失败时的目标,以供日后处理。

注意

尽管 Lambda 函数的最大超时限制通常为 15 分钟,但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射,仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

轮询和流的起始位置

请注意,事件源映射创建和更新期间的流轮询最终是一致的。

  • 在事件源映射创建期间,可能需要几分钟才能开始轮询来自流的事件。

  • 在事件源映射更新期间,可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着,如果你指定 LATEST 作为流的起始位置,事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件,请将流的起始位置指定为 TRIM_HORIZON 或 AT_TIMESTAMP

Kafka 事件源的自动伸缩

当您最初创建 Apache Kafka 事件源 时,Lambda 会分配一个使用者来处理 Kafka 主题中的所有分区。每个使用者都使用多个并行运行的处理器来处理增加的工作负载。此外,Lambda 会根据工作负载自动增加或缩减使用者的数量。要保留每个分区中的消息顺序,使用者的最大数量为主题中每个分区一个使用者。

Lambda 会按一分钟的间隔时间来评估主题中所有分区的使用者偏移滞后。如果延迟太高,则分区接收消息的速度比 Lambda 处理消息的速度更快。如有必要,Lambda 会在主题中添加或删除使用者。增加或移除使用者的扩缩过程会在评估完成后的三分钟内进行。

如果您的目标 Lambda 函数过载,Lambda 会减少使用者的数量。此操作通过减少使用者可以检索和发送到函数的消息数来减少函数的工作负载。

要监控 Kafka 主题的吞吐量,您可以查看 Apache Kafka 使用者指标,例如 consumer_lagconsumer_offset。要检查并行发生的函数调用次数,还可以监控函数的并发指标

事件源 API 操作

使用 Lambda 控制台、Amazon开发工具包或 Amazon CLI 将 Kafka 集群作为事件源添加时,Lambda 会使用 API 来处理您的请求。

要使用 Amazon Command Line Interface(Amazon CLI)Amazon SDK 来管理事件源,可以使用以下 API 操作:

事件源映射错误

将 Apache Kafka 集群作为 Lambda 函数的事件源添加时,如果您的函数遇到错误,Kafka 使用者将停止处理记录。主题分区的使用者是那些订阅、阅读和处理记录的使用者。您的其他 Kafka 使用者可以继续处理记录,只要他们没有遇到同样的错误即可。

要确定使用者停止的原因,请检查 StateTransitionReason 响应中的 EventSourceMapping 字段。下表列出了您可能收到的事件源错误:

ESM_CONFIG_NOT_VALID

事件源映射配置无效。

EVENT_SOURCE_AUTHN_ERROR

Lambda 无法对事件源进行身份验证。

EVENT_SOURCE_AUTHZ_ERROR

Lambda 没有访问事件源所需的权限。

FUNCTION_CONFIG_NOT_VALID

函数配置无效。

注意

如果您的 Lambda 事件记录超过允许的 6 MB 大小限制,那么它们可能处于未处理状态。

Amazon CloudWatch 指标

Lambda 会在您的函数处理记录时发出 OffsetLag 指标。此指标的值是写入 Kafka 事件源主题的最后一条记录与函数的使用者组处理的最后一条记录之间的偏移量差值。您可以使用 OffsetLag 来估计添加记录和使用者组处理记录之间的延迟。

如果 OffsetLag 呈上升趋势,则可能表明函数的使用者组中的轮询器存在问题。有关更多信息,请参阅使用 Lambda 函数指标

自行管理的 Apache Kafka 配置参数

所有 Lambda 事件源类型共享相同的 CreateEventSourceMappingUpdateEventSourceMapping API 操作。但是,只有一些参数适用于 Apache Kafka。

应用于自行管理的 Apache Kafka 的事件源参数
参数 必需 默认值 备注

BatchSize

100

最大值:10000

已启用

已启用

FunctionName

FilterCriteria

Lambda 事件筛选

MaximumBatchingWindowInSeconds

500 毫秒

批处理行为

SelfManagedEventSource

Kafka 代理列表。只能在 Create(创建)设置

SelfManagedKafkaEventSourceConfig

包含 ConsumerGroupId 字段,该字段默认为唯一值。

只能在 Create(创建)设置

SourceAccessConfigurations

无凭证

集群的 VPC 信息或身份验证凭据

对于 SASL_PLAIN,设置为 BASIC_AUTH

StartingPosition

Y

AT_TIMESTAMP、TRIM_HORIZON 或 LATEST

只能在 Create(创建)设置

StartingPositionTimestamp

当 StartingPosition 设置为 AT_TIMESTAMP 时,为必需项

主题

主题名称

只能在 Create(创建)设置