为自托管式 Apache Kafka 事件源创建 Lambda 事件源映射 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

为自托管式 Apache Kafka 事件源创建 Lambda 事件源映射

要创建事件源映射,您可以使用 Lambda 控制台、Amazon Command Line Interface (CLI)Amazon SDK

以下控制台步骤添加自托管式 Apache Kafka 集群作为 Lambda 函数的触发器。这将在后台创建一个事件源映射资源。

先决条件

  • 自行管理的 Apache Kafka 集群。Lambda 支持 Apache Kafka 版本 0.10.1.0 及更高版本。

  • 一个有权访问您自行管理的 Kafka 集群所用 Amazon 资源的执行角色

添加自行管理的 Kafka 集群(控制台)

按照以下步骤将自行管理的 Apache Kafka 集群和 Kafka 主题添加为 Lambda 函数的触发器。

将 Apache Kafka 触发器添加到 Lambda 函数(控制台)
  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择 Lambda 函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. Trigger configuration(触发配置)下,执行以下操作:

    1. 选择 Apache Kafka 触发类型。

    2. 对于 Bootstrap servers(Bootstrap 引导服务器),输入集群中 Kafka 代理的主机和端口对地址,然后选择 Add(添加)。对集群中的每个 Kafka 代理重复此操作。

    3. 对于 Topic name(主题名称),输入用于在集群中存储记录的 Kafka 主题的名称。

    4. (可选)对于 Batch size(批处理大小),输入要在单个批次中接收的最大记录数。

    5. 对于 Batch window(批处理时段),输入 Lambda 在调用函数之前收集记录所花费的最大秒数。

    6. (可选)对于 Consumer group ID(使用者组 ID),输入要加入的 Kafka 使用者组的 ID。

    7. (可选)对于起始位置,选择最新即可从最新记录开始读取流,选择最早即可从最早的可用记录开始读取流,选择在时间戳处即可从指定的时间戳开始读取流。

    8. (可选)对于 VPC,请为您的 Kafka 集群选择 Amazon VPC。然后,选择 VPC subnets(VPC 子网)和 VPC security groups(VPC 安全组)。

      如果仅 VPC 内部的用户访问代理,则需要此设置。

    9. (可选)对于 Authentication(身份验证),请选择 Add(添加),然后执行以下操作:

      1. 选择集群中 Kafka 代理的访问权限或身份验证协议。

        • 如果 Kafka 代理使用 SASL/PLAIN 身份验证,请选择 BASIC_AUTH

        • 如果代理使用 SASL/SCRAM 身份验证,请选择其中一个 SASL_SCRAM 协议。

        • 如果要配置 mTLS 身份验证,请选择 CLIENT_CERTIFICATE_TLS_AUTH 协议。

      2. 对于 SASL/SCRAM 或 mTLS 身份验证,请选择包含 Kafka 集群凭据的 Secrets Manager 私有密钥。

    10. (可选)对于 Encryption(加密),如果您的 Kafka 代理使用由私有 CA 签名的证书,请选择包含根 CA 证书的 Secrets Manager 密钥,Kafka 代理使用该证进行 TLS 加密。

      此设置适用于 SASL/SCRAM 或 SASL/PLAIN 的 TLS 加密,以及 mTLS 身份验证。

    11. 要在禁用状态下创建触发器以进行测试(推荐),请清除 Enable trigger(启用触发器)。或者,要立即启用该触发器,请选择 Enable trigger(启用触发器)。

  5. 要创建触发器,请选择 Add(添加)。

添加自行管理的 Kafka 集群 (Amazon CLI)

使用以下示例 Amazon CLI 命令为 Lambda 函数创建和查看自行管理的 Apache Kafka 触发器。

使用 SASL/SCRAM

如果 Kafka 用户通过互联网访问您的 Kafka 代理,则需要指定为 SASL/SCRAM 身份验证创建的 Secrets Manager。以下示例使用 create-event-source-mapping Amazon CLI 命令将名为 my-kafka-function 的 Lambda 函数映射至名为 AWSKafkaTopic 的 Kafka 主题。

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

使用 VPC

如果仅 VPC 中的 Kafka 用户访问 Kafka 代理,则必须指定 VPC、子网和 VPC 安全组。以下示例使用 create-event-source-mapping Amazon CLI 命令将名为 my-kafka-function 的 Lambda 函数映射至名为 AWSKafkaTopic 的 Kafka 主题。

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

使用 Amazon CLI 查看状态

以下示例使用 get-event-source-mapping Amazon CLI 命令来描述您创建的事件源映射的状态。

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7