结合 Amazon MSK 使用 Lambda - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

结合 Amazon MSK 使用 Lambda

Amazon Managed Streaming for Apache Kafka (Amazon MSK) 是一项完全托管式服务,可用于构建并运行使用 Apache Kafka 来处理流数据的应用程序。Amazon MSK 简化了运行 Kafka 的集群的设置、扩展和管理。Amazon MSK 还可以更轻松地配置您的应用程序以适用于多个可用区和保证 Amazon Identity and Access Management (IAM) 的安全性。Amazon MSK 支持多个开源版本的 Kafka。

Amazon MSK 作为事件源,运行方式与使用 Amazon Simple Queue Service (Amazon SQS) 或 Amazon Kinesis 相似。Lambda 在内部轮询来自事件源的新消息,然后同步调用目标 Lambda 函数。Lambda 批量读取消息,并将这些消息作为事件有效负载提供给您的函数。最大批处理大小可配置。(默认值为 100 个消息。)

有关如何将 Amazon MSK 配置为事件源的示例,请参阅Amazon计算博客上的将 Amazon MSK 用作 Amazon Lambda 事件源。要查看完整的教程,请访问 Amazon MSK Labs 中的 Amazon MSK Lambda 集成

对于基于 Kafka 的事件源,Lambda 支持处理控制参数,例如批处理时段和批处理大小。有关更多信息,请参阅 批处理行为

Lambda 按顺序读取各个分区的消息。Lambda 处理各个批次后,会提交该批次中消息的偏移量。如果函数为批处理中的任何消息返回错误,Lambda 将重试整批消息,直到处理成功或消息过期为止。

Lambda 最多可以运行函数 14 分钟。请将函数超时配置为 14 分钟或以下(原定设置超时值为 3 秒)。Lambda 可能会重试超过 14 分钟的调用。

Lambda 调用函数时会在事件参数中发送一批消息。事件负载包含一个消息数组。每个数组项目都包含 Amazon MSK 主题和分区标识符的详细信息,以及时间戳和 base64 编码的消息。

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":"0", "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

有关如何将 Amazon MSK 配置为事件源的示例,请参阅Amazon计算博客上的将 Amazon MSK 用作 Amazon Lambda 事件源。要查看完整的教程,请访问 Amazon MSK Labs 中的 Amazon MSK Lambda 集成

MSK 集群身份验证

Lambda 需要访问 Amazon MSK 集群、检索记录和执行其他任务的权限。Amazon MSK 支持通过多种选项来控制客户端对 MSK 集群的访问。

未经身份验证的访问

如果没有客户端会通过互联网访问集群,则可以使用未经身份验证访问。

SASL/SCRAM 身份验证

Amazon MSK 支持使用传输层安全性(TLS)加密进行简单身份验证和安全层/加盐质疑应答身份验证机制(SASL/SCRAM)身份验证。为使 Lambda 连接到集群,您可以将身份验证凭证(用户名和密码)存储在 Amazon Secrets Manager 密钥中。

有关使用 Secrets Manager 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的使用 Amazon Secrets Manager 进行用户名和密码身份验证

Amazon MSK 不支持 SASL/PLAIN 身份验证。

基于 IAM 角色的身份验证

您可以使用 IAM 来验证连接到 MSK 集群的客户端的身份。要创建和部署基于 IAM 用户或角色的策略,请使用 IAM 控制台或 API。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 IAM 访问控制

要允许 Lambda 连接到 MSK 集群、读取记录和执行其他所需操作,请将以下权限添加到函数的执行角色

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/group-name" ] } ] }

您可以将这些权限范围限定为特定集群、主题和组。有关更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 Amazon MSK Kafka 操作。IAM 使用的组名称等同于事件源映射的 UUID。

双向 TLS 身份验证

双向 TLS(mTLS)在客户端和服务器之间提供双向身份验证。客户端向服务器发送证书以便服务器验证客户端,而服务器又向客户端发送证书以便客户端验证服务器。

