结合 Amazon MQ 使用 Lambda - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

结合 Amazon MQ 使用 Lambda

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

Amazon MQ 是一项托管消息代理服务,用于 Apache ActiveMQRabbitMQ消息代理允许软件应用程序和组件使用各种编程语言、操作系统和正式消息收发协议,通过主题或队列事件目标进行通信。

Amazon MQ 还可以通过安装 ActiveMQ 代理或 RabbitMQ 代理以及提供不同的网络拓扑和其他基础设施需求来代表您管理 Amazon Elastic Compute Cloud (Amazon EC2) 实例。

您可使用 Lambda 函数处理来自 Amazon MQ 消息代理的记录。Lambda 通过事件源映射调用您的函数,事件源映射是从您的代理读取消息并同步调用函数的一种 Lambda 资源。

警告

Lambda 事件源映射至少处理每个事件一次,有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为幂等性。要了解更多信息,请参阅 Amazon 知识中心的如何使我的 Lambda 函数具有幂等性

Amazon MQ 事件源映射有以下配置限制:

  • 并发 – 使用 Amazon MQ 事件源映射的 Lambda 函数具有默认的最大并发设置。对于 ActiveMQ,Lambda 服务将并发执行环境的数量限制为 5。对于 RabbitMQ,将并发执行环境的数量限制为 1。即使您更改了函数的预留或预调配并发设置,Lambda 服务也不会提供更多的执行环境。要请求增加默认的最大并发数,请联系 Amazon Web Services Support。

  • 跨账户 – Lambda 不支持跨账户处理。您不能使用 Lambda 处理来自不同 Amazon Web Services 账户 账户中的 Amazon MQ 消息代理的记录。

  • 身份验证 – 对于 ActiveMQ,仅支持 ActiveMQ SimpleAuthenticationPlugin。对于 RabbitMQ,仅支持 PLAIN 身份验证机制。用户必须使用 Amazon Secrets Manager 来管理凭据。有关 ActiveMQ 身份验证的更多信息,请参阅 Amazon MQ 开发人员指南中的使用 LDAP 集成 ActiveMQ 代理

  • 连接配额 – 代理具有每个有线级协议允许的最大连接数。此配额基于代理实例类型。有关更多信息,请参阅 Amazon MQ 开发人员指南中的 Amazon MQ 中的配额代理部分。

  • 连接 – 您可以在公有或私有虚拟私有云(VPC)中创建代理。对于私有 VPC,您的 Lambda 函数需要具备对 VPC 的访问权限才能接收消息。有关更多信息,请参阅本主题后面的 网络配置

  • 事件目标 – 仅支持队列目标。但是,您可以使用虚拟主题,虚拟主题在内部与主题行为一致,在与 Lambda 交互时与队列行为一致。有关更多信息,请参阅 Apache ActiveMQ 网站上的虚拟目标和 RabbitMQ 网站上的虚拟主机

  • 网络拓扑 – 对于 ActiveMQ,每个事件源映射仅支持一个单实例或备用代理。对于 RabbitMQ,每个事件源映射只支持一个单实例代理或集群部署。单实例代理需要一个失效转移端点。有关这些代理部署模式的更多信息,请参阅 Amazon MQ 开发人员指南中的 Active MQ 代理架构RabbitMQ 代理架构

  • 协议 – 支持的协议取决于 Amazon MQ 集成的类型。

    • 对于 ActiveMQ 集成,Lambda 使用 OpenWire/Java Message Service (JMS) 协议来使用消息。消息的使用不支持任何其他协议。在 JMS 协议中,仅支持 TextMessageBytesMessage。Lambda 还支持 JMS 自定义属性。有关 OpenWire 协议的更多信息,请参阅 Apache ActiveMQ 网站上的 OpenWire

    • 对于 RabbitMQ 集成,Lambda 使用 AMQP 0-9-1 协议来使用消息。消息的使用不支持任何其他协议。有关 RabbitMQ 的 AMQP 0-9-1 协议实施的详细信息,请参阅 RabbitMQ 网站上的 AMQP 0-9-1 完整参考指南

Lambda 自动支持 Amazon MQ 支持的最新版本的 ActiveMQ 和 RabbitMQ。有关受支持的最新版本,请参阅 Amazon MQ 开发人员指南中的 Amazon MQ 发布说明

