创建并连接到 ActiveMQ 代理
代理 是运行在 Amazon MQ 上的消息代理环境。它是 Amazon MQ 的基本构建块。代理实例类(m5
、t3
)和大小(large
、micro
)的综合描述是一个代理实例类型(例如 mq.m5.large
)。有关更多信息,请参阅代理。
步骤 1:创建 ActiveMQ 代理
第一个也是最常见的 Amazon MQ 任务是创建代理。以下示例演示如何使用Amazon Web Services Management Console创建基本代理。
登录 Amazon MQ 控制台
。 -
在 Select broker engine (选择代理引擎) 页面上,选择 Apache ActiveMQ。
-
在 Select deployment and storage (选择部署和存储) 页面的 Deployment mode and storage type (部署模式和存储类型) 部分,执行以下操作:
-
选择 Deployment mode (部署模式)(例如 Active/standby broker (主动/备用代理))。有关更多信息,请参阅Broker Architecture。
-
单实例代理由一个可用区中的一个代理组成。代理与您的应用程序以及 Amazon EBS 或 Amazon EFS 存储卷进行通信。有关更多信息,请参阅Amazon MQ 单实例代理。
-
高可用性的主动/备用代理由两个不同可用区中的两个代理组成,以冗余对配置。这些代理与您的应用程序以及 Amazon EFS 进行同步通信。有关更多信息,请参阅用于实现高可用性的 Amazon MQ 主动/备用代理。
-
有关代理网络示例蓝图的更多信息,请参阅示例蓝图。
-
-
选择 Storage type (存储类型)(例如 EBS)。有关更多信息,请参阅Storage。
注意
Amazon EBS 在单个可用区内复制数据,但不支持 ActiveMQ 主动/备用部署模式。
选择 Next(下一步)。
-
-
在 Configure settings (配置设置) 页面的 Details (详细信息) 部分,执行以下操作:
-
输入 Broker name (代理名称)。
重要
请勿在代理名称中添加个人身份信息(PII)或其他机密或敏感信息。其他Amazon服务(包括 CloudWatch Logs)可以访问代理名称。代理名称不适合用于私有或敏感数据。
选择 Broker instance type (代理实例类型)(例如 mq.m5.large)。有关更多信息,请参阅Broker instance types。
-
-
在 ActiveMQ Web Console access (ActiveMQ Web 控制台访问) 部分,提供 Username (用户名) 和 Password (密码)。以下限制适用于代理用户名和密码:
-
用户名只能包含字母数字字符、短划线、句点、下划线和波浪线(- . _ ~)。
-
密码必须至少为 12 个字符,包含至少 4 个唯一字符,并且不得包含逗号、冒号或等号(,:=)。
重要
请勿在代理用户名中添加个人身份信息(PII)或其他机密或敏感信息。其他Amazon服务(包括 CloudWatch Logs)可以访问代理用户名。代理用户名不适合用于私有或敏感数据。
-
-
选择 Deploy(部署)。
当 Amazon MQ 创建您的代理时,会显示 Creation in progress (正在创建) 状态。
创建代理大约需要 15 分钟。
成功创建您的代理后,Amazon MQ 会显示 Running (正在运行) 状态。
-
选择
MyBroker
。在
MyBroker
页面的 Connect(连接)部分,请注意代理的 ActiveMQ web console(ActiveMQ Web 控制台)URL,例如: https://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:8162
另外,请记下您代理的线级协议终端节点
。以下是 OpenWire 终端节点的示例: ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617
步骤 2:将 Java 应用程序连接到您的代理
创建 Amazon MQ ActiveMQ 代理后,您可以将应用程序连接到该代理。以下示例演示如何使用 Java Message Service(JMS)创建代理连接、创建队列以及发送消息。有关完整的可用 Java 示例,请参阅Working Java Example。
您可以使用各种 ActiveMQ 客户端
先决条件
启用 VPC 属性
注意
您无法禁用现有 Amazon MQ 代理的公共访问权限。
要确保您的代理可以在您的 VPC 中访问,您必须启用 enableDnsHostnames
和 enableDnsSupport
VPC 属性。有关更多信息,请参阅《Amazon VPC 用户指南》中的 VPC 中的 DNS Support。
启用入站连接
登录 Amazon MQ 控制台
。 从代理列表中选择您的代理的名称(例如 MyBroker)。
-
在
MyBroker
页面的 Connections (连接) 部分,记下代理 Web 控制台 URL 和线级协议的地址和端口。 -
在 Details (详细信息) 部分的 Security and network (安全与网络) 下,选择您的安全组名称或
。
此时将显示 EC2 Dashboard 的 Security Groups (安全组) 页面。
-
从安全组列表中,选择您的安全组。
-
在页面底部,选择 Inbound (入站),然后选择 Edit (编辑)。
-
在 Edit inbound rules (编辑入站规则) 对话框中,为希望公开访问的每个 URL 或终端节点添加规则(以下示例显示如何为代理 Web 控制台执行此操作。
-
选择 Add Rule(添加规则)。
-
对于 Type (类型),选择 Custom TCP (自定义 TCP)。
-
对于 Port Range (端口范围),键入 Web 控制台端口(
8162
)。 -
对于 Source (源),选择 Custom (自定义),然后键入您希望能够访问 Web 控制台的系统的 IP 地址(例如
192.0.2.1
)。 -
选择 Save(保存)。
您的代理现在可以接受入站连接。
-
添加 Java 依赖项
将 activemq-client.jar
和 activemq-pool.jar
程序包添加到 Java 类路径中。以下示例说明了 Maven 项目的 pom.xml
文件中的这些依赖关系。
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.8</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.8</version> </dependency> </dependencies>
有关 activemq-client.jar
的更多信息,请参阅 Apache ActiveMQ 文档中的初始配置
重要
在以下示例代码中,生产者和使用者在单个线程中运行。对于生产系统(或测试代理实例故障转移),请确保您的创建者和使用者在单独的主机或线程上运行。
创建消息创建者并发送消息
-
使用代理的终端节点为消息创建者创建 JMS 池连接工厂,然后针对该工厂调用
createConnection
方法。注意
对于主动/备用代理,Amazon MQ 提供两个 ActiveMQ Web 控制台 URL,但每次只有一个 URL 处于活动状态。同样,Amazon MQ 为每个线级协议提供两个终端节点,但每次每对中只有一个终端节点处于活动状态。
-1
和-2
后缀表示冗余对。有关更多信息,请参阅 Broker Architecture。对于线级协议终端节点,您可以允许应用程序使用故障转移传输
连接到任一终端节点。 // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Create a pooled connection factory. final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); // Establish a connection for the producer. final Connection producerConnection = pooledConnectionFactory.createConnection(); producerConnection.start(); // Close all connections in the pool. pooledConnectionFactory.clear();
注意
消息创建者应始终使用
PooledConnectionFactory
类。有关更多信息,请参阅始终使用连接池。 -
创建一个会话,一个名为
MyQueue
的队列和消息创建者。// Create a session. final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination producerDestination = producerSession.createQueue("MyQueue"); // Create a producer from the session to the queue. final MessageProducer producer = producerSession.createProducer(producerDestination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
创建消息字符串
"Hello from Amazon MQ!"
,然后发送消息。// Create a message. final String text = "Hello from Amazon MQ!"; TextMessage producerMessage = producerSession.createTextMessage(text); // Send the message. producer.send(producerMessage); System.out.println("Message sent.");
-
清理创建者。
producer.close(); producerSession.close(); producerConnection.close();
创建消息使用者并接收消息
-
使用代理的终端节点为消息创建者创建 JMS 连接工厂,然后针对该工厂调用
createConnection
方法。// Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Establish a connection for the consumer. final Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start();
注意
消息使用者绝不 应使用
PooledConnectionFactory
类。有关更多信息,请参阅始终使用连接池。 -
创建一个会话,一个名为
MyQueue
的队列和消息使用者。// Create a session. final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination consumerDestination = consumerSession.createQueue("MyQueue"); // Create a message consumer from the session to the queue. final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
-
开始等待消息,并在消息到达时收到消息。
// Begin to wait for messages. final Message consumerMessage = consumer.receive(1000); // Receive the message when it arrives. final TextMessage consumerTextMessage = (TextMessage) consumerMessage; System.out.println("Message received: " + consumerTextMessage.getText());
注意
与Amazon消息收发服务(例如 Amazon SQS)不同的是,使用者会不断连接到代理。
-
关闭使用者、会话和连接。
consumer.close(); consumerSession.close(); consumerConnection.close();
步骤 3:(可选)连接到 Amazon Lambda 函数
Amazon Lambda 可以连接并使用来自您的 Amazon MQ 代理的消息。当您将代理连接到 Lambda 时,可以创建事件源映射,从队列中读取消息并同步调用函数。您创建的事件源映射分批从您的代理中读取消息,并以 JSON 对象的形式将它们转换为 Lambda 负载。
将您的代理连接到 Lambda 函数
-
将以下 IAM 角色权限添加到 Lambda 函数执行角色。
注意
如果没有必要的 IAM 权限,您的函数将无法从 Amazon MQ 资源中成功读取记录。
-
(可选)如果您创建了一个没有公开可访问性的代理,则必须执行下面其中一项操作以允许 Lambda 连接到您的代理:
-
为每个公有子网配置一个 NAT 网关。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的VPC 连接函数的互联网和服务访问。
-
使用 VPC 终端节点在您的 Amazon Virtual Private Cloud(Amazon VPC)和 Lambda 之间创建连接。您的 Amazon VPC 还必须连接到 Amazon Security Token Service (Amazon STS) 和 Secrets Manager 端点。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的为 Lambda 配置接口 VPC 终端节点。
-
-
使用Amazon Web Services Management Console为 Lambda 函数配置代理作为事件源。您也可以使用
create-event-source-mapping
Amazon Command Line Interface 命令。 -
为 Lambda 函数编写一些代码来处理从您的代理使用的消息。事件源映射检索的 Lambda 负载取决于代理的引擎类型。以下是适用于 Amazon MQ for ActiveMQ 队列的 Lambda 负载示例。
注意
在该示例中,
testQueue
是队列的名称。{ "eventSource": "aws:amq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messages": { [ { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "data": "QUJDOkFBQUE=", "connectionId": "myJMSCoID", "redelivered": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 }, { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType":"jms/bytes-message", "data": "3DTOOW7crj51prgVLQaGQ82S48k=", "connectionId": "myJMSCoID1", "persistent": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 } ] } }
有关将 Amazon MQ 连接到 Lambda、Lambda 为 Amazon MQ 事件源提供支持的选项和事件源映射错误的更多信息,请参阅《Amazon Lambda 开发人员指南》中的将 Lambda 与 Amazon MQ 结合使用。
步骤 4:删除代理
如果您不使用 Amazon MQ 代理(并且估计未来近期也不会使用代理),最佳实践是将其从 Amazon MQ 中删除以减少Amazon成本。
以下示例演示如何使用Amazon Web Services Management Console删除代理。
登录 Amazon MQ 控制台
。 从代理列表中,选择您的代理(例如 MyBroker),然后选择 Delete (删除)。
在 Delete
MyBroker
? (是否删除 MyBroker?) 对话框中,键入delete
,然后选择 Delete (删除)。删除代理大约需要 5 分钟。
后续步骤
现在,您已经创建了一个代理,将一个应用程序连接到了该代理,并发送和接收了一条消息,您可以希望尝试以下操作:
您也可以开始深入了解 Amazon MQ 的最佳实践和 Amazon MQ REST API,然后计划迁移到 Amazon MQ。