对于 Amazon MSK,Lambda 充当客户端。您可以配置客户端证书(作为 Secrets Manager 中的密钥),以使用 MSK 集群中的代理对 Lambda 进行身份验证。客户端证书必须由服务器信任存储中的 CA 签名。MSK 集群会向 Lambda 发送服务器证书,以便使用 Lambda 对代理进行身份验证。服务器证书必须由 Amazon 信任存储中的证书颁发机构(CA)签名。

有关如何生成客户端证书的说明,请参阅为作为事件源的 Amazon MSK 引入双向 TLS 身份验证

Amazon MSK 不支持自签名服务器证书,因为 Amazon MSK 中的所有代理都使用由 Amazon Trust Services CA 签名的公有证书,预设情况下 Lambda 信任这些证书。

有关适用于 Amazon MSK 的 mTLS 的更多信息,请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的双向 TLS 身份验证

配置 mTLS 密钥

CLIENT_CERTIFICATE_TLS_AUTH 密钥需要证书字段和私有密钥字段。对于加密的私有密钥,密钥需要私有密钥密码。证书和私有密钥必须采用 PEM 格式。

注意

Lambda 支持 PBES1(而不是 PBES2)私有密钥加密算法。

证书字段必须包含证书列表,首先是客户端证书,然后是任何中间证书,最后是根证书。每个证书都必须按照以下结构在新行中启动:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager 支持最多包含 65536 字节的密钥,这为长证书链提供了充足的空间。

私有密钥必须采用 PKCS #8 格式,并具有以下结构:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

对于加密的私有密钥,请使用以下结构:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下示例显示使用加密私有密钥进行 MTL 身份验证的密钥内容。对于加密的私有密钥,您可以在密钥中包含私有密钥密码。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

管理 API 访问和权限

除了访问 Amazon MSK 集群外,您的函数还需要具有执行各种 Amazon MSK API 操作的权限。您可以为函数的执行角色添加这些权限。如果您的用户需要访问任何 Amazon MSK API 操作,请将所需权限添加到 IAM 用户或角色的身份策略中。

需要的 Lambda 函数执行角色权限

Lambda 函数的执行角色必须具有以下权限才能代表您访问 MSK 集群。您可以将 Amazon 托管策略 AWSLambdaMSKExecutionRole 添加到执行角色,也可以创建自定义策略并添加执行以下操作的权限:

向执行角色添加权限

请按照以下步骤使用 IAM 控制台将Amazon托管策略 AWSLambdaMSKExecutionRole 添加到执行角色。

添加 Amazon 托管策略

  1. 打开 IAM 控制台的 Policies(策略)页面

  2. 在搜索框中,输入策略名称 (AWSLambdaMSKExecutionRole)。

  3. 从列表中选择策略,然后依次选择 Policy actions(策略操作)、Attach(附加)。

  4. Attach policy(附加策略)页面,从列表中选择您的执行角色,然后选择 Attach policy(附加策略)。

使用 IAM 策略授予用户访问权限

预设情况下,IAM 用户和角色无权执行 Amazon MSK API 操作。要向组织或账户中的用户授予访问权限,您可以添加或更新基于身份的策略。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的 Amazon MSK 基于身份的策略示例

使用 SASL/SCRAM 身份验证

Amazon MSK 支持使用 TLS 加密进行 Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 身份验证。您可以使用 Amazon Secrets Manager 密钥来设置用户名和密码身份验证,从而实现对 Amazon MSK 集群访问权限的控制。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的使用 Amazon Secrets Manager 进行用户名和密码身份验证

请注意,Amazon MSK 不支持 SASL/PLAIN 身份验证。

身份验证和授权错误

如果缺少使用来自 Amazon MSK 集群的数据所需的任何权限,Lambda 会在 LastProcessingResult 下的事件源映射中显示以下错误消息。

集群未能授权 Lambda

对于 SASL/SCRAM 或 mTLS,此错误表明提供的用户不具有以下所有必需的 Kafka 访问控制列表(ACL)权限:

  • DescribeConfigs 集群

  • 描述组

  • 读取组

  • 描述主题

  • 读取主题

