为 Amazon MSK 事件源创建 Lambda 事件源映射
要创建事件源映射,您可以使用 Lambda 控制台、Amazon Command Line Interface (CLI) 或 Amazon SDK
注意
创建事件源映射时,Lambda 会在包含 MSK 集群的私有子网中创建 Hyperplane ENI,从而允许 Lambda 建立安全连接。此 Hyperplane ENI 使用 MSK 集群的子网和安全组配置,而不是 Lambda 函数。
以下控制台步骤添加 Amazon MSK 集群作为 Lambda 函数的触发器。这将在后台创建一个事件源映射资源。
将 Amazon MSK 触发器添加到 Lambda 函数(控制台)
-
打开 Lambda 控制台的函数页面
。 -
选择您要为其添加 Amazon MSK 触发器的 Lambda 函数的名称。
-
在 Function overview(函数概览)下,选择 Add trigger(添加触发器)。
-
在触发器配置下,选择 MSK。
-
要指定 Kafka 集群详细信息,请执行以下操作:
-
对于 MSK cluster(MSK 集群),选择您的集群。
-
对于主题名称,输入要从中使用消息的 Kafka 主题的名称。
-
对于使用者组 ID,输入要加入的 Kafka 使用者组的 ID(如适用)。有关更多信息,请参阅 Lambda 中可自定义的使用者组 ID。
-
-
对于集群身份验证,请进行必要的配置。有关集群身份验证的更多信息,请参阅在 Lambda 中配置集群身份验证方法。
-
如果您希望 Lambda 在建立连接时对 MSK 集群执行身份验证,请开启使用身份验证。建议进行身份验证。
-
如果使用身份验证,对于身份验证方法,选择要使用的身份验证方法。
-
如果您使用身份验证,对于 Secrets Manager 密钥,选择包含访问集群所需身份验证凭证的 Secrets Manager 密钥。
-
-
在事件轮询器配置下,进行必要的配置。
-
选择激活触发器以在创建后立即启用该触发器。
-
选择是否要为事件源映射配置预置模式。有关更多信息,请参阅 Lambda 中的事件轮询器扩展模式。
-
如果您配置了预置模式,输入最小事件轮询器数值,最大事件轮询器数值,或同时输入两个值。
-
-
对于起始位置,选择您希望 Lambda 如何开始从流中读取。有关更多信息,请参阅 Lambda 中轮询和流的起始位置。
-
-
在批处理下,进行必要的配置。有关批处理的更多信息,请参阅批处理行为。
-
对于 Batch size(批处理大小),输入要在单个批次中接收的最大消息数。
-
对于批处理时段,输入 Lambda 在调用函数之前收集记录所花费的最大秒数。
-
-
在筛选下,进行必要的配置。有关筛选的更多信息,请参阅 对 Amazon MSK 事件源使用事件筛选。
-
在筛选条件中,添加筛选条件定义以确定是否处理事件。
-
-
在故障处理下,进行必要的配置。有关故障处理的更多信息,请参阅捕获 Amazon MSK 事件源丢弃的批处理。
-
对于故障目标,指定故障时目标的 ARN。
-
-
对于标签,输入要与此事件源映射关联的标签。
-
要创建触发器,请选择 Add(添加)。
您还可以使用 Amazon CLI 中的 create-event-source-mappingmy-msk-function
映射到 AWSKafkaTopic
主题,从 LATEST
消息开始。此命令还使用 SourceAccessConfiguration 对象指示 Lambda 在连接到集群时使用 SASL/SCRAM 身份验证。
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
如果集群使用 mTLS 身份验证,请包含指定 CLIENT_CERTIFICATE_TLS_AUTH
的 SourceAccessConfiguration 对象以及 Secrets Manager 密钥 ARN。如下面的命令所示:
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
当集群使用 IAM 身份验证时,您不需要 SourceAccessConfiguration 对象。如下面的命令所示:
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function