注意

默认情况下,Amazon MQ 代理有一个每周维护时段。代理在该时段内无法使用。对于没有备用代理的代理,Lambda 将无法在该时段处理任何消息。

Lambda 使用者组

为了与 Amazon MQ 进行交互,Lambda 会创建一个可以从 Amazon MQ 代理中读取的使用者组。使用与事件源映射 UUID 相同的 ID 创建使用者组。

对于 Amazon MQ 事件源,Lambda 会将记录合并为批处理,然后通过单个有效负载中将其发送到您的函数。要控制行为,您可以配置批处理时段和批处理大小。Lambda 会持续提取消息,直到达到 6 MB 的最大有效负载大小、批处理时段过期或记录数达到完整批处理大小时为止。有关更多信息,请参阅 批处理行为

使用者组将消息作为字节 BLOB 进行检索,然后以 base64 格式编码为单个 JSON 有效负载,接下来调用您的函数。如果函数为批处理中的任何消息返回错误,Lambda 将重试整批消息,直到处理成功或消息过期为止。

注意

尽管 Lambda 函数的最大超时限制通常为 15 分钟,但 Amazon MSK、自行管理的 Apache Kafka、Amazon DocumentDB、Amazon MQ for ActiveMQ 和 RabbitMQ 的事件源映射,仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

您可以使用 Amazon CloudWatch 中的 ConcurrentExecutions 指标监控给定函数的并发使用情况。有关并发的更多信息,请参阅 配置预留并发

