将 Lambda 与 Amazon MSK 结合使用 - AWS Lambda
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

将 Lambda 与 Amazon MSK 结合使用

Amazon Managed Streaming for Apache Kafka (Amazon MSK) 是一项托管服务,可让您构建和运行使用 Apache Kafka 来处理流式传输数据的应用程序。Apache Kafka 是一个分布式流处理平台,概念上类似于 Amazon Kinesis。借助 Amazon MSK,您可以从多个来源收集数据,并与多个使用者一起处理这些数据。

您可以使用 Lambda 函数来处理 Kafka 主题中的记录。您的函数是通过事件源映射触发的,事件源映射是从主题读取内容并调用函数的 Lambda 资源。Lambda 会轮询多个分区来查找新记录并同步调用目标函数。

Amazon MSK 事件源映射支持以下功能:

  • 与 Amazon MSK 支持的所有 Kafka 版本完全兼容。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的支持的 Apache Kafka 版本

  • 明文和 TLS 加密代理。TLS 代理不受私有证书颁发机构的支持。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的 Amazon MSK 加密中的传输中加密部分。

  • 可配置的起始位置和批处理大小。支持的可配置起始位置为 TRIM_HORIZONLATEST。它们不是基于时间戳的。

不支持以下 Kafka 功能:

  • 身份验证 – 不支持 SSL 和 SASL 身份验证。

  • 架构注册表 – 您可以托管自己的架构注册表,但 Lambda API 不支持此功能。有关更多信息,请参阅 Confluent 网站上的架构管理

Lambda 使用者组

要与 Amazon MSK 进行交互,Lambda 创建一个可以从多个 Kafka 主题中读取的使用者组。使用与事件源映射 UUID 相同的 ID 创建使用者组。Lambda 创建的使用者组也用于检查点。成功处理后,该组在每个主题分区中的位置将提交给 Kafka。

Lambda 会处理来自一个或多个分区的记录,然后将负载发送到目标函数。当有更多记录可用时,Lambda 会继续进行批处理,直到函数赶上主题的速度。支持的最长函数执行时间为 14 分钟。

例 Amazon MSK 记录事件

Received event:{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4", "records": { "AWSKafkaTopic-0": [ { "topic": "AWSKafkaTopic", "partition": 0, "offset": 0, "timestamp": 1595035749700, "timestampType": "CREATE_TIME", "key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj", "value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj" } ] } }
注意

aws:kafka 资源的键值对采用 base64 编码。

执行角色权限

Lambda 函数的 执行角色 需要拥有以下权限才能从 Amazon MSK 集群读取记录:

AWS 托管策略 AWSLambdaMSKExecutionRole 包含这些权限。有关更多信息,请参阅Lambda 功能的托管策略

将主题配置为事件源

创建事件源映射以指示 Lambda 将 Kafka 主题中的记录发送到 Lambda 函数。您可以创建多个事件源映射,以使用多个函数处理相同的数据,或使用单个函数处理来自多个主题的项目。

要将您的函数配置为从 Amazon MSK 读取,请在 Lambda 控制台中创建 MSK 触发器。

创建触发器

  1. 打开 Lambda 控制台 函数页面

  2. 选择函数。

  3. Designer 下方,选择 Add trigger (添加触发器)

  4. 选择触发器类型。

  5. 配置所需选项,然后选择 Add (添加)

Lambda 对于 Amazon MSK 事件源支持以下选项:

  • MSK 集群 – 选择 MSK 集群。

  • 主题名称 – 输入要使用的 Kafka 主题。

  • 起始位置(可选)– 输入流中开始读取记录的位置。

    • 最新 – 从主题的所有分区中的最新位置读取。

    • 修剪 Horizon – 从所有主题分区中的最旧位置读取。

    在处理任何现有记录后,函数将继续处理新记录。

  • 启用触发器 – 禁用触发器以停止处理记录。

要启用或禁用触发器(或删除触发器),请在设计器中选择 MSK 触发器。要重新配置触发器,请使用事件源映射 API 命令。

事件源映射 API

要使用 AWS CLI 或 AWS 开发工具包管理事件源映射,请使用以下 API 操作:

要使用 AWS Command Line Interface (AWS CLI) 创建事件源映射,请使用 create-event-source-mapping 命令。要从 Amazon MSK 代理获取记录,需要能够访问与 MSK 集群关联的 Amazon Virtual Private Cloud (Amazon VPC)。为满足 Amazon VPC 访问要求,您可以为每个公有子网配置一个 NAT 网关。有关更多信息,请参阅VPC 连接函数的 Internet 和服务访问

Amazon VPC 配置可通过 Amazon MSK API 发现,无需在 create-event-source-mapping 设置中配置。

以下 AWS CLI 示例将名为 my-kafka-function 的 Lambda 函数映射到名为 AWSKafkaTopic 的 Kafka 主题,起始位置设置为 latest

$ aws lambda create-event-source-mapping --event-source-arn arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 --topics AWSKafkaTopic --starting-position LATEST --function-name my-kafka-function { "UUID": "6d9bce8e-836b-442c-8070-74e77903c815", "BatchSize": 100, "EventSourceArn": "arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2", "FunctionArn": "arn:aws:lambda:us-west-2:111111111111:function:my-kafka-function", "LastModified": 1580331394.363, "State": "Creating", "StateTransitionReason": "USER_INITIATED", "LastProcessingResult": "OK", "Topics": [ "AWSKafkaTopic" ] }

使用 get-event-source-mapping 命令查看资源的当前状态。

$ aws lambda get-event-source-mapping --uuid 6d9bce8e-836b-442c-8070-74e77903c815 { "UUID": "6d9bce8e-836b-442c-8070-74e77903c815" "BatchSize": 100, "EventSourceArn": "arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2", "FunctionArn": "arn:aws:lambda:us-west-2:111111111111:function:my-kafka-function", "LastModified": 1580331394.363, "State": "Enabled", "StateTransitionReason": "User action", "LastProcessingResult": "OK", "Topics": [ "AWSKafkaTopic" ], }

事件源映射错误

当 Lambda 函数遇到不可恢复的错误时,Kafka 主题使用者将停止处理记录。任何其他使用者如果没有遇到相同的错误,则可以继续处理。要确定使用者停止的潜在原因,请检查 EventSourceMapping 返回的详细信息中的 StateTransitionReason 字段中是否有以下代码:

ESM_CONFIG_NOT_VALID

事件源映射配置无效。

EVENT_SOURCE_AUTHN_ERROR

Lambda 未能验证事件源。

EVENT_SOURCE_AUTHZ_ERROR

Lambda 没有访问事件源所需的权限。

FUNCTION_CONFIG_NOT_VALID

函数的配置无效。

如果记录由于其大小而被丢弃,记录也将处于未处理状态。Lambda 记录的大小限制为 6 MB。