教程:使用 Amazon MSK 事件源映射调用 Lambda 函数 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

教程:使用 Amazon MSK 事件源映射调用 Lambda 函数

在本教程中,您将执行以下操作:

  • 在与现有 Amazon MSK 集群相同的 Amazon 账户中创建 Lambda 函数。

  • 为 Lambda 配置联网和身份验证,以便与 Amazon MSK 通信。

  • 设置 Lambda Amazon MSK 事件源映射,其在主题中出现事件时运行您的 Lambda 函数。

完成这些步骤后,当事件发送到 Amazon MSK 时,您将能够设置 Lambda 函数,以使用自己的自定义 Lambda 代码自动处理这些事件。

您可以用这项功能做什么?

示例解决方案:使用 MSK 事件源映射向您的客户提供实时比分。

请考虑以下应用场景:您的公司托管了一个 Web 应用程序,您的客户可以在其中查看有关直播活动(例如体育比赛)的信息。比赛的信息更新将通过 Amazon MSK 上的 Kafka 主题提供给您的团队。您想设计一个解决方案,使用来自 MSK 主题的更新在您开发的应用程序中向客户提供直播活动的更新视图。您已决定采用以下设计方法:您的客户端应用程序将与在 Amazon 中托管的无服务器后端进行通信。客户端将使用 Amazon API Gateway WebSocket API 通过 websocket 会话进行连接。

在此解决方案中,您需要一个组件来读取 MSK 事件,执行一些自定义逻辑为应用程序层准备这些事件,然后将该信息转发到 API Gateway API。您可以使用 Amazon Lambda 实现此组件,方法是在 Lambda 函数中提供自定义逻辑,然后使用 Amazon Lambda Amazon MSK 事件源映射对其进行调用。

有关使用 Amazon API Gateway WebSocket API 实施解决方案的更多信息,请参阅 API Gateway 文档中的 WebSocket API 教程

先决条件

具有以下预配置资源的 Amazon 账户:

要满足这些先决条件,建议按照 Amazon MSK 文档中的 Getting started using Amazon MSK 进行操作。

  • Amazon MSK 集群。请参阅 Getting started using Amazon MSK 中的 Create an Amazon MSK cluster

  • 以下配置:

    • 确保在集群安全设置中启用基于 IAM 角色的身份验证。这会将您的 Lambda 函数限制为仅访问所需的 Amazon MSK 资源,从而提高安全性。默认情况下会对新的 Amazon MSK 集群启用此设置。

    • 确保集群网络设置中的公有访问已关闭。通过限制处理数据的中介数量,限制 Amazon MSK 集群对互联网的访问,以提高您的安全性。默认情况下会对新的 Amazon MSK 集群启用此设置。

  • 您的 Amazon MSK 集群中用于此解决方案的 Kafka 主题。请参阅 Getting started using Amazon MSK 中的 Create a topic

  • 设置 Kafka 管理主机以从您的 Kafka 集群检索信息并将 Kafka 事件发送到您的主题进行测试,例如安装了 Kafka 管理 CLI 和 Amazon MSK IAM 库的 Amazon EC2 实例。请参阅 Getting started using Amazon MSK 中的 Create a client machine

设置完这些资源后,请从您的 Amazon 账户中收集以下信息,以确认您已准备好继续。

  • Amazon MSK 集群的名称。您可以在 Amazon MSK 控制台中找到这些信息。

  • 集群 UUID,您的 Amazon MSK 集群 ARN 的一部分,您可以在 Amazon MSK 控制台中找到它。按照 Amazon MSK 文档中 Listing clusters 中的步骤查找此信息。

  • 与您的 Amazon MSK 集群关联的安全组。您可以在 Amazon MSK 控制台中找到这些信息。在以下步骤中,这些安全组称为 clusterSecurityGroups

  • 包含 Amazon MSK 集群的 Amazon VPC 的 ID。您可以通过在 Amazon MSK 控制台中识别与您的 Amazon MSK 集群关联的子网,然后在 Amazon VPC 控制台中识别与该子网关联的 Amazon VPC 来找到此信息。

  • 解决方案中使用的 Kafka 主题名称。您可以通过从 Kafka 管理主机使用 Kafka topics CLI 调用您的 Amazon MSK 集群来找到此信息。有关主题 CLI 的更多信息,请参阅 Kafka 文档中的 Adding and removing topics

  • 您的 Kafka 主题使用者组的名称,适合您的 Lambda 函数使用。Lambda 可以自动创建该组,因此您无需使用 Kafka CLI 创建该组。如果您确实需要管理使用者组,了解有关使用者组 CLI 的更多信息,则请参阅 Kafka 文档中的 Managing Consumer Groups