对于 IAM 访问控制,此错误表明函数的执行角色缺少访问组或主题所需的一个或多个权限。查看 基于 IAM 角色的身份验证 中的所需权限列表。

当您使用所需的 Kafka 集群权限创建 Kafka ACL 或 IAM 策略时,请将主题和组指定为资源。主题名称必须与事件源映射中的主题一致。组名称必须与事件源映射的 UUID 一致。

向执行角色添加所需的权限后,更改可能需要几分钟才会生效。

SASL 身份验证失败

对于 SASL/SCRAM,此错误表明提供的用户名和密码无效。

对于 IAM 访问控制,此错误表明执行角色缺少 MSK 集群的 kafka-cluster:Connect 权限。将此权限添加到该角色并将集群的 Amazon Resource Name(ARN)指定为资源。

您可能会看到此错误间歇性发生。在 TCP 连接数超过 Amazon MSK 服务配额后,集群将拒绝连接。Lambda 会退回并重试,直到连接成功为止。在 Lambda 连接到集群并轮询记录后,最后的处理结果将更改为 OK

服务器未能通过 Lambda 的身份验证

此错误表明 Amazon MSK Kafka 未能通过 Lambda 的身份验证。出现此错误的可能原因如下:

  • 您没有为 mTLS 身份验证提供客户端证书。

  • 您提供了客户端证书,但未将代理配置为使用 mTLS。

  • 代理不信任客户端证书。

提供的证书或私有密钥无效

此错误表明 Amazon MSK 使用者无法使用提供的证书或私有密钥。确保证书和密钥使用 PEM 格式,并且私有密钥加密使用 PBES1 算法。

网络配置

Lambda 必须具有对与 Amazon MSK 集群关联的 Amazon Virtual Private Cloud (Amazon VPC) 资源的访问权限。建议您为 Lambda 和 Amazon Security Token Service (Amazon STS) 部署 Amazon PrivateLink VPC 终端节点。如果需要身份验证,还应为 Secrets Manager 部署 VPC 终端节点。

或者,请确保与 MSK 集群关联的 VPC 在每个公有子网中包含一个 NAT 网关。有关更多信息,请参阅 VPC 连接函数的 Internet 和服务访问

使用以下规则配置您的 Amazon VPC 安全组(至少):

  • 入站规则 – 允许为事件源指定之安全组的 Amazon MSK 代理端口(9092 对应纯文本,9094 对应 TLS,9096 对应 SASL,9098 对应 IAM)上的所有流量。

  • 出站规则 – 允许所有目标的端口 443 上的所有流量传输。对于为事件源指定的安全组,允许 Amazon MSK 代理端口(9092 对应纯文本,9094 对应 TLS,9096 对应 SASL,9098 对应 IAM)上的所有流量。

  • 如果您使用的是 VPC 终端节点而不是 NAT 网关,则与 VPC 终端节点关联的安全组必须允许来自事件源安全组的端口 443 上的所有入站流量。

注意

可通过 Amazon MSK API 发现您的 Amazon VPC 配置。在设置过程中,您不需要使用 create-event-source-mapping 命令对其进行配置。

有关配置网络的更多信息,请参阅Amazon计算博客上的使用 Apache Kafka 集群在 VPC 中设置 Amazon Lambda

将 Amazon MSK 添加为事件源

创建事件源映射,使用 Lambda 控制台、Amazon开发工具包,或 Amazon Command Line Interface (Amazon CLI) 将您的 Amazon MSK 添加为 Lambda 函数触发器

本节介绍了如何使用 Lambda 控制台和 Amazon CLI 创建事件源映射。

先决条件

  • 一个 Amazon MSK 集群和一个 Kafka 主题。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的开始使用 Amazon MSK

  • 一个有权访问 MSK 集群所用 Amazon 资源的执行角色

添加 Amazon MSK 触发器(控制台)

按照以下步骤将 Amazon MSK 集群和 Kafka 主题添加为 Lambda 函数的触发器。

