将 Lambda 与 Amazon MQ 结合使用 - Amazon Lambda
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

将 Lambda 与 Amazon MQ 结合使用

Amazon MQ 是 Apache ActiveMQRabbitMQ 的托管消息代理服务。Lambda 只支持 Apache ActiveMQ。消息代理允许软件应用程序和组件使用各种编程语言、操作系统和正式消息收发协议,通过主题或队列事件目标进行通信。

Amazon MQ 还可以通过安装 ActiveMQ 代理以及提供不同的网络拓扑和其他基础设施需求来代表您管理 Amazon Elastic Compute Cloud (Amazon EC2) 实例。

您可以使用 Lambda 函数处理来自 Amazon MQ 消息代理的记录。您的函数通过事件源映射触发,事件源映射是从您的代理读取消息并同步调用函数的一种 Lambda 资源。

Amazon MQ 事件源映射有以下配置限制:

  • 跨账户 – 不支持跨账户处理。您不能使用 Lambda 处理来自不同账户中的 Amazon MQ 消息代理的记录。

  • 身份验证 – 仅支持 ActiveMQ SimpleAuthenticationPlugin。与代理关联的用户凭证是唯一的连接方法。有关身份验证的更多信息,请参阅 Amazon MQ 开发人员指南 中的 ActiveMQ 的消息收发身份验证和授权

  • 连接配额 – 代理具有每个有线级协议允许的最大连接数。此配额基于代理实例类型。有关更多信息,请参阅 Amazon MQ 开发人员指南 中的配额Amazon MQ代理部分。

  • 连接 – 您可以在公共或私有 Virtual Private Cloud (VPC) 中创建代理。对于私有 VPC,您的 Lambda 函数需要具备对 VPC 的访问权限才能与记录进行交互。有关更多信息,请参阅本主题后面的 事件源映射 API

  • 事件目标 – 仅支持队列目标。但是,您可以使用虚拟主题,虚拟主题在内部与主题行为一致,在与 Lambda 交互时与队列行为一致。有关更多信息,请参阅 Apache ActiveMQ 网站上的虚拟目标

  • 网络拓扑 – 每个事件源映射仅支持一个单实例或备用代理。单实例代理需要一个故障转移终端节点。有关这些代理部署模式的更多信息,请参阅 Amazon MQ 开发人员指南 中的 Amazon MQ 代理架构

  • 协议 – Lambda 按照 OpenWire/Java Message Service (JMS) 协议使用消息。不支持任何其他协议。在 JMS 协议中,仅支持 TextMessageBytesMessage。有关 OpenWire 协议的更多信息,请参阅 Apache ActiveMQ 网站上的 OpenWire

Lambda 自动支持 Amazon MQ 支持的最新版本的 ActiveMQ。有关受支持的最新版本,请参阅 Amazon MQ 开发人员指南 中的 Amazon MQ 发行说明

注意

默认情况下,Amazon MQ 代理有一个每周维护时段。代理在该时段内无法使用。对于没有备用代理的代理,Lambda 将无法在该时段处理任何消息。

Lambda 使用者组

为了与 Amazon MQ 进行交互,Lambda 会创建一个可以从 Amazon MQ 代理中读取的使用者组。使用与事件源映射 UUID 相同的 ID 创建使用者组。

Lambda 将提取消息,直到达到最大处理容量 (6MB)、超时或满足批处理大小。配置后,批处理大小将确定单个批次中要检索的最大项目数。您的批处理将转换为 Lambda 有效负载,并调用您的目标函数。消息既不会永久保存,也不会反序列化。相反,它们被使用者组作为字节 BLOB 进行检索,并进行 base64 编码以形成 JSON 有效负载。

注意

函数调用的最长时间为 14 分钟。

Lambda 同时处理所有传入批次,并自动扩展并发以满足需求。您可以使用 Amazon CloudWatch 中的 ConcurrentExecutions 指标监控给定函数的并发使用情况。有关并发的更多信息,请参阅 管理 Lambda 函数的并发

例 Amazon MQ 记录事件

