本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
入门:创建并连接 RabbitMQ 代理
代理 是运行在 Amazon MQ 上的消息代理环境。它是 Amazon MQ 的基本构建块。代理实例类(m5
、t3
)和大小(large
、micro
)的综合描述是一个代理实例类型(例如 mq.m5.large
)。有关更多信息,请参阅 适用于 RabbitMQ 经纪商的亚马逊 MQ 是什么?
第 1 步:创建 RabbitMQ 代理
第一个也是最常见的 Amazon MQ 任务是创建代理。以下示例说明如何使用创建基本经纪商。 Amazon Web Services Management Console
登录 Amazon MQ 控制台
。 -
在 Select broker engine (选择代理引擎) 页面上,选择 RabbitMQ,然后选择 Next (下一步)。
-
在 Select deployment mode (选择部署模式) 页面上,选择 Deployment mode (部署模式),例如 Cluster deployment (集群部署),然后选择 Next (下一步)。
-
单实例代理由 Network Load Balancer (NLB) 后面一个可用区域中的一个代理组成。代理与您的应用程序和 Amazon EBS 存储卷进行通信。有关更多信息,请参阅 选项 1:适用于 RabbitMQ 的亚马逊 MQ 单实例代理。
-
高可用性的 RabbitMQ 集群部署是由 Network Load Balancer 后面的三个 RabbitMQ 代理节点组成的逻辑分组,每个节点在多个可用区(AZ)之间共享用户、队列和分布式状态。有关更多信息,请参阅 选项 2:适用于 RabbitMQ 集群部署的亚马逊 MQ。
-
-
在 Configure settings (配置设置) 页面的 Details (详细信息) 部分,执行以下操作:
-
输入 Broker name (代理名称)。
重要
请勿在经纪人名称中添加个人身份信息 (PII) 或其他机密或敏感信息。代理名称可供其他 Amazon 服务(包括日 CloudWatch 志)访问。代理名称不适合用于私有或敏感数据。
选择 Broker instance type (代理实例类型)(例如 mq.m5.large)。有关更多信息,请参阅 Broker instance types。
注意
其他设置部分提供了为代理启用 CloudWatch 日志和配置网络访问权限的选项。如果您创建不允许公开访问的私有 RabbitMQ 代理,则必须选择虚拟私有云 (VPC) 并配置安全组才能访问您的代理。
-
-
在 Configure settings (配置设置) 页面的 RabbitMQ access (RabbitMQ 访问) 部分,提供 Username (用户名) 和 Password (密码)。以下限制适用于代理程序登录凭证:
用户名只能包含字母数字字符、短划线、句点和下划线(- . _)。此值不得包含任何波浪线(~)字符。Amazon MQ 禁止使用
guest
作为用户名。-
密码必须至少为 12 个字符,包含至少 4 个唯一字符,并且不得包含逗号、冒号或等号(,:=)。
重要
请勿在经纪人用户名中添加个人身份信息 (PII) 或其他机密或敏感信息。其他 Amazon 服务(包括 CloudWatch 日志)可以访问经纪人的用户名。代理用户名不适合用于私有或敏感数据。
-
选择下一步。
-
在 Review and create (审核和创建) 页面上,您可以查看您的选择并根据需要对其进行编辑。
-
选择 Create broker (创建代理)。
当 Amazon MQ 创建您的代理时,会显示 Creation in progress (正在创建) 状态。
创建代理大约需要 15 分钟。
成功创建您的代理后,Amazon MQ 会显示 Running (正在运行) 状态。
-
选择
MyBroker
.在存储库的
MyBroker
页面,在 Connect 部分,记下您的经纪商的 RabbitMQ 网页控制台URL,例如: https://b-c8349341-ec91-4a78-ad9c-a57f23f235bb.mq.us-west-2.amazonaws.com
另外,请注意您的经纪人的安全AMQP端点
。以下是一个 amqps
终端节点显示侦听器端口5671
的示例。amqps://b-c8349341-ec91-4a78-ad9c-a57f23f235bb.mq.us-west-2.amazonaws.com:5671
第 2 步:将JVM基于应用程序的应用程序连接到您的代理
创建 RabbitMQ 代理后,您可以将应用程序连接到该代理。以下示例演示如何使用 RabbitMQ Java 客户端库
先决条件
注意
以下先决条件步骤仅适用于创建的没有公开可访问性的 RabbitMQ 代理。如果您正在创建具有公开可访问性的代理,则可以跳过它们。
启用VPC属性
为确保您的经纪人可在您的内部访问VPC,您必须启用enableDnsHostnames
和enableDnsSupport
VPC属性。有关更多信息,请参阅DNS《亚马逊VPC用户指南》VPC中的 Support。
启用入站连接
登录 Amazon MQ 控制台
。 从经纪人列表中,选择您的经纪商的名称(例如 MyBroker)。
-
在存储库的
MyBroker
页面上的 “连接” 部分,记下代理的 Web 控制台和线级协议的地址URL和端口。 -
在 Details (详细信息) 部分的 Security and network (安全与网络) 下,选择您的安全组名称或 。
屏幕上将显示EC2控制面板的 “安全组” 页面。
-
从安全组列表中,选择您的安全组。
-
在页面底部,选择 Inbound (入站),然后选择 Edit (编辑)。
-
在编辑入站规则对话框中,为您想要公开访问的每个URL或终端节点添加一条规则(以下示例说明了如何为代理 Web 控制台执行此操作)。
-
选择添加规则。
-
在 “类型” 中,选择 “自定义” TCP。
-
对于 Source (源),选择 Custom (自定义),然后键入您希望能够访问 Web 控制台的系统的 IP 地址(例如
192.0.2.1
)。 -
选择保存。
您的代理现在可以接受入站连接。
-
添加 Java 依赖项
如果您使用 Apache Maven 进行自动构建,请将以下依赖项添加到您的 pom.xml
文件中。有关 Apache Maven 中的项目对象模型文件的更多信息,请参阅简介。POM
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>
如果您正在使用 Gradle
dependencies { compile 'com.rabbitmq:amqp-client:5.9.0' }
导入 Connection
和 Channel
类
RabbitMQ Java 客户端使用com.rabbitmq.client
作为其顶级软件包,Connection
和Channel
API类分别表示 AMQP 0-9-1 连接和通道。使用前导入 Connection
和 Channel
类,如以下示例所示。
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
创建 ConnectionFactory
并连接到您的代理
使用以下示例创建具有给定参数的 ConnectionFactory
类实例。使用 setHost
方法配置您之前记下的代理终端节点。对于 AMQPS
线级连接,请使用端口 5671
。
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); //Replace the URL with your information factory.setHost("
b-c8352341-ec91-4a78-ad9c-a43f23d325bb.mq.us-west-2.amazonaws.com
"); factory.setPort(5671); // Allows client to establish a connection over TLS factory.useSslProtocol(); // Create a connection Connection conn = factory.newConnection(); // Create a channel Channel channel = conn.createChannel();
向交换器发布消息
您可以使用 Channel.basicPublish
将消息发布到交换器。以下示例使用该AMQPBuilder
类生成内容plain/text
类型的消息属性对象。
byte[] messageBodyBytes = "Hello, world!".getBytes(); channel.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder() .contentType("text/plain") .userId("userId") .build(), messageBodyBytes);
注意
请注意,BasicProperties
是自动生成的持有者类的内部类 AMQP
。
订阅队列并接收消息
您可以通过使用 Consumer
接口订阅队列来接收消息。订阅后,消息将在到达时自动传递。
实现 Consumer
的最简单方法是使用子类 DefaultConsumer
。DefaultConsumer
对象可以作为 basicConsume
调用的一部分传递以设置订阅,如以下示例所示。
boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); } });
注意
因为我们指定了 autoAck = false
,所以必须确认传递到 Consumer
的消息,这在 handleDelivery
方法中完成最为方便,如示例所示。
关闭连接并断开与代理的连接
要断开与您的 RabbitMQ 代理的连接,请关闭通道和连接,如下所示。
channel.close(); conn.close();
注意
有关使用 RabbitMQ Java 客户端库的更多信息,请参阅 RabbitMQ Java 客户端指南
步骤 3:(可选)Connect 到 Amazon Lambda 函数
Amazon Lambda 可以连接并使用来自您的 Amazon MQ 代理的消息。当您将代理连接到 Lambda 时,可以创建事件源映射,从队列中读取消息并同步调用函数。您创建的事件源映射会从您的代理中批量读取消息,并将其转换为对象形式的 Lambda 有效负载。JSON
将您的代理连接到 Lambda 函数
-
将以下IAM角色权限添加到您的 Lambda 函数执行角色。
注意
如果没有必要的IAM权限,您的函数将无法成功读取 Amazon MQ 资源中的记录。
-
(可选)如果您创建了一个没有公开可访问性的代理,则必须执行下面其中一项操作以允许 Lambda 连接到您的代理:
-
为每个公有子NAT网配置一个网关。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的VPC已连接功能的 Internet 和服务访问权限。
-
使用终端节点在您的亚马逊虚拟私有云(亚马逊VPC)和 Lambda 之间创建连接。VPC您的亚马逊还VPC必须连接到 Amazon Security Token Service (Amazon STS) 和 Secrets Manager 终端节点。有关更多信息,请参阅Amazon Lambda 开发人员指南中的为 Lambda 配置接口VPC终端节点。
-
-
使用 Amazon Web Services Management Console为 Lambda 函数配置代理作为事件源。您也可以使用该
create-event-source-mapping
Amazon Command Line Interface 命令。 -
为 Lambda 函数编写一些代码来处理从您的代理使用的消息。事件源映射检索的 Lambda 负载取决于代理的引擎类型。以下是 Amazon MQ for RabbitMQ 队列的 Lambda 负载示例。
注意
在该示例中,
test
是队列的名称,/
是默认虚拟主机的名称。接收消息时,事件源会将消息列在test::/
下。{ "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "rmqMessagesByQueue": { "test::/": [ { "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 } "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ] } }
有关将 Amazon MQ 连接到 Lambda、Lambda 为 Amazon MQ 事件源提供支持的选项和事件源映射错误的更多信息,请参阅《Amazon Lambda 开发人员指南》中的将 Lambda 与 Amazon MQ 结合使用。