将 Amazon MSK 触发器添加到 Lambda 函数(控制台)

  1. 打开 Lamba 控制台的 Functions(函数)页面。

  2. 选择 Lambda 函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. Trigger configuration(触发配置)下,执行以下操作:

    1. 选择 MSK 触发器类型。

    2. 对于 MSK cluster(MSK 集群),选择您的集群。

    3. 对于 Batch size(批处理大小),输入要在单个批次中接收的最大消息数。

    4. 对于 Topic name(主题名称),输入主题名称。

    5. (可选)对于 Starting position(起始位置),选择 Latest(最新)以开始从最新记录读取流。或者选择 Trim horizon(时间范围)以从最早的可用记录开始。

    6. (可选)对于 Authentication(身份验证),选择用于通过 MSK 集群中的代理进行身份验证的密钥。

    7. 要在禁用状态下创建触发器以进行测试(推荐),请清除 Enable trigger(启用触发器)。或者,要立即启用该触发器,请选择 Enable trigger(启用触发器)。

  5. 要创建触发器,请选择 Add(添加)。

添加 Amazon MSK 触发器 (Amazon CLI)

使用以下示例 Amazon CLI 命令为 Lambda 函数创建和查看 Amazon MSK 触发器。

使用 Amazon CLI 创建触发器

以下示例使用 create-event-source-mapping Amazon CLI 命令将一个名为 my-kafka-function 的 Lambda 函数映射至一个名为 AWSKafkaTopic 的 Kafka 主题。将主题的起始位置设置为 LATEST

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-west-2:arn:aws:kafka:us-west-2:111111111111:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function

有关更多信息,请参阅 CreateEventSourceMapping API 参考文档。

使用 Amazon CLI 查看状态

以下示例使用 get-event-source-mapping Amazon CLI 命令来描述您创建的事件源映射的状态。

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Amazon MSK 事件源的自动伸缩

当您最初创建 Amazon MSK 事件源时,Lambda 会分配一个使用者来处理 Kafka 主题中的所有分区。每个使用者都使用多个并行运行的处理器来处理增加的工作负载。此外,Lambda 会根据工作负载自动增加或缩减使用者的数量。要保留每个分区中的消息顺序,使用者的最大数量为主题中每个分区一个使用者。

Lambda 会按一分钟的间隔时间来评估主题中所有分区的使用者偏移滞后。如果延迟太高,则分区接收消息的速度比 Lambda 处理消息的速度更快。如有必要,Lambda 会在主题中添加或删除使用者。增加或移除使用者的扩缩过程会在评估完成后的三分钟内进行。

如果您的目标 Lambda 函数过载,Lambda 会减少使用者的数量。此操作通过减少使用者可以检索和发送到函数的消息数来减少函数的工作负载。

要监控 Kafka 主题的吞吐量,请查看 Lambda 在您的函数处理记录时发出的偏移滞后指标

要检查并行发生的函数调用次数,还可以监控函数的并发指标

Amazon CloudWatch 指标

Lambda 会在您的函数处理记录时发出 OffsetLag 指标。此指标值是写入 Kafka 事件源主题的最后一条记录与 Lambda 处理的最后一条记录之间的偏移量差值。您可以使用 OffsetLag 来估计添加记录和函数处理记录之间的延迟。

如果 OffsetLag 呈上升趋势,则可能说明您的函数存在问题。有关更多信息,请参阅 使用 Lambda 函数指标

Amazon MSK 配置参数

所有 Lambda 事件源类型共享相同的 CreateEventSourceMappingUpdateEventSourceMapping API 操作。但是,只有部分参数适用于Amazon MSK。

适用于 Amazon MSK 的事件源参数
参数 必需 默认值 备注

BatchSize

100

最大值:10000

已启用

已启用

EventSourceArn

只能在 Create(创建)设置

FunctionName

SourceAccessConfigurations

无凭证

您的事件源的 VPC 信息或 SASL/SCRM 身份验证凭证

StartingPosition

TRIM_HORIZON 或 LATEST(最新)

只能在 Create(创建)设置

主题

Kafka 主题名称

只能在 Create(创建)设置