例 Amazon MQ 记录事件
ActiveMQ
{ "eventSource": "aws:mq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messages": [ { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 1, "correlationId": "myJMSCoID", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"QUJDOkFBQUE=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } }, { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/bytes-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 2, "correlationId": "myJMSCoID1", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"LQaGQ82S48k=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } } ] }
RabbitMQ
{ "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", "rmqMessagesByQueue": { "pizzaQueue::/": [ { "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 }, "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ] } }
注意

在 RabbitMQ 示例中,pizzaQueue 是 RabbitMQ 队列的名称,/ 是虚拟主机的名称。接收消息时,事件源会将消息列在 pizzaQueue::/ 下。

执行角色权限

要从 Amazon MQ 代理读取记录,您的 Lambda 函数需要为其执行角色添加以下权限:

注意

使用加密的客户托管密钥时,也可以添加 kms:Decrypt 权限。

网络配置

要通过事件源映射向 Lambda 提供对代理的完全访问权限,代理必须使用公有端点(公有 IP 地址),或者您必须提供对您在其中创建代理的 Amazon VPC 的访问权限。

默认情况下,创建 Amazon MQ 代理时,PubliclyAccessible 标志设置为 false。要让代理接收公有 IP 地址,必须将 PubliclyAccessible 标志设置为 true。

将 Amazon MQ 与 Lambda 结合使用的最佳做法是,使用 Amazon PrivateLink VPC 端点并向 Lambda 函数授予访问代理的 VPC 的权限。为 Lambda 部署一个端点,并为 Amazon Security Token Service(Amazon STS)部署一个端点(仅适用于 ActiveMQ)。如果代理使用身份验证,则还需要为 Amazon Secrets Manager 部署端点。要了解更多信息,请参阅 使用 VPC 端点

也可以在包含 Amazon MQ 代理的 VPC 中的每个公有子网上配置 NAT 网关。有关更多信息,请参阅 为连接到 VPC 的 Lambda 函数启用互联网访问权限

在为 Amazon MQ 代理创建事件源映射时,Lambda 会检查代理 VPC 的子网和安全组是否已存在弹性网络接口(ENI)。如果 Lambda 发现现有 ENI,则会尝试重用这些 ENI。否则,Lambda 会创建新的 ENI 来连接到事件源并调用函数。

注意

Lambda 函数始终在 Lambda 服务拥有的 Amazon VPC 中运行。这些 VPC 由服务自动维护,对客户不可见。您也可以将函数连接到 Amazon VPC。无论是哪种情况,函数的 VPC 配置都不会影响事件源映射。只有事件源 VPC 的配置才能决定 Lambda 连接到事件源的方式。

VPC 安全组规则

使用以下规则配置包含集群的 Amazon VPC 安全组(最低要求):

  • 入站规则:允许来自事件源安全组的代理端口上为事件源指定的安全组的所有流量。ActiveMQ 默认使用端口 61617,而 RabbitMQ 默认使用端口 5671。

  • 出站规则 – 允许所有目标的端口 443 上的所有流量传输。允许来自事件源安全组的代理端口上的所有流量。ActiveMQ 默认使用端口 61617,而 RabbitMQ 默认使用端口 5671。

  • 如果您使用的是 VPC 端点而不是 NAT 网关,则与 VPC 端点关联的安全组必须允许来自事件源安全组的端口 443 上的所有入站流量。

使用 VPC 端点

在使用 VPC 端点时,调用函数的 API 调用会使用 ENI 通过这些端点进行路由。Lambda 服务主体需要针对使用这些 ENI 的任何函数调用 lambda:InvokeFunction。此外,对于 ActiveMQ,Lambda 服务主体需要针对使用 ENI 的角色调用 sts:AssumeRole

默认情况下,VPC 端点的 IAM 策略处于开放状态。最佳做法是,将这些策略限制为仅允许特定主体使用该端点执行所需操作。为确保事件源映射能够调用 Lambda 函数,VPC 端点策略必须允许 Lambda 服务主体调用 lambda:InvokeFunction,对于 ActiveMQ,则必须允许调用 sts:AssumeRole。将 VPC 端点策略限制为仅允许来自组织内部的 API 调用,会导致事件源映射无法正常运行。

以下 VPC 端点策略示例展示了如何授予 Amazon STS 和 Lambda 端点所需的访问权限。

例 VPC 端点策略 – Amazon STS 端点(仅限 ActiveMQ)
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
例 VPC 端点策略 – Lambda 端点
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

如果 Amazon MQ 代理使用身份验证,您还可以限制 Secrets Manager 端点的 VPC 端点策略。要调用 Secrets Manager API,Lambda 会使用函数角色而非 Lambda 服务主体。以下示例展示了 Secrets Manager 端点策略。

例 VPC 端点策略 – Secrets Manager 端点
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

添加权限并创建事件源映射

创建事件源映射以指示 Lambda 将 Amazon MQ 代理中的记录发送到 Lambda 函数。您可以创建多个事件源映射,以使用多个函数处理相同的数据,或使用单个函数处理来自多个源的项目。

要将您的函数配置为从 Amazon MQ 中读取,请添加所需权限并在 Lambda 控制台中创建 MQ 触发器。

要添加权限并创建触发器
  1. 打开 Lamba 控制台的函数页面

  2. 选择一个函数的名称。

  3. 选择 Configuration(配置)选项卡,然后选择 Permissions(权限)。

  4. 角色名称下,选择至执行角色的链接。此角色将在 IAM 控制台中打开角色。

    至执行角色的链接
  5. 选择添加权限,然后选择创建内联策略

    在 IAM 控制台中创建内联策略
  6. 策略编辑器中,选择 JSON。输入以下策略。您的函数需要这些权限才能从 Amazon MQ 代理读取数据。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "mq:DescribeBroker", "secretsmanager:GetSecretValue", "ec2:CreateNetworkInterface", "ec2:DeleteNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }
    注意

    使用加密的客户托管密钥时,还必须添加 kms:Decrypt 权限。

  7. 选择下一步。输入策略名称,然后选择创建策略

  8. 在 Lambda 控制台中返回您的函数。在 Function overview(函数概览)下,选择 Add trigger(添加触发器)。

    Lambda 控制台的函数概述部分
  9. 选择 MQ 触发器类型。

  10. 配置必填选项,然后选择 Add(添加)。

Lambda 支持对 Amazon MQ 事件源使用以下选项:

  • MQ broker(MQ 代理)– 选择 Amazon MQ 代理。

  • Batch size(批处理大小)– 设置要在单个批次中检索的最大消息数。

  • Queue name(队列名称)– 输入要使用的 Amazon MQ 队列。

  • Source access configuration(源访问配置)– 输入虚拟主机信息和 Secret Secrets Manager 密钥,用于存储您的代理凭证。

  • Enable trigger(启用触发器)– 禁用触发器以停止处理记录。

要启用或禁用触发器(或删除触发器),请在设计器中选择 MQ 触发器。要重新配置触发器,请使用事件源映射 API 操作。

事件源映射 API

要使用 Amazon Command Line Interface(Amazon CLI)Amazon SDK 来管理事件源,可以使用以下 API 操作:

要使用 Amazon Command Line Interface (Amazon CLI) 创建事件源映射,请使用 create-event-source-mapping 命令。

以下示例 Amazon CLI 命令将创建一个事件源,将名为 MQ-Example-Function 的 Lambda 函数映射到名为 ExampleMQBroker 的基于 RabbitMQ 的 Amazon MQ 代理。该命令还提供了一个虚拟主机名称和 Secrets Manager 密钥 ARN,用于存储代理凭证。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-24cacbb4-b295-49b7-8543-7ce7ce9dfb98 \ --function-name arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function \ --queues ExampleQueue \ --source-access-configuration Type=VIRTUAL_HOST,URI="/" Type=BASIC_AUTH,URI=arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt \

您应看到以下输出:

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 100, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601927898.741, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "USER_INITIATED", "Queues": [ "ExampleQueue" ], "SourceAccessConfigurations": [ { "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt" } ] }

