本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将 Java 应用程序连接到您的 Amazon MQ 代理
创建 Amazon MQ ActiveMQ 代理后,您可以将应用程序连接到该代理。以下示例说明如何使用 Java 消息服务 (JMS) 来创建与代理的连接、创建队列和发送消息。有关完整的可用 Java 示例,请参阅Working Java Example。
您可以使用各种 ActiveMQ 客户端
先决条件
启用VPC属性
为确保您的经纪人可在您的内部访问VPC,您必须启用enableDnsHostnames
和enableDnsSupport
VPC属性。有关更多信息,请参阅DNS《亚马逊VPC用户指南》VPC中的 Support。
启用入站连接
接下来,为您的应用程序启用入站连接。
登录 Amazon MQ 控制台
。 从经纪人列表中,选择您的经纪商的名称(例如 MyBroker)。
-
在存储库的
MyBroker
页面上的 “连接” 部分,记下代理的 Web 控制台和线级协议的地址URL和端口。 -
在 Details (详细信息) 部分的 Security and network (安全与网络) 下,选择您的安全组名称或 。
屏幕上将显示EC2控制面板的 “安全组” 页面。
-
从安全组列表中,选择您的安全组。
-
在页面底部,选择 Inbound (入站),然后选择 Edit (编辑)。
-
在编辑入站规则对话框中,为您想要公开访问的每个URL或终端节点添加一条规则(以下示例说明了如何为代理 Web 控制台执行此操作)。
-
选择添加规则。
-
在 “类型” 中,选择 “自定义” TCP。
-
对于 Port Range (端口范围),键入 Web 控制台端口(
8162
)。 -
对于 Source (源),选择 Custom (自定义),然后键入您希望能够访问 Web 控制台的系统的 IP 地址(例如
192.0.2.1
)。 -
选择保存。
您的代理现在可以接受入站连接。
-
添加 Java 依赖项
将 activemq-client.jar
和 activemq-pool.jar
程序包添加到 Java 类路径中。以下示例说明了 Maven 项目的 pom.xml
文件中的这些依赖关系。
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.16</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.16</version> </dependency> </dependencies>
有关 activemq-client.jar
的更多信息,请参阅 Apache ActiveMQ 文档中的初始配置
重要
在以下示例代码中,生产者和使用者在单个线程中运行。对于生产系统(或测试代理实例故障转移),请确保您的创建者和使用者在单独的主机或线程上运行。
创建消息创建者并发送消息
使用以下说明创建消息创建者并接收消息。
-
使用代理的端点为消息生成器创建JMS池化连接工厂,然后针对该工厂调用该
createConnection
方法。注意
对于主用/备用代理,Amazon MQ 提供了两个 ActiveMQ Web URLs 控制台,但一次只能有一个处于活动状态。URL同样,Amazon MQ 为每个线级协议提供两个终端节点,但每次每对中只有一个终端节点处于活动状态。
-1
和-2
后缀表示冗余对。有关更多信息,请参阅 适用于 ActiveMQ 经纪商的 Amazon MQ 部署选项。对于线级协议终端节点,您可以允许应用程序使用故障转移传输
连接到任一终端节点。 // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Create a pooled connection factory. final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); // Establish a connection for the producer. final Connection producerConnection = pooledConnectionFactory.createConnection(); producerConnection.start(); // Close all connections in the pool. pooledConnectionFactory.clear();
注意
消息创建者应始终使用
PooledConnectionFactory
类。有关更多信息,请参阅 始终使用连接池。 -
创建一个会话,一个名为
MyQueue
的队列和消息创建者。// Create a session. final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination producerDestination = producerSession.createQueue("MyQueue"); // Create a producer from the session to the queue. final MessageProducer producer = producerSession.createProducer(producerDestination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
创建消息字符串
"Hello from Amazon MQ!"
,然后发送消息。// Create a message. final String text = "Hello from Amazon MQ!"; TextMessage producerMessage = producerSession.createTextMessage(text); // Send the message. producer.send(producerMessage); System.out.println("Message sent.");
-
清理创建者。
producer.close(); producerSession.close(); producerConnection.close();
创建消息使用者并接收消息
使用以下说明创建消息创建者并接收消息。
-
使用代理的端点为消息生成器创建JMS连接工厂,然后针对该工厂调用该
createConnection
方法。// Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Establish a connection for the consumer. final Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start();
注意
消息使用者绝不 应使用
PooledConnectionFactory
类。有关更多信息,请参阅 始终使用连接池。 -
创建一个会话,一个名为
MyQueue
的队列和消息使用者。// Create a session. final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination consumerDestination = consumerSession.createQueue("MyQueue"); // Create a message consumer from the session to the queue. final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
-
开始等待消息,并在消息到达时收到消息。
// Begin to wait for messages. final Message consumerMessage = consumer.receive(1000); // Receive the message when it arrives. final TextMessage consumerTextMessage = (TextMessage) consumerMessage; System.out.println("Message received: " + consumerTextMessage.getText());
注意
与 Amazon 消息服务(例如AmazonSQS)不同,消费者经常与经纪人建立联系。
-
关闭使用者、会话和连接。
consumer.close(); consumerSession.close(); consumerConnection.close();