{ "eventSource": "aws:amq", "eventSourceArn": "arn:aws-cn:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messages": { [ { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "data": "QUJDOkFBQUE=", "connectionId": "myJMSCoID", "redelivered": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 }, { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType":"jms/bytes-message", "data": "3DTOOW7crj51prgVLQaGQ82S48k=", "connectionId": "myJMSCoID1", "persistent": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 } ] } }

执行角色权限

要从 Amazon MQ 代理读取记录,您的 Lambda 函数需要向其 执行角色 添加以下权限:

注意

使用加密的客户托管密钥时,也可以添加 kms:Decrypt 权限。

将代理配置为事件源

创建事件源映射以指示 Lambda 将 Amazon MQ 代理中的记录发送到 Lambda 函数。您可以创建多个事件源映射,以使用多个函数处理相同的数据,或使用单个函数处理来自多个源的项目。

要将您的函数配置为从 Amazon MQ 读取,请在 Lambda 控制台中创建一个 MQ 触发器。

创建触发器

  1. 打开 Lambda 控制台的“函数”页面

  2. 选择函数。

  3. Function overview (函数概览) 下,选择 Add trigger (添加触发器)

  4. 选择触发器类型。

  5. 配置所需选项,然后选择 Add (添加)

Lambda 支持对 Amazon MQ 事件源使用以下选项:

  • MQ broker (MQ 代理) – 选择一个 Amazon MQ 代理。

  • Batch size (批处理大小) – 设置要在单个批次中检索的最大消息数。

  • Queue name (队列名称) – 输入要使用的 Amazon MQ 队列。

  • Source access configuration (源访问配置) – 选择存储代理凭据的 Amazon Secrets Manager 密码。

  • 启用触发器 – 禁用触发器以停止处理记录。

要启用或禁用触发器(或删除触发器),请在设计器中选择 MQ 触发器。要重新配置触发器,请使用事件源映射 API 操作。

事件源映射 API

要使用 Amazon CLIAmazon 开发工具包管理事件源,您可以使用以下 API 操作:

要使用 Amazon Command Line Interface (Amazon CLI) 创建事件源映射,请使用 create-event-source-mapping 命令。

默认情况下,在 PubliclyAccessible 标志设置为 false 时创建 Amazon MQ 代理。只有在 PubliclyAccessible 设置为 true 时,才会为代理提供公有 IP 地址。

要获得对事件源映射的完全访问权限,您的代理必须使用公有终端节点或提供对 VPC 的访问权限。要满足 Amazon VPC 访问要求,您可以执行以下操作之一:

您配置的 Amazon Virtual Private Cloud (Amazon VPC) 安全组规则至少应具有以下设置:

  • 入站规则 – 对于没有公共可访问性的代理,允许指定为源的安全组的所有端口上的所有流量。对于具有公共可访问性的代理,允许所有目标的所有端口上的所有流量。

  • 出站规则 – 允许所有目标的所有端口上的所有流量传输。

Amazon VPC 配置可通过 Amazon MQAPI 发现,无需在 create-event-source-mapping 设置中配置。

以下示例 Amazon CLI 命令将创建一个事件源,将名为 MQ-Example-Function 的 Lambda 函数映射到名为 ExampleMQBroker 的 Amazon MQ 代理。该命令还提供了一个名为 ExampleMQBrokerUserPassword 的 Secrets Manager 密钥,用于存储代理凭据。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws-cn:mq:us-east-1:12345678901:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca \ --function-name MQ-Example-Function \ --source-access-configuration Type=BASIC_AUTH,URI=arn:aws-cn:secretsmanager:us-east-1:12345678901:secret:ExampleMQBrokerUserPassword-xPBMTt \ --queues ExampleQueue

您应看到以下输出:

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 100, "EventSourceArn": "arn:aws-cn:mq:us-east-1:12345678901:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws-cn:lambda:us-east-1:12345678901:function:MQ-Example-Function", "LastModified": 1601927898.741, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "USER_INITIATED", "Queues": [ "ExampleQueue" ], "SourceAccessConfigurations": [ { "Type": "BASIC_AUTH", "URI": "arn:aws-cn:secretsmanager:us-east-1:12345678901:secret:ExampleMQBrokerUserPassword-xPBMTt" } ] }

使用 update-event-source-mapping 命令可以配置其他选项,例如如何处理批次,以及指定何时丢弃无法处理的记录。以下示例命令将事件源映射更新为批处理大小为 2。

aws lambda update-event-source-mapping \ --uuid 91eaeb7e-c976-1234-9451-8709db01f137 \ --batch-size 2

您应看到以下输出:

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws-cn:mq:us-east-1:12345678901:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws-cn:lambda:us-east-1:12345678901:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Updating", "StateTransitionReason": "USER_INITIATED" }

更新的设置是异步应用的,并且直到该过程完成才反映在输出中。要查看资源的当前状态,请使用 get-event-source-mapping 命令。

aws lambda get-event-source-mapping \ --uuid 91eaeb7e-c976-4939-9451-8709db01f137

您应看到以下输出:

{ "UUID": "91eaeb7e-c976-4939-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws-cn:mq:us-east-1:12345678901:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws-cn:lambda:us-east-1:12345678901:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Enabled", "StateTransitionReason": "USER_INITIATED" }

事件源映射错误

当 Lambda 函数遇到不可恢复的错误时,您的 Amazon MQ 使用者将停止处理记录。任何其他使用者如果没有遇到相同的错误,都可以继续处理。要确定使用者停止的潜在原因,请检查 EventSourceMapping 返回的详细信息中的 StateTransitionReason 字段中是否有以下代码:

ESM_CONFIG_NOT_VALID

事件源映射配置无效。

EVENT_SOURCE_AUTHN_ERROR

Lambda 未能验证事件源。

EVENT_SOURCE_AUTHZ_ERROR

Lambda 没有访问事件源所需的权限。

FUNCTION_CONFIG_NOT_VALID

函数的配置无效。

如果记录由于其大小而被丢弃,也将处于未处理状态。Lambda 记录的大小限制为 6 MB。要在函数出错时重新传递消息,您可以使用重新传递策略和死信队列 (DLQ) 处理 Amazon MQ。有关更多信息,请参阅 Apache ActiveMQ 网站上的消息重新传递和 DLQ 处理