您 Amazon 账户中有以下权限:

  • 创建和管理 Lambda 函数的权限。

  • 创建 IAM 策略并将其与您的 Lambda 函数关联的权限。

  • 在托管您的 Amazon MSK 集群的 Amazon VPC 中创建 Amazon VPC 端点和更改网络配置的权限。

为 Lambda 配置网络连接以与 Amazon MSK 通信

使用 Amazon PrivateLink 连接 Lambda 和 Amazon MSK。您可以通过在 Amazon VPC 控制台中创建接口 Amazon VPC 端点来实现此目的。有关联网配置的更多信息,请参阅 网络配置

当 Amazon MSK 事件源映射代表 Lambda 函数运行时,它会担任 Lambda 函数的执行角色。此 IAM 角色授权映射访问受 IAM 保护的资源,例如您的 Amazon MSK 集群。尽管这些组件共享执行角色,但 Amazon MSK 映射和您的 Lambda 函数对各自的任务有不同的连接要求,如下图所示。

客户 Amazon VPC 中的 Amazon MSK 集群已连接到 Lambda 服务代码,该代码轮询集群,然后使用 Amazon STS 与 Amazon Lambda 进行通信。

您的事件源映射属于 Amazon MSK 集群安全组。在此联网步骤中,从您的 Amazon MSK 集群 VPC 创建 Amazon VPC 端点,将事件源映射连接到 Lambda 和 STS 服务。保护这些端点,以接受来自您的 Amazon MSK 集群安全组的流量。然后,调整 Amazon MSK 集群安全组,以允许事件源映射与 Amazon MSK 集群进行通信。

您可以使用 Amazon Web Services Management Console 配置以下步骤。

