将 Lambda 与 Apache Kafka 集群结合使用 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

将 Lambda 与 Apache Kafka 集群结合使用

Apache Kafka 是一种分布式数据存储,经过优化以实时提取和处理流数据。

Kafka 主要用于构建适应数据流的流数据管道和应用程序。它结合了消息收发、存储和流处理功能,能够存储历史和实时数据。

Amazon 提供托管 Kafka 服务,或者您可以将非 Amazon Kafka 集群与 Lambda 结合使用。

托管 Apache Kafka 集群

您可以在 Amazon 或您选择的任何其他云提供商上托管 Apache Kafka 集群。只要 Lambda 能够访问集群,Lambda 就支持将 Kafka 作为 事件源,而无论它托管在何处。

使用 Amazon MSK

要托管 Apache Kafka 集群和主题,您可以使用 Amazon Managed Streaming for Apache Kafka (Amazon MSK)

有关使用 MSK 集群的更多信息,请参阅 将 Lambda 与 Amazon MSK 结合使用

使用自行管理的 Kafka 提供商

要托管 Apache Kafka 集群和主题,您可以使用任何非 Amazon 云提供商,例如 CloudKarafka

您还可以为 Apache Kafka 集群和主题使用其他 Amazon 托管选项。有关更多信息,请参阅 Amazon 大数据博客上的 在 Amazon 上运行 Apache Kafka 的最佳实践

有关使用自行管理的 Kafka 集群的更多信息,请参阅 将 Lambda 与 self-managed Apache Kafka 结合使用

将 Apache Kafka 集群用作 Lambda 事件源

您可以在 Amazon 或您选择的任何其他云提供商上托管 Apache Kafka 集群。只要 Lambda 能够访问集群,Lambda 就支持将 Kafka 作为 事件源,而无论它托管在何处。

先决条件

工作原理

当您将 Apache Kafka 集群添加为 Lambda 函数的触发器时,集群将用作 事件源。 当您将 Kafka 集群和主题添加为事件源时,Lambda 将创建具有事件源 UUID 的使用者组。

  • 如果您在 EventSourceArn 中将 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 集群用作事件源,Lambda 会使用指定的 Amazon MSK 集群和 Kafka 主题读取事件数据。

  • 如果您在 SelfManagedEventSource 中将 非 Amazon 托管的 Apache Kafka 集群 — 或 Amazon 在另一个 Amazon 服务上托管了 Apache Kafka 集群— 用作事件源,Lambda 会使用指定的 Kafka 主机、主题和连接详细信息读取事件数据。

  • Lambda 根据 StartingPosition 中指定的起始位置,从 Topics 中指定的 Kafka 主题读取事件数据。成功进行处理后,会将 Kafka 主题提交给 Kafka 集群。

  • Lambda处理来自一个或多个指定 Kafka 主题分区的记录,并将 JSON 负载发送到 Lambda 函数。当有更多记录可用时,Lambda 根据 BatchSize 指定的值,继续进行批处理,直到函数赶上主题的速度。

  • Lambda 支持对 Kafka 代理进行 Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 身份验证。Lambda 在 SourceAccessConfigurations 中使用 Amazon Secrets Manager 密钥中指定的 SASL/SCRAM 用户名和密码。

对于 Amazon MSK 和 self-managed Apache Kafka,Lambda 在停止函数前最长允许其运行 14 分钟。

事件源 API 操作

您使用 Lambda 控制台、Amazon SDK 或 Amazon Command Line Interface (Amazon CLI) 将 Kafka 集群作为 Lambda 函数的 事件源 进行添加时,Lambda 会使用 API 来处理您的请求。

要使用 Amazon CLIAmazon 开发工具包管理事件源,您可以使用以下 API 操作:

事件源映射错误

您将 Apache Kafka 集群作为 Lambda 函数的 事件源 进行添加时,如果您的函数遇到错误,Kafka 使用者将停止处理记录。主题分区的使用者是那些订阅、阅读和处理记录的使用者。您的其他 Kafka 使用者可以继续处理记录,只要他们没有遇到同样的错误即可。

要确定使用者停止的原因,请检查 EventSourceMapping 响应中的 StateTransitionReason 字段。下表列出了您可能收到的事件源错误:

ESM_CONFIG_NOT_VALID

事件源映射配置无效。

EVENT_SOURCE_AUTHN_ERROR

Lambda 不能验证事件源。

EVENT_SOURCE_AUTHZ_ERROR

Lambda 没有访问事件源所需的权限。

FUNCTION_CONFIG_NOT_VALID

Lambda 函数的配置无效。

注意

如果您的 Lambda 事件记录超过允许的 6 MB 大小限制,那么它们可能处于未处理状态。

将 Lambda 与 self-managed Apache Kafka 结合使用

您可以注册 非 Amazon 托管的 Apache Kafka 集群 — 或 Amazon 在另一个 Amazon 服务上托管了 Apache Kafka 集群 —,作为 Lambda 函数的 事件源。这可以让您触发函数,以响应发送到 Kafka 集群的记录。

管理 self-managed Apache Kafka 集群的访问权限和权限

Lambda 轮询 Apache Kafka 主题分区以获取新记录并同步调用 Lambda 函数。要更新集群使用的其他 Amazon 资源,您的 Lambda 函数以及您的 Amazon Identity and Access Management (IAM) 用户和角色必须具有执行这些操作的权限。

本页面介绍了如何向自行管理的 Kafka 集群的 Lambda 和其他用户授予权限。

所需的 Lambda 函数权限

要在 Amazon CloudWatch Logs 中创建日志并将日志存储到日志组,Lambda 函数必须在它的 执行角色中具有以下权限:

可选的 Lambda 函数权限

您的 Lambda 函数可能需要相应的权限才能描述 Amazon Secrets Manager 密钥或 Amazon Key Management Service (Amazon KMS) 客户托管的 CMK,或者访问 Virtual Private Cloud (VPC)。

Secrets Manager 和 Amazon KMS 权限

如果您的 Kafka 用户通过互联网访问您的 Apache Kafka 代理,则必须指定一个 Secrets Manager 密钥。有关更多信息,请参阅 使用 SASL/SCRAM 身份验证

您的 Lambda 函数可能需要相应权限才能描述 Secrets Manager 密钥或解密 Amazon KMS 客户托管的 CMK。要连接到这些资源,函数的执行角色必须具有以下权限:

VPC 权限

如果仅 VPC 中的用户访问您的 self-managed Apache Kafka 集群,则您的 Lambda 函数需要 Amazon Virtual Private Cloud (Amazon VPC) 资源(包括 VPC、子网、安全组和网络接口)的访问权限。要连接到这些资源,函数的 执行角色必须具有以下权限:

向执行角色添加权限

要访问 self-managed Apache Kafka 集群使用的其他 Amazon 服务,Lambda 需使用您在函数的 执行角色中定义的权限策略。

默认情况下,Lambda 无权为 self-managed Apache Kafka 集群执行必需或可选操作。您必须在 IAM 信任策略中创建和定义这些操作,然后将策略附加到执行角色。此示例演示了如何创建允许 Lambda 访问您的 Amazon VPC 资源的策略。

{ "Version":"2012-10-17", "Statement":[ { "Effect":"Allow", "Action":[ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource":"arn:aws-cn:ec2:us-east-1:01234567890:instance/my-instance-name" } ] }

有关在 IAM 控制台上创建 JSON 策略文档的信息,请参阅 IAM 用户指南 中的 在 JSON 选项卡上创建策略

将用户添加到 IAM 策略

默认情况下,IAM 用户和角色无权执行 事件源 API 操作。要向组织或账户中的用户授予访问权限,您可能需要创建一个基于身份的策略。有关更多信息,请参阅 IAM 用户指南 中的 使用策略访问 Amazon 资源

使用 SASL/SCRAM 身份验证

重要

如果您使用的是基于 SASL/SCRAM 的身份验证,则不支持纯文代理。您必须为代理使用 TLS 加密。

self-managed Apache Kafka集群的用户名和密码身份验证使用 Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM)。SCRAM 使用安全哈希算法,不会在客户端和服务器之间传输纯文本密码。有关 SASL/SCRAM 身份验证的更多信息,请参阅 RFC 5802

要为自行管理的 Kafka 集群设置用户名和密码身份验证,请在 Amazon Secrets Manager 中创建一个密钥。您的非 Amazon 云提供商必须以 SASL/SCRAM 格式为您提供用户名和密码。例如:

{ "username": "ab1c23de", "password": "qxbbaLRG7JXYN4NpNMVccP4gY9WZyDbp" }

有关更多信息,请参阅 Amazon Secrets Manager 用户指南 中的 教程:创建和检索密钥

将 self-managed Apache Kafka 集群添加为事件源

当 Apache Kafka 集群配置为事件源时,您可以使用 Lambda 函数处理集群中的记录。要创建事件源映射,您可以使用 Lambda 控制台、Amazon 开发工具包Amazon Command Line Interface (Amazon CLI) 将 Kafka 集群添加为 Lambda 函数触发器

本节介绍如何使用 Lambda 控制台或 Amazon CLI 将 Kafka 集群和主题添加为函数触发器。

先决条件

使用 Lambda 控制台添加 self-managed Apache Kafka 集群

按照以下步骤将 self-managed Apache Kafka 集群和 Kafka 主题添加为 Lambda 函数的触发器。

将 Apache Kafka 触发器添加到 Lambda 函数(控制台)

  1. 打开 Lambda 控制台的“函数”页面

  2. 选择 Lambda 函数的名称。

  3. Function overview (函数概览) 下,选择 Add trigger (添加触发器)

  4. Trigger configuration (触发器配置) 下,选择 Apache Kafka 触发器类型。

  5. 配置其余选项,然后选择 Add (添加)

使用 Amazon CLI 添加 self-managed Apache Kafka 集群

使用以下示例 Amazon CLI 命令为 Lambda 函数创建和查看 self-managed Apache Kafka 触发器。

使用 SASL/SRAM

如果 Kafka 用户通过互联网访问您的 Kafka 代理,则必须指定您为 SASL/SCRAM 身份验证创建的 Amazon Secrets Manager 密钥。以下示例使用 create-event-source-mapping Amazon CLI 命令将名为 my-kafka-function 的 Lambda 函数映射到名为 AWSKafkaTopic 的 Kafka 主题。

aws lambda create-event-source-mapping --topics AWSKafkaTopic --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws-cn:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName --function-name arn:aws-cn:lambda:us-east-1:01234567890:function:my-kafka-function --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

有关更多信息,请参阅 CreateEventSourceMapping API 参考文档。

使用 VPC

如果仅 Virtual Private Cloud (VPC) 中的 Kafka 用户访问 Kafka 代理,则必须指定 VPC、子网和 VPC 安全组。以下示例使用 create-event-source-mapping Amazon CLI 命令将名为 my-kafka-function 的 Lambda 函数映射到名为 AWSKafkaTopic 的 Kafka 主题。

aws lambda create-event-source-mapping --topics AWSKafkaTopic --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"},{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"},{"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' --function-name arn:aws-cn:lambda:us-east-1:01234567890:function:my-kafka-function --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

有关更多信息,请参阅 CreateEventSourceMapping API 参考文档。

查看状态

以下示例使用 get-event-source-mapping Amazon CLI 命令描述您创建的事件源映射的状态。

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7