使用 update-event-source-mapping 命令可以配置其他选项,例如 Lambda 如何处理批次,以及指定何时丢弃无法处理的记录。以下示例命令将事件源映射更新为批处理大小为 2。

aws lambda update-event-source-mapping \ --uuid 91eaeb7e-c976-1234-9451-8709db01f137 \ --batch-size 2

您应看到以下输出:

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Updating", "StateTransitionReason": "USER_INITIATED" }

Lambda 异步更新这些设置。此过程完成后,输出才会反映更改。要查看资源的当前状态,请使用 get-event-source-mapping 命令。

aws lambda get-event-source-mapping \ --uuid 91eaeb7e-c976-4939-9451-8709db01f137

您应看到以下输出:

{ "UUID": "91eaeb7e-c976-4939-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Enabled", "StateTransitionReason": "USER_INITIATED" }

事件源映射错误

当 Lambda 函数遇到不可恢复的错误时,您的 Amazon MQ 使用者将停止处理记录。任何其他使用者如果没有遇到相同的错误,都可以继续处理。要确定使用者停止的潜在原因,请检查 StateTransitionReason 返回的详细信息中的 EventSourceMapping 字段中是否有以下代码:

ESM_CONFIG_NOT_VALID

事件源映射配置无效。

EVENT_SOURCE_AUTHN_ERROR

Lambda 验证事件源失败。

EVENT_SOURCE_AUTHZ_ERROR

Lambda 没有访问事件源所需的权限。

FUNCTION_CONFIG_NOT_VALID

函数的配置无效。

如果记录由于其大小而被 Lambda 丢弃,也将处于未处理状态。Lambda 记录的大小限制为 6 MB。要在函数出错时重新传递消息,您可以使用死信队列 (DLQ)。有关更多信息,请参阅 Apache ActiveMQ 网站上的消息重新传递和 DLQ 处理和 RabbitMQ 网站上的可靠性指南

注意

Lambda 不支持自定义重新传递策略。相反,Lambda 使用一个策略,其默认值来自 Apache ActiveMQ 网站上的重新传递策略页面。其中,maximumRedeliveries 设置为 6。

Amazon MQ 和 RabbitMQ 配置参数

所有 Lambda 事件源类型共享相同的 CreateEventSourceMappingUpdateEventSourceMapping API 操作。但是,只有部分参数适用于 Amazon MQ 和 RabbitMQ。

适用于 Amazon MQ 和 RabbitMQ 的事件源参数
参数 必需 默认值 备注

BatchSize

100

最大值:10000

已启用

真实

FunctionName

FilterCriteria

Lambda 事件筛选

MaximumBatchingWindowInSeconds

500 毫秒

批处理行为

队列

要使用的 Amazon MQ 代理目标队列的名称。

SourceAccessConfigurations

对于 ActiveMQ 为 BASIC_AUTH 凭证。对于 RabbitMQ,可以同时包含 BASIC_AUTH 凭证和 VIRTUAL_HOST 信息。