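Working examples of using Java Message Service (JMS) with ActiveMQ
The following examples show how you can work with ActiveMQ programmatically:
- The OpenWire example Java code connects to a broker, creates a queue, and sends and receives a message. For a detailed breakdown and explanation, see Connecting a Java application to your broker.
- The MQTT example Java code connects to a broker, creates a topic, and publishes and receives a message.
- The STOMP+WSS example Java code connects to a broker, creates a queue, and publishes and receives a message.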
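Prerequisites
Enable VPC attributes
To ensure that your broker is accessible within your VPC, you must enable the enableDnsHostnames and enableDnsSupport VPC attributes. For more information, see DNS Support in your VPC in the Amazon VPC User Guide.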
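One quick way to confirm that the broker's host name resolves from a machine inside your VPC is a plain DNS lookup. The following is a minimal sketch that uses only the Java standard library. The class name BrokerDnsCheck, its method names, and the localhost default are illustrative assumptions and are not part of Amazon MQ. A failed lookup can have several causes; disabled VPC DNS attributes are only one of them.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

class BrokerDnsCheck {

    // Extracts the host name from an endpoint such as "ssl://host:61617".
    static String hostOf(String endpoint) {
        final String host = URI.create(endpoint).getHost();
        if (host == null) {
            throw new IllegalArgumentException("No host in endpoint: " + endpoint);
        }
        return host;
    }

    // Returns true if the host name resolves to at least one IP address.
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical default; pass your broker's wire-level endpoint as the first argument.
        final String endpoint = args.length > 0 ? args[0] : "ssl://localhost:61617";
        final String host = hostOf(endpoint);
        System.out.println(host + (resolves(host) ? " resolves." : " does not resolve."));
    }
}
```

Run it from a host inside the VPC and pass the wire-level endpoint shown on your broker's page as the argument.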
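Enable inbound connections
To work with Amazon MQ programmatically, you must enable inbound connections.
- Sign in to the Amazon MQ console.
- From the broker list, choose the name of your broker (for example, MyBroker).
- On the MyBroker page, in the Connections section, note the addresses and ports of the broker's web console URL and wire-level protocols.
- In the Details section, under Security and network, choose the name of your security group.
  The Security Groups page of the EC2 Dashboard is displayed.
- From the security group list, choose your security group.
- At the bottom of the page, choose Inbound, and then choose Edit.
- In the Edit inbound rules dialog box, add a rule for every URL or endpoint that you want to be publicly accessible (the following example shows how to do this for a broker web console).
  - Choose Add Rule.
  - For Type, select Custom TCP.
  - For Port Range, type the web console port (8162).
  - For Source, choose Custom, and then type the IP address of the system that you want to be able to access the web console (for example, 192.0.2.1).
  - Choose Save.
  Your broker can now accept inbound connections.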
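To check that a rule took effect, you can try to open a TCP connection to the port from the system whose IP address you allowed. The following is a minimal sketch using only the Java standard library. The class name InboundPortCheck, the method name canConnect, and the three-second timeout are assumptions made for illustration. The check covers the TCP handshake only; it does not verify TLS or broker credentials.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class InboundPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or the host name did not resolve.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults; pass your broker's host name and port as arguments.
        final String host = args.length > 0 ? args[0] : "localhost";
        final int port = args.length > 1 ? Integer.parseInt(args[1]) : 8162;
        final boolean reachable = canConnect(host, port, 3000);
        System.out.println(host + ":" + port + (reachable ? " is reachable." : " is not reachable."));
    }
}
```

For example, pass the broker's host name and 8162 to test the web console rule described in the steps above.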
Add Java dependencies
- OpenWire
  Add the activemq-client.jar and activemq-pool.jar packages to your Java class path. The following example shows these dependencies in a Maven project pom.xml file.
  <dependencies>
      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-client</artifactId>
          <version>5.15.16</version>
      </dependency>
      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-pool</artifactId>
          <version>5.15.16</version>
      </dependency>
  </dependencies>
  For more information about activemq-client.jar, see Initial Configuration in the Apache ActiveMQ documentation.
- MQTT
  Add the org.eclipse.paho.client.mqttv3.jar package to your Java class path. The following example shows this dependency in a Maven project pom.xml file.
  <dependencies>
      <dependency>
          <groupId>org.eclipse.paho</groupId>
          <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
          <version>1.2.0</version>
      </dependency>
  </dependencies>
  For more information about org.eclipse.paho.client.mqttv3.jar, see Eclipse Paho Java Client.
- STOMP+WSS
  Add the following packages to your Java class path:
  - spring-messaging.jar
  - spring-websocket.jar
  - javax.websocket-api.jar
  - jetty-all.jar
  - slf4j-simple.jar
  - jackson-databind.jar
  The following example shows these dependencies in a Maven project pom.xml file.
  <dependencies>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-messaging</artifactId>
          <version>5.0.5.RELEASE</version>
      </dependency>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-websocket</artifactId>
          <version>5.0.5.RELEASE</version>
      </dependency>
      <dependency>
          <groupId>javax.websocket</groupId>
          <artifactId>javax.websocket-api</artifactId>
          <version>1.1</version>
      </dependency>
      <dependency>
          <groupId>org.eclipse.jetty.aggregate</groupId>
          <artifactId>jetty-all</artifactId>
          <type>pom</type>
          <version>9.3.3.v20150827</version>
      </dependency>
      <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-simple</artifactId>
          <version>1.6.6</version>
      </dependency>
      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.5.0</version>
      </dependency>
  </dependencies>
  For more information, see STOMP Support in the Spring Framework documentation.
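AmazonMQExample.java
Important
In the following example code, producers and consumers run in a single thread. For production systems (or to test broker instance failover), make sure that your producers and consumers run on separate hosts or threads.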
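The following sketch illustrates one way to put a producer and a consumer on separate threads with an ExecutorService. It is a standard-library stand-in, not broker code: the in-memory BlockingQueue takes the place of the broker queue, and the class name SeparateThreadsSketch and method name runOnce are assumptions made for illustration. In a real application, each task would open its own JMS connection and session.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class SeparateThreadsSketch {

    // Runs a stand-in producer and consumer on two threads.
    // Returns the text the consumer received, or null on timeout or failure.
    static String runOnce(String text) {
        // Stand-in for the broker queue; a real application would use JMS here.
        final BlockingQueue<String> standInQueue = new LinkedBlockingQueue<>();
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Consumer task: waits up to five seconds for a message.
            final Future<String> received =
                    executor.submit(() -> standInQueue.poll(5, TimeUnit.SECONDS));
            // Producer task: sends one message from a different thread.
            executor.execute(() -> standInQueue.offer(text));
            return received.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            // Interrupted, timed out, or the consumer task failed.
            return null;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Message received: " + runOnce("Hello from Amazon MQ!"));
    }
}
```

Keeping each side in its own task means a slow or blocked consumer does not stall the producer, which matters when you test failover behavior.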
- OpenWire
  /*
   * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License").
   * You may not use this file except in compliance with the License.
   * A copy of the License is located at
   *
   *  https://aws.amazon.com/apache2.0
   *
   * or in the "license" file accompanying this file. This file is distributed
   * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
   * express or implied. See the License for the specific language governing
   * permissions and limitations under the License.
   *
   */

  import org.apache.activemq.ActiveMQConnectionFactory;
  import org.apache.activemq.jms.pool.PooledConnectionFactory;

  import javax.jms.*;

  public class AmazonMQExample {

      // Specify the connection parameters.
      private final static String WIRE_LEVEL_ENDPOINT
              = "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617";
      private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
      private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";

      public static void main(String[] args) throws JMSException {
          final ActiveMQConnectionFactory connectionFactory =
                  createActiveMQConnectionFactory();
          final PooledConnectionFactory pooledConnectionFactory =
                  createPooledConnectionFactory(connectionFactory);

          sendMessage(pooledConnectionFactory);
          receiveMessage(connectionFactory);

          pooledConnectionFactory.stop();
      }

      private static void sendMessage(PooledConnectionFactory pooledConnectionFactory)
              throws JMSException {
          // Establish a connection for the producer.
          final Connection producerConnection = pooledConnectionFactory
                  .createConnection();
          producerConnection.start();

          // Create a session.
          final Session producerSession = producerConnection
                  .createSession(false, Session.AUTO_ACKNOWLEDGE);

          // Create a queue named "MyQueue".
          final Destination producerDestination = producerSession
                  .createQueue("MyQueue");

          // Create a producer from the session to the queue.
          final MessageProducer producer = producerSession
                  .createProducer(producerDestination);
          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

          // Create a message.
          final String text = "Hello from Amazon MQ!";
          final TextMessage producerMessage = producerSession
                  .createTextMessage(text);

          // Send the message.
          producer.send(producerMessage);
          System.out.println("Message sent.");

          // Clean up the producer.
          producer.close();
          producerSession.close();
          producerConnection.close();
      }

      private static void receiveMessage(ActiveMQConnectionFactory connectionFactory)
              throws JMSException {
          // Establish a connection for the consumer.
          // Note: Consumers should not use PooledConnectionFactory.
          final Connection consumerConnection = connectionFactory.createConnection();
          consumerConnection.start();

          // Create a session.
          final Session consumerSession = consumerConnection
                  .createSession(false, Session.AUTO_ACKNOWLEDGE);

          // Create a queue named "MyQueue".
          final Destination consumerDestination = consumerSession
                  .createQueue("MyQueue");

          // Create a message consumer from the session to the queue.
          final MessageConsumer consumer = consumerSession
                  .createConsumer(consumerDestination);

          // Wait up to one second for a message.
          final Message consumerMessage = consumer.receive(1000);

          // receive() returns null if no message arrives before the timeout.
          if (consumerMessage instanceof TextMessage) {
              final TextMessage consumerTextMessage = (TextMessage) consumerMessage;
              System.out.println("Message received: " + consumerTextMessage.getText());
          } else {
              System.out.println("No text message received before the timeout.");
          }

          // Clean up the consumer.
          consumer.close();
          consumerSession.close();
          consumerConnection.close();
      }

      private static PooledConnectionFactory
      createPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
          // Create a pooled connection factory.
          final PooledConnectionFactory pooledConnectionFactory =
                  new PooledConnectionFactory();
          pooledConnectionFactory.setConnectionFactory(connectionFactory);
          pooledConnectionFactory.setMaxConnections(10);
          return pooledConnectionFactory;
      }

      private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
          // Create a connection factory.
          final ActiveMQConnectionFactory connectionFactory =
                  new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);

          // Pass the sign-in credentials.
          connectionFactory.setUserName(ACTIVE_MQ_USERNAME);
          connectionFactory.setPassword(ACTIVE_MQ_PASSWORD);
          return connectionFactory;
      }
  }
- MQTT
  /*
   * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License").
   * You may not use this file except in compliance with the License.
   * A copy of the License is located at
   *
   *  https://aws.amazon.com/apache2.0
   *
   * or in the "license" file accompanying this file. This file is distributed
   * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
   * express or implied. See the License for the specific language governing
   * permissions and limitations under the License.
   *
   */

  import org.eclipse.paho.client.mqttv3.*;

  public class AmazonMQExampleMqtt implements MqttCallback {

      // Specify the connection parameters.
      private final static String WIRE_LEVEL_ENDPOINT =
              "ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:8883";
      private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
      private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";

      public static void main(String[] args) throws Exception {
          new AmazonMQExampleMqtt().run();
      }

      private void run() throws MqttException, InterruptedException {

          // Specify the topic name and the message text.
          final String topic = "myTopic";
          final String text = "Hello from Amazon MQ!";

          // Create the MQTT client and specify the connection options.
          final String clientId = "abc123";
          final MqttClient client = new MqttClient(WIRE_LEVEL_ENDPOINT, clientId);
          final MqttConnectOptions connOpts = new MqttConnectOptions();

          // Pass the sign-in credentials.
          connOpts.setUserName(ACTIVE_MQ_USERNAME);
          connOpts.setPassword(ACTIVE_MQ_PASSWORD.toCharArray());

          // Create a session and subscribe to a topic filter.
          client.connect(connOpts);
          client.setCallback(this);
          client.subscribe("+");

          // Create a message.
          final MqttMessage message = new MqttMessage(text.getBytes());

          // Publish the message to a topic.
          client.publish(topic, message);
          System.out.println("Published message.");

          // Wait for the message to be received.
          Thread.sleep(3000L);

          // Clean up the connection.
          client.disconnect();
      }

      @Override
      public void connectionLost(Throwable cause) {
          System.out.println("Lost connection.");
      }

      @Override
      public void messageArrived(String topic, MqttMessage message) throws MqttException {
          System.out.println("Received message from topic " + topic + ": " + message);
      }

      @Override
      public void deliveryComplete(IMqttDeliveryToken token) {
          System.out.println("Delivered message.");
      }
  }
- STOMP+WSS
  /*
   * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License").
   * You may not use this file except in compliance with the License.
   * A copy of the License is located at
   *
   *  https://aws.amazon.com/apache2.0
   *
   * or in the "license" file accompanying this file. This file is distributed
   * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
   * express or implied. See the License for the specific language governing
   * permissions and limitations under the License.
   *
   */

  import org.springframework.messaging.converter.StringMessageConverter;
  import org.springframework.messaging.simp.stomp.*;
  import org.springframework.web.socket.WebSocketHttpHeaders;
  import org.springframework.web.socket.client.WebSocketClient;
  import org.springframework.web.socket.client.standard.StandardWebSocketClient;
  import org.springframework.web.socket.messaging.WebSocketStompClient;

  import java.lang.reflect.Type;

  public class AmazonMQExampleStompWss {

      // Specify the connection parameters.
      private final static String DESTINATION = "/queue";
      private final static String WIRE_LEVEL_ENDPOINT =
              "wss://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61619";
      private final static String ACTIVE_MQ_USERNAME = "MyUsername123";
      private final static String ACTIVE_MQ_PASSWORD = "MyPassword456";

      public static void main(String[] args) throws Exception {
          final AmazonMQExampleStompWss example = new AmazonMQExampleStompWss();

          final StompSession stompSession = example.connect();

          // Subscribe first so that the message sent below is delivered back.
          example.subscribeToDestination(stompSession);
          System.out.println("Subscribed to a destination using session.");

          example.sendMessage(stompSession);
          System.out.println("Sent message to session.");

          // Keep the process alive while waiting for the message to arrive.
          Thread.sleep(60000);
      }

      private StompSession connect() throws Exception {
          // Create a client.
          final WebSocketClient client = new StandardWebSocketClient();
          final WebSocketStompClient stompClient = new WebSocketStompClient(client);
          stompClient.setMessageConverter(new StringMessageConverter());

          final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();

          // Create headers with authentication parameters.
          final StompHeaders head = new StompHeaders();
          head.add(StompHeaders.LOGIN, ACTIVE_MQ_USERNAME);
          head.add(StompHeaders.PASSCODE, ACTIVE_MQ_PASSWORD);

          final StompSessionHandler sessionHandler = new MySessionHandler();

          // Create a connection.
          return stompClient.connect(WIRE_LEVEL_ENDPOINT, headers, head,
                  sessionHandler).get();
      }

      private void subscribeToDestination(final StompSession stompSession) {
          stompSession.subscribe(DESTINATION, new MyFrameHandler());
      }

      private void sendMessage(final StompSession stompSession) {
          stompSession.send(DESTINATION, "Hello from Amazon MQ!".getBytes());
      }

      private static class MySessionHandler extends StompSessionHandlerAdapter {
          public void afterConnected(final StompSession stompSession,
                                     final StompHeaders stompHeaders) {
              System.out.println("Connected to broker.");
          }
      }

      private static class MyFrameHandler implements StompFrameHandler {
          public Type getPayloadType(final StompHeaders headers) {
              return String.class;
          }

          public void handleFrame(final StompHeaders stompHeaders,
                                  final Object message) {
              System.out.print("Received message from topic: " + message);
          }
      }
  }