配置接口 Amazon VPC 端点以连接 Lambda 和 Amazon MSK
  1. 为您的接口 Amazon VPC 端点创建一个安全组 endpointSecurityGroup,以允许来自 clusterSecurityGroups 端口 443 的入站 TCP 流量。按照 Amazon EC2 文档中创建安全组的步骤创建安全组。然后,按照 Amazon EC2 文档中向安全组添加规则的步骤添加相应的规则。

    使用以下信息创建安全组:

    添加入站规则时,为 clusterSecurityGroups 中的每个安全组创建一条规则。对于每条规则:

    • 对于类型,选择 HTTPS

    • 对于,选择其中一个 clusterSecurityGroups

  2. 创建一个端点,将 Lambda 服务连接到包含 Amazon MSK 集群的 Amazon VPC。按照创建接口端点中的步骤进行操作。

    使用以下信息创建接口端点:

    • 对于服务名称,选择 com.amazonaws.regionName.lambda,其中 regionName 托管您的 Lambda 函数。

    • 对于 VPC,选择包含您的 Amazon MSK 集群的 Amazon VPC。

    • 对于安全组,选择您之前创建的 endpointSecurityGroup

    • 对于子网,选择托管您的 Amazon MSK 集群的子网。

    • 对于策略,请提供以下策略文档,其保护端点以供 Lambda 服务主体用于执行 lambda:InvokeFunction 操作。

      { "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • 确保启用 DNS 名称保持设置状态。

  3. 创建一个端点,将 Amazon STS 服务连接到包含 Amazon MSK 集群的 Amazon VPC。按照创建接口端点中的步骤进行操作。

    使用以下信息创建接口端点:

    • 对于服务名称,选择 Amazon STS。

    • 对于 VPC,选择包含您的 Amazon MSK 集群的 Amazon VPC。

    • 对于安全组,选择 endpointSecurityGroup

    • 对于子网,选择托管您的 Amazon MSK 集群的子网。

    • 对于策略,请提供以下策略文档,其保护端点以供 Lambda 服务主体用于执行 sts:AssumeRole 操作。

      { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • 确保启用 DNS 名称保持设置状态。

  4. 对于与您的 Amazon MSK 集群关联的每个安全组(即 clusterSecurityGroups),允许执行以下操作:

    • 允许端口 9098 上到所有 clusterSecurityGroups(包括其自身内部)的所有入站和出站 TCP 流量。

    • 允许端口 443 上的所有出站 TCP 流量。

    默认安全组规则允许部分流量,因此,如果您的集群连接到单个安全组,并且该组有默认规则,则不需要其他规则。要调整安全组规则,请按照 Amazon EC2 文档中向安全组添加规则的步骤进行操作。

    使用以下信息向您的安全组添加规则:

    • 对于端口 9098 的每条入站规则或出站规则,请提供

      • 对于 Type (类型),选择 Custom TCP (自定义 TCP)

      • 对于端口范围,请提供 9098。

      • 对于,提供其中一个 clusterSecurityGroups

    • 对于端口 443 的每条入站规则的类型,选择 HTTPS

为 Lambda 创建 IAM 角色,以从您的 Amazon MSK 主题中读取

确定 Lambda 从 Amazon MSK 主题读取的身份验证要求,然后在策略中进行定义。创建一个角色 lambdaAuthRole,授权 Lambda 使用这些权限。使用 kafka-cluster IAM 操作在您的 Amazon MSK 集群上授权操作。然后,授权 Lambda 执行发现和连接到您的 Amazon MSK 集群所需的 Amazon MSK kafka 和 Amazon EC2 操作以及 CloudWatch 操作,这样 Lambda 便可记录其所执行的操作。

描述 Lambda 从 Amazon MSK 读取的身份验证要求
  1. 编写一个 IAM 策略文档(JSON 文档)clusterAuthPolicy,允许 Lambda 使用您的 Kafka 使用者组从 Amazon MSK 集群中的 Kafka 主题进行读取。Lambda 要求在读取时设置一个 Kafka 使用者组。

    修改以下模板以符合您的先决条件:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/mskClusterName/cluster-uuid", "arn:aws:kafka:region:account-id:topic/mskClusterName/cluster-uuid/mskTopicName", "arn:aws:kafka:region:account-id:group/mskClusterName/cluster-uuid/mskGroupName" ] } ] }

    有关更多信息,请参阅 基于 IAM 角色的身份验证。编写策略时:

    • 对于 regionaccount-id,提供托管您的 Amazon MSK 集群的区域和账户 ID。

    • 对于 mskClusterName,提供您的 Amazon MSK 集群的名称。

    • 对于 cluster-uuid,提供您的 Amazon MSK 集群的 ARN 中 UUID。

    • 对于 mskTopicName,提供您的 Kafka 主题的名称。

    • 对于 mskGroupName,提供您的 Kafka 使用者组的名称。

  2. 确定 Lambda 发现和连接您的 Amazon MSK 集群所需的 Amazon MSK、Amazon EC2 和 CloudWatch 权限,并记录这些事件。

    AWSLambdaMSKExecutionRole 托管策略宽松地定义所需的权限。在以下步骤中使用该策略。

    在生产环境中,评测 AWSLambdaMSKExecutionRole 以根据最低权限原则限制您的执行角色策略,然后为您的角色编写一个策略来取代此托管策略。

有关 IAM 策略语言的详细信息,请参阅 IAM 文档

现在,您已经编写了策略文档,请创建一个 IAM 策略,这样便可将其附加到您的角色中。您可以使用控制台按以下步骤完成此操作。

从策略文档创建 IAM 策略
  1. 登录 Amazon Web Services Management Console,然后使用以下网址打开 IAM 控制台:https://console.aws.amazon.com/iam/

  2. 在左侧的导航窗格中,选择策略

  3. 选择创建策略

  4. 策略编辑器部分,选择 JSON 选项。

  5. 粘贴 clusterAuthPolicy

  6. 向策略添加完权限后,选择下一步

  7. 查看和创建页面上,为创建的策略键入策略名称描述(可选)。查看此策略中定义的权限以查看策略授予的权限。

  8. 选择创建策略可保存新策略。

有关更多信息,请参阅 IAM 文档中的创建 IAM 策略

既然您已经有了适当的 IAM 策略,请创建一个角色并将其附加到该角色。您可以使用控制台按以下步骤完成此操作。

在 IAM 控制台中创建执行角色
  1. 在 IAM 控制台中,打开 Roles(角色)页面。

  2. 选择 Create role(创建角色)。

  3. 可信实体类型下,选择 Amazon 服务

  4. Use case(使用案例)下,选择 Lambda

  5. 选择下一步

  6. 选择以下策略:

    • clusterAuthPolicy

    • AWSLambdaMSKExecutionRole

  7. 选择下一步

  8. 对于角色名称,输入 lambdaAuthRole,然后选择创建角色

有关更多信息,请参阅 使用执行角色定义 Lambda 函数权限

创建 Lambda 函数以从您的 Amazon MSK 主题中读取

创建配置为使用您的 IAM 角色的 Lambda 函数。您可以使用控制台创建您的 Lambda 函数。

使用您的身份验证配置创建 Lambda 函数
  1. 打开 Lambda 控制台并从标题中选择创建函数

  2. 选择从头开始编写

  3. 对于函数名称,请提供您选择的相应名称。

  4. 对于运行时,选择最新支持版本的 Node.js 以使用本教程中提供的代码。

  5. 选择更改默认执行角色

  6. 选择使用现有角色

  7. 对于现有角色,选择 lambdaAuthRole

在生产环境中,您通常需要向 Lambda 函数的执行角色添加更多策略,以便有效地处理您的 Amazon MSK 事件。有关向角色添加策略的更多信息,请参阅 IAM 文档中的添加或删除身份权限

创建到 Lambda 函数的事件源映射

您的 Amazon MSK 事件源映射为 Lambda 服务提供了必要的信息,以在发生相应 Amazon MSK 事件时调用您的 Lambda。您可以使用控制台创建 Amazon MSK 映射。创建 Lambda 触发器,然后事件源映射会自动设置。

创建 Lambda 触发器(和事件源映射)
  1. 导航到您的 Lambda 函数的概述页面。

  2. 在函数概述部分中,选择左下角的添加触发器

  3. 选择源下拉列表中,选择 Amazon MSK

  4. 请勿设置身份验证

  5. 对于 MSK 集群,选择集群的名称。

  6. 对于批次大小,输入 1。此步骤使该功能更易于测试,但并非生产中的理想值。

  7. 对于主题名称,输入 Kafka 主题名称。

  8. 对于使用者组 ID,请提供您的 Kafka 使用者组的 ID。

更新您的 Lambda 函数以读取流数据

Lambda 通过事件方法参数提供有关 Kafka 事件的信息。有关 Amazon MSK 事件的示例结构,请参阅 示例事件。在您了解如何解读 Lambda 转发的 Amazon MSK 事件后,您可以修改您的 Lambda 函数代码以使用其提供的信息。

向您的 Lambda 函数提供以下代码,用于记录 Lambda Amazon MSK 事件的内容以进行测试:

Node.js
exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }

您可以使用控制台向 Lambda 提供函数代码。

更新 Lambda 函数代码
  1. 导航到您的 Lambda 函数的概述页面。

  2. 选择节点选项卡。

  3. 将提供的代码输入到代码源 IDE 中。

  4. 代码源导航栏中,选择部署

测试您的 Lambda 函数以验证其是否连接到您的 Amazon MSK 主题

现在,您可以通过查看 CloudWatch 事件日志来验证事件源是否正在调用您的 Lambda。

验证是否正在调用您的 Lambda 函数
  1. 使用您的 Kafka 管理主机通过 CLI kafka-console-producer 生成 Kafka 事件。有关更多信息,请参阅 Kafka 文档中的 Write some events into the topic。发送足够的事件以填充上一步中定义的事件源映射批次大小所定义的批次,否则 Lambda 将等待更多信息来调用。

  2. 如果您的函数运行,则 Lambda 会将发生的事件写入 CloudWatch。在控制台中,导航到您的 Lambda 函数的详细信息页面。

  3. 选择 Configuration(配置)选项卡。

  4. 在侧栏中,选择监控和操作工具

  5. 日志记录配置下确定 CloudWatch 日志组。日志组应以 /aws/lambda 开头。选择日志组的链接。

  6. 在 CloudWatch 控制台中,检查日志事件,查看 Lambda 已发送到日志流的日志事件。确定是否存在包含来自 Kafka 事件消息的日志事件,如下图所示。如果有,则表示您已成功使用 Lambda 事件源映射将 Lambda 函数连接到 Amazon MSK。

    CloudWatch 中的日志事件,其中包含与提供的代码提取的事件信息相对应的消息。