结合 Amazon MQ 使用 Lambda - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

结合 Amazon MQ 使用 Lambda

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

Amazon MQ 是一项托管消息代理服务,用于 Apache ActiveMQRabbitMQ消息代理允许软件应用程序和组件使用各种编程语言、操作系统和正式消息收发协议,通过主题或队列事件目标进行通信。

Amazon MQ 还可以通过安装 ActiveMQ 代理或 RabbitMQ 代理以及提供不同的网络拓扑和其他基础设施需求来代表您管理 Amazon Elastic Compute Cloud (Amazon EC2) 实例。

您可使用 Lambda 函数处理来自 Amazon MQ 消息代理的记录。Lambda 通过事件源映射调用您的函数,事件源映射是从您的代理读取消息并同步调用函数的一种 Lambda 资源。

Amazon MQ 事件源映射有以下配置限制:

  • 跨账户 – Lambda 不支持跨账户处理。您不能使用 Lambda 处理来自不同Amazon账户中的 Amazon MQ 消息代理的记录。

  • 身份验证 – 对于 ActiveMQ,仅支持 ActiveMQ SimpleAuthenticationPlugin。对于 RabbitMQ,仅支持 PLAIN 身份验证机制。用户必须使用 Amazon Secrets Manager 来管理凭据。有关 ActiveMQ 身份验证的更多信息,请参阅 Amazon MQ 开发人员指南中的使用 LDAP 集成 ActiveMQ 代理

  • 连接配额 – 代理具有每个有线级协议允许的最大连接数。此配额基于代理实例类型。有关更多信息,请参阅 Amazon MQ 开发人员指南中的 Amazon MQ 中的配额代理部分。

  • 连接 – 您可以在公有或私有虚拟私有云(VPC)中创建代理。对于私有 VPC,您的 Lambda 函数需要具备对 VPC 的访问权限才能接收消息。有关更多信息,请参阅本主题后面的 事件源映射 API

  • 事件目标 – 仅支持队列目标。但是,您可以使用虚拟主题,虚拟主题在内部与主题行为一致,在与 Lambda 交互时与队列行为一致。有关更多信息,请参阅 Apache ActiveMQ 网站上的虚拟目标和 RabbitMQ 网站上的虚拟主机

  • 网络拓扑 – 对于 ActiveMQ,每个事件源映射仅支持一个单实例或备用代理。对于 RabbitMQ,每个事件源映射只支持一个单实例代理或集群部署。单实例代理需要一个失效转移端点。有关这些代理部署模式的更多信息,请参阅 Amazon MQ 开发人员指南中的 Active MQ 代理架构RabbitMQ 代理架构

  • 协议 – 支持的协议取决于 Amazon MQ 集成的类型。

    • 对于 ActiveMQ 集成,Lambda 使用 OpenWire/Java Message Service (JMS) 协议来使用消息。消息的使用不支持任何其他协议。在 JMS 协议中,仅支持 TextMessageBytesMessage。Lambda 还支持 JMS 自定义属性。有关 OpenWire 协议的更多信息,请参阅 Apache ActiveMQ 网站上的 OpenWire

    • 对于 RabbitMQ 集成,Lambda 使用 AMQP 0-9-1 协议来使用消息。消息的使用不支持任何其他协议。有关 RabbitMQ 的 AMQP 0-9-1 协议实施的详细信息,请参阅 RabbitMQ 网站上的 AMQP 0-9-1 完整参考指南

Lambda 自动支持 Amazon MQ 支持的最新版本的 ActiveMQ 和 RabbitMQ。有关受支持的最新版本,请参阅 Amazon MQ 开发人员指南中的 Amazon MQ 发布说明

注意

默认情况下,Amazon MQ 代理有一个每周维护时段。代理在该时段内无法使用。对于没有备用代理的代理,Lambda 将无法在该时段处理任何消息。

Lambda 使用者组

为了与 Amazon MQ 进行交互,Lambda 会创建一个可以从 Amazon MQ 代理中读取的使用者组。使用与事件源映射 UUID 相同的 ID 创建使用者组。

对于 Amazon MQ 事件源,Lambda 会将记录合并为批处理,然后通过单个有效负载中将其发送到您的函数。要控制行为,您可以配置批处理时段和批处理大小。Lambda 会持续提取消息,直到达到 6 MB 的最大有效负载大小、批处理时段过期或记录数达到完整批处理大小时为止。有关更多信息,请参阅批处理行为

