将 Amazon MQ 消息代理作为源 - Amazon EventBridge
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Amazon MQ 消息代理作为源

您可使用 EventBridge Pipes 接收来自 Amazon MQ 消息代理的记录。然后,您可以选择筛选或增强这些记录,然后再将它们发送到可用的目标之一进行处理。在设置管道时,可以选择特定于 Amazon MQ 的设置。将数据发送到目标时,EventBridge Pipes 会保持消息代理中记录的顺序。

Amazon MQ 是一项托管消息代理服务,用于 Apache ActiveMQRabbitMQ。消息代理允许软件应用程序和组件使用各种编程语言、操作系统和正式消息收发协议进行通信,使用主题或队列作为事件目标。

Amazon MQ 还可以通过安装 ActiveMQ 代理或 RabbitMQ 代理,代表您管理 Amazon Elastic Compute Cloud (Amazon EC2) 实例。安装代理后,它会为您的实例提供不同的网络拓扑和其他基础设施需求。

Amazon MQ 源有以下配置限制:

  • 跨账户 - EventBridge 不支持跨账户处理。您不能使用 EventBridge 处理来自另一 Amazon 账户中的 Amazon MQ 消息代理的记录。

  • 身份验证 - 对于 ActiveMQ,仅支持 ActiveMQ SimpleAuthenticationPlugin。对于 RabbitMQ,仅支持 PLAIN 身份验证机制。要管理凭证,请使用 Amazon Secrets Manager。有关 ActiveMQ 身份验证的更多信息,请参阅《Amazon MQ 开发人员指南》中的使用 LDAP 集成 ActiveMQ 代理

  • 连接配额 - 代理具有每个有线级协议允许的最大连接数。此配额基于代理实例类型。有关更多信息,请参阅《Amazon MQ 开发人员指南》中的 *Amazon MQ 中的配额*代理部分。

  • 连接 - 您可以在公有或私有虚拟私有云 (VPC) 中创建代理。对于私有 VPC,您的管道需要具备对 VPC 的访问权限才能接收消息。

  • 事件目标 - 仅支持队列目标。但是,您可以使用虚拟主题,在与管道交互时与,虚拟主题在内部与主题行为一致,在外部与队列行为一致。有关更多信息,请参阅 Apache ActiveMQ 网站上的虚拟目标和 RabbitMQ 网站上的虚拟主机

  • 网络拓扑 - 对于 ActiveMQ,管道仅支持一个单实例或备用代理。对于 RabbitMQ,每个管道只支持一个单实例代理或集群部署。单实例代理需要一个失效转移端点。有关这些代理部署模式的更多信息,请参阅《Amazon MQ 开发人员指南》中的 Active MQ 代理架构RabbitMQ 代理架构

  • 协议 - 支持的协议取决于您使用的 Amazon MQ 集成。

    • 对于 ActiveMQ 集成,EventBridge 使用 OpenWire/Java Message Service (JMS) 协议来使用消息。任何其他协议都不支持消息使用。EventBridge 仅支持 JMS 协议中的 TextMessageBytesMessage 操作。有关 OpenWire 协议的更多信息,请参阅 Apache ActiveMQ 网站上的 OpenWire

    • 对于 RabbitMQ 集成,EventBridge 使用 AMQP 0-9-1 协议来使用消息。消息的使用不支持任何其他协议。有关 RabbitMQ 的 AMQP 0-9-1 协议实施的详细信息,请参阅 RabbitMQ 网站上的 AMQP 0-9-1 完整参考指南

EventBridge 自动支持 Amazon MQ 支持的最新版本 ActiveMQ 和 RabbitMQ。有关受支持的最新版本,请参阅《Amazon MQ 开发人员指南》中的 Amazon MQ 发布说明

注意

默认情况下,Amazon MQ 代理有一个每周维护时段。代理在该时段内无法使用。对于没有备用的代理,EventBridge 在时间窗结束前不会处理消息。

示例事件

以下示例事件显示了管道接收到的信息。您可以使用此事件来创建和筛选您的事件模式,或定义输入转换。并非所有字段都可以筛选。有关可筛选字段的更多信息,请参阅 亚马逊 EventBridge 管道过滤

ActiveMQ

[ { "eventSource": "aws:amq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "data": "QUJDOkFBQUE=", "connectionId": "myJMSCoID", "redelivered": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 }, { "eventSource": "aws:amq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/bytes-message", "data": "3DTOOW7crj51prgVLQaGQ82S48k=", "connectionId": "myJMSCoID1", "persistent": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 } ]

RabbitMQ

[ { "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", "eventSourceKey": "pizzaQueue::/", "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 }, "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ]

使用者组

为了与 Amazon MQ 进行交互,EventBridge 会创建一个可以从 Amazon MQ 代理中读取的使用者组。使用与管道 UUID 相同的 ID 创建使用者组。

对于 Amazon MQ 源,EventBridge 会将记录合并为批处理,然后通过单个有效负载将其发送到您的函数。要控制行为,您可以配置批处理时段和批处理大小。EventBridge 会提取消息,直到出现下列情况之一时:

  • 处理后的记录达到 6 MB 的有效负载大小上限。

  • 批处理时间窗过期。

  • 记录数达到完整批次大小。

EventBridge 会将您的批次转换为单个有效负载,然后调用您的函数。消息既不会永久保存,也不会反序列化。相反,使用者组会将其作为字节 BLOB 进行检索。然后以 base64 格式编码为 JSON 有效负载。如果管道为批次中的任何消息返回错误,EventBridge 将重试整批消息,直到处理成功或消息过期为止。

网络配置

默认情况下,在 PubliclyAccessible 标志设置为 false 时创建 Amazon MQ 代理。只有在 PubliclyAccessible 设置为 true 时,代理才会接收公有 IP 地址。要获得对管道的完全访问权限,您的代理必须使用公有端点,或提供对 VPC 的访问权限。

如果您的 Amazon MQ 代理不可公开访问,EventBridge 必须能够访问与该代理关联的 Amazon Virtual Private Cloud (Amazon VPC) 资源。要访问您的 Amazon MQ 代理的 VPC,EventBridge 需要您的源子网的出站互联网访问权限。对于公有子网,它必须是托管 NAT 网关。对于私有子网,它可以是 NAT 网关,也可以是您自己的 NAT。确保此 NAT 具有公共 IP 地址,可以连接到互联网。

使用以下规则配置您的 Amazon VPC 安全组(至少):

  • 入站规则 – 对于没有公共可访问性的代理,允许指定为源的安全组的所有端口上的所有流量。对于具有公共可访问性的代理,允许所有目标的所有端口上的所有流量。

  • 出站规则 – 允许所有目标的所有端口上的所有流量传输。

注意

可通过 Amazon MQ API 发现您的 Amazon VPC 配置。在设置过程中,您不需要对其进行配置。