为自托管式 Apache Kafka 事件源创建 Lambda 事件源映射
要创建事件源映射,您可以使用 Lambda 控制台、Amazon Command Line Interface (CLI) 或 Amazon SDK
以下控制台步骤添加自托管式 Apache Kafka 集群作为 Lambda 函数的触发器。这将在后台创建一个事件源映射资源。
先决条件
-
自行管理的 Apache Kafka 集群。Lambda 支持 Apache Kafka 版本 0.10.1.0 及更高版本。
-
一个有权访问您自行管理的 Kafka 集群所用 Amazon 资源的执行角色。
添加自行管理的 Kafka 集群(控制台)
按照以下步骤将自行管理的 Apache Kafka 集群和 Kafka 主题添加为 Lambda 函数的触发器。
将 Apache Kafka 触发器添加到 Lambda 函数(控制台)
-
打开 Lamba 控制台的 Functions(函数)页面。
-
选择 Lambda 函数的名称。
-
在 Function overview(函数概览)下,选择 Add trigger(添加触发器)。
-
在 Trigger configuration(触发配置)下,执行以下操作:
-
选择 Apache Kafka 触发类型。
-
对于 Bootstrap servers(Bootstrap 引导服务器),输入集群中 Kafka 代理的主机和端口对地址,然后选择 Add(添加)。对集群中的每个 Kafka 代理重复此操作。
-
对于 Topic name(主题名称),输入用于在集群中存储记录的 Kafka 主题的名称。
-
(可选)对于 Batch size(批处理大小),输入要在单个批次中接收的最大记录数。
-
对于 Batch window(批处理时段),输入 Lambda 在调用函数之前收集记录所花费的最大秒数。
-
(可选)对于 Consumer group ID(使用者组 ID),输入要加入的 Kafka 使用者组的 ID。
-
(可选)对于起始位置,选择最新即可从最新记录开始读取流,选择最早即可从最早的可用记录开始读取流,选择在时间戳处即可从指定的时间戳开始读取流。
-
(可选)对于 VPC,请为您的 Kafka 集群选择 Amazon VPC。然后,选择 VPC subnets(VPC 子网)和 VPC security groups(VPC 安全组)。
如果仅 VPC 内部的用户访问代理,则需要此设置。
-
(可选)对于 Authentication(身份验证),请选择 Add(添加),然后执行以下操作:
-
选择集群中 Kafka 代理的访问权限或身份验证协议。
-
如果 Kafka 代理使用 SASL/PLAIN 身份验证,请选择 BASIC_AUTH。
-
如果代理使用 SASL/SCRAM 身份验证,请选择其中一个 SASL_SCRAM 协议。
-
如果要配置 mTLS 身份验证,请选择 CLIENT_CERTIFICATE_TLS_AUTH 协议。
-
-
对于 SASL/SCRAM 或 mTLS 身份验证,请选择包含 Kafka 集群凭据的 Secrets Manager 私有密钥。
-
-
(可选)对于 Encryption(加密),如果您的 Kafka 代理使用由私有 CA 签名的证书,请选择包含根 CA 证书的 Secrets Manager 密钥,Kafka 代理使用该证进行 TLS 加密。
此设置适用于 SASL/SCRAM 或 SASL/PLAIN 的 TLS 加密,以及 mTLS 身份验证。
-
要在禁用状态下创建触发器以进行测试(推荐),请清除 Enable trigger(启用触发器)。或者,要立即启用该触发器,请选择 Enable trigger(启用触发器)。
-
-
要创建触发器,请选择 Add(添加)。
添加自行管理的 Kafka 集群 (Amazon CLI)
使用以下示例 Amazon CLI 命令为 Lambda 函数创建和查看自行管理的 Apache Kafka 触发器。
使用 SASL/SCRAM
如果 Kafka 用户通过互联网访问您的 Kafka 代理,则需要指定为 SASL/SCRAM 身份验证创建的 Secrets Manager。以下示例使用 create-event-source-mappingmy-kafka-function 的 Lambda 函数映射至名为 AWSKafkaTopic 的 Kafka 主题。
aws lambda create-event-source-mapping \ --topicsAWSKafkaTopic\ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName\ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function\ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
使用 VPC
如果仅 VPC 中的 Kafka 用户访问 Kafka 代理,则必须指定 VPC、子网和 VPC 安全组。以下示例使用 create-event-source-mappingmy-kafka-function 的 Lambda 函数映射至名为 AWSKafkaTopic 的 Kafka 主题。
aws lambda create-event-source-mapping \ --topicsAWSKafkaTopic\ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function\ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'
使用 Amazon CLI 查看状态
以下示例使用 get-event-source-mapping
aws lambda get-event-source-mapping --uuiddh38738e-992b-343a-1077-3478934hjkfd7