使用者组将消息作为字节 BLOB 进行检索,然后以 base64 格式编码为单个 JSON 有效负载,接下来调用您的函数。如果函数为批处理中的任何消息返回错误,Lambda 将重试整批消息,直到处理成功或消息过期为止。

注意

虽然 Lambda 函数的最大超时限制通常为 15 分钟,但 Amazon MSK、自行管理的 Apache Kafka、Amazon MQ for ActiveMQ 和 Amazon MQ for RabbitMQ 的事件源映射,仅支持最大超时限制为 14 分钟的函数。此约束可确保事件源映射可以正确处理函数错误和重试。

您可以使用 Amazon CloudWatch 中的 ConcurrentExecutions 指标监控给定函数的并发使用情况。有关并发的更多信息,请参阅 配置预留并发

例 Amazon MQ 记录事件
ActiveMQ
{ "eventSource": "aws:mq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messages": [ { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 1, "correlationId": "myJMSCoID", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"QUJDOkFBQUE=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } }, { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/bytes-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 2, "correlationId": "myJMSCoID1", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"LQaGQ82S48k=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } } ] }
RabbitMQ
{ "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", "rmqMessagesByQueue": { "pizzaQueue::/": [ { "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 }, "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ] } }
注意

在 RabbitMQ 示例中,pizzaQueue 是 RabbitMQ 队列的名称,/ 是虚拟主机的名称。接收消息时,事件源会将消息列在 pizzaQueue::/ 下。

执行角色权限

要从 Amazon MQ 代理读取记录,您的 Lambda 函数需要为其执行角色添加以下权限:

注意

使用加密的客户托管密钥时,也可以添加 kms:Decrypt 权限。

将代理配置为事件源

创建事件源映射以指示 Lambda 将 Amazon MQ 代理中的记录发送到 Lambda 函数。您可以创建多个事件源映射,以使用多个函数处理相同的数据,或使用单个函数处理来自多个源的项目。

要将您的函数配置为从 Amazon MQ 中读取,请在 Lambda 控制台中创建 MQ 触发器。

创建触发器
  1. 打开 Lamba 控制台的函数页面

  2. 选择一个函数的名称。

  3. Function overview(函数概览)下,选择 Add trigger(添加触发器)。

  4. 选择触发器类型。

  5. 配置必填选项,然后选择 Add(添加)。

Lambda 支持对 Amazon MQ 事件源使用以下选项:

  • MQ broker(MQ 代理)– 选择 Amazon MQ 代理。

  • Batch size(批处理大小)– 设置要在单个批次中检索的最大消息数。

  • Queue name(队列名称)– 输入要使用的 Amazon MQ 队列。

  • Source access configuration(源访问配置)– 输入虚拟主机信息和 Secret Secrets Manager 密钥,用于存储您的代理凭证。

  • Enable trigger(启用触发器)– 禁用触发器以停止处理记录。

要启用或禁用触发器(或删除触发器),请在设计器中选择 MQ 触发器。要重新配置触发器,请使用事件源映射 API 操作。

事件源映射 API

要使用 Amazon Command Line Interface(Amazon CLI)Amazon SDK 来管理事件源,可以使用以下 API 操作:

注意

在更新、禁用或删除 Amazon MQ 事件源映射后,更改最长可能需要 15 分钟才会生效。在此期间结束之前,您的事件源映射可能会继续处理事件并使用您之前的设置来调用您的函数。即使控制台中显示的事件源映射状态表明更改已经生效,也可能发生这种情况。

要使用 Amazon Command Line Interface (Amazon CLI) 创建事件源映射,请使用 create-event-source-mapping 命令。

默认情况下,在 PubliclyAccessible 标志设置为 false 时创建 Amazon MQ 代理。只有在 PubliclyAccessible 设置为 true 时,代理才会接收公有 IP 地址。

要获得对事件源映射的完全访问权限,您的代理必须使用公有终端节点或提供对 VPC 的访问权限。请注意,当您将 Amazon MQ 添加为触发器时,Lambda 将采用 Amazon MQ 代理的 VPC 设置,而不是 Lambda 函数的 VPC 设置。要满足 Amazon Virtual Private Cloud (Amazon VPC) 访问要求,您可以执行以下操作之一:

您配置的 Amazon VPC 安全组规则至少应具有以下设置:

  • 入站规则 – 对于没有公共可访问性的代理,允许指定为源的安全组的所有端口上的所有流量。对于具有公共可访问性的代理,允许所有目标的所有端口上的所有流量。

  • 出站规则 – 允许所有目标的所有端口上的所有流量传输。

Amazon VPC 配置可通过 Amazon MQ API 发现,无需在 create-event-source-mapping 设置中配置。

以下示例 Amazon CLI 命令将创建一个事件源,将名为 MQ-Example-Function 的 Lambda 函数映射到名为 ExampleMQBroker 的基于 RabbitMQ 的 Amazon MQ 代理。该命令还提供了一个虚拟主机名称和 Secrets Manager 密钥 ARN,用于存储代理凭证。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-24cacbb4-b295-49b7-8543-7ce7ce9dfb98 \ --function-name arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function \ --queues ExampleQueue \ --source-access-configuration Type=VIRTUAL_HOST,URI="/" Type=BASIC_AUTH,URI=arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt \

您应看到以下输出:

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 100, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601927898.741, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "USER_INITIATED", "Queues": [ "ExampleQueue" ], "SourceAccessConfigurations": [ { "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt" } ] }

使用 update-event-source-mapping 命令可以配置其他选项,例如 Lambda 如何处理批次,以及指定何时丢弃无法处理的记录。以下示例命令将事件源映射更新为批处理大小为 2。

aws lambda update-event-source-mapping \ --uuid 91eaeb7e-c976-1234-9451-8709db01f137 \ --batch-size 2

您应看到以下输出:

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Updating", "StateTransitionReason": "USER_INITIATED" }

Lambda 异步更新这些设置。此过程完成后,输出才会反映更改。要查看资源的当前状态,请使用 get-event-source-mapping 命令。

aws lambda get-event-source-mapping \ --uuid 91eaeb7e-c976-4939-9451-8709db01f137

您应看到以下输出:

{ "UUID": "91eaeb7e-c976-4939-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Enabled", "StateTransitionReason": "USER_INITIATED" }

事件源映射错误

当 Lambda 函数遇到不可恢复的错误时,您的 Amazon MQ 使用者将停止处理记录。任何其他使用者如果没有遇到相同的错误,都可以继续处理。要确定使用者停止的潜在原因,请检查 StateTransitionReason 返回的详细信息中的 EventSourceMapping 字段中是否有以下代码:

ESM_CONFIG_NOT_VALID

事件源映射配置无效。

EVENT_SOURCE_AUTHN_ERROR

Lambda 验证事件源失败。

EVENT_SOURCE_AUTHZ_ERROR

Lambda 没有访问事件源所需的权限。

FUNCTION_CONFIG_NOT_VALID

函数的配置无效。

如果记录由于其大小而被 Lambda 丢弃,也将处于未处理状态。Lambda 记录的大小限制为 6 MB。要在函数出错时重新传递消息,您可以使用死信队列 (DLQ)。有关更多信息,请参阅 Apache ActiveMQ 网站上的消息重新传递和 DLQ 处理和 RabbitMQ 网站上的可靠性指南

注意

Lambda 不支持自定义重新传递策略。相反,Lambda 使用一个策略,其默认值来自 Apache ActiveMQ 网站上的 Redelivery Policy(重新传递策略)页面。其中,maximumRedeliveries 设置为 5。

Amazon MQ 和 RabbitMQ 配置参数

所有 Lambda 事件源类型共享相同的 CreateEventSourceMappingUpdateEventSourceMapping API 操作。但是,只有部分参数适用于 Amazon MQ 和 RabbitMQ。

适用于 Amazon MQ 和 RabbitMQ 的事件源参数
参数 必需 默认值 备注

BatchSize

100

最大值:10000

已启用

真实

FunctionName

FilterCriteria

Lambda 事件筛选

MaximumBatchingWindowInSeconds

500 毫秒

批处理行为

Queues

要使用的 Amazon MQ 代理目标队列的名称。

SourceAccessConfigurations

对于 ActiveMQ 为 BASIC_AUTH 凭证。对于 RabbitMQ,可以同时包含 BASIC_AUTH 凭证和 VIRTUAL_HOST 信息。