为 Amazon MSK 事件源创建 Lambda 事件源映射 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

为 Amazon MSK 事件源创建 Lambda 事件源映射

要创建事件源映射,您可以使用 Lambda 控制台、Amazon Command Line Interface (CLI)Amazon SDK

注意

创建事件源映射时,Lambda 会在包含 MSK 集群的私有子网中创建 Hyperplane ENI,从而允许 Lambda 建立安全连接。此 Hyperplane ENI 使用 MSK 集群的子网和安全组配置,而不是 Lambda 函数。

以下控制台步骤添加 Amazon MSK 集群作为 Lambda 函数的触发器。这将在后台创建一个事件源映射资源。

将 Amazon MSK 触发器添加到 Lambda 函数(控制台)
  1. 打开 Lambda 控制台的函数页面

  2. 选择您要为其添加 Amazon MSK 触发器的 Lambda 函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. 触发器配置下,选择 MSK

  5. 要指定 Kafka 集群详细信息,请执行以下操作:

    1. 对于 MSK cluster(MSK 集群),选择您的集群。

    2. 对于主题名称,输入要从中使用消息的 Kafka 主题的名称。

    3. 对于使用者组 ID,输入要加入的 Kafka 使用者组的 ID(如适用)。有关更多信息,请参阅 Lambda 中可自定义的使用者组 ID

  6. 对于集群身份验证,请进行必要的配置。有关集群身份验证的更多信息,请参阅在 Lambda 中配置集群身份验证方法

    • 如果您希望 Lambda 在建立连接时对 MSK 集群执行身份验证,请开启使用身份验证。建议进行身份验证。

    • 如果使用身份验证,对于身份验证方法,选择要使用的身份验证方法。

    • 如果您使用身份验证,对于 Secrets Manager 密钥,选择包含访问集群所需身份验证凭证的 Secrets Manager 密钥。

  7. 事件轮询器配置下,进行必要的配置。

    • 选择激活触发器以在创建后立即启用该触发器。

    • 选择是否要为事件源映射配置预置模式。有关更多信息,请参阅 Lambda 中的事件轮询器扩展模式

      • 如果您配置了预置模式,输入最小事件轮询器数值,最大事件轮询器数值,或同时输入两个值。

    • 对于起始位置,选择您希望 Lambda 如何开始从流中读取。有关更多信息,请参阅 Lambda 中轮询和流的起始位置

  8. 批处理下,进行必要的配置。有关批处理的更多信息,请参阅批处理行为

    1. 对于 Batch size(批处理大小),输入要在单个批次中接收的最大消息数。

    2. 对于批处理时段,输入 Lambda 在调用函数之前收集记录所花费的最大秒数。

  9. 筛选下,进行必要的配置。有关筛选的更多信息,请参阅 对 Amazon MSK 事件源使用事件筛选

    • 筛选条件中,添加筛选条件定义以确定是否处理事件。

  10. 故障处理下,进行必要的配置。有关故障处理的更多信息,请参阅捕获 Amazon MSK 事件源丢弃的批处理

    • 对于故障目标,指定故障时目标的 ARN。

  11. 对于标签,输入要与此事件源映射关联的标签。

  12. 要创建触发器,请选择 Add(添加)。

您还可以使用 Amazon CLI 中的 create-event-source-mapping 命令创建事件源映射。以下示例创建事件源映射,将 Lambda 函数 my-msk-function 映射到 AWSKafkaTopic 主题,从 LATEST 消息开始。此命令还使用 SourceAccessConfiguration 对象指示 Lambda 在连接到集群时使用 SASL/SCRAM 身份验证。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

如果集群使用 mTLS 身份验证,请包含指定 CLIENT_CERTIFICATE_TLS_AUTHSourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。如下面的命令所示:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

当集群使用 IAM 身份验证时,您不需要 SourceAccessConfiguration 对象。如下面的命令所示:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function