Connecting your JMS application
This tutorial shows you how to connect your JMS application to Amazon MQ for RabbitMQ broker using the RabbitMQ JMS client. You will learn how to create a producer to send messages and a consumer to receive messages from RabbitMQ queues.
Before you begin, add the appropriate RabbitMQ JMS dependency to your Maven project:
For JMS 1.1 and 2.0:
<dependencies> <dependency> <groupId>com.rabbitmq.jms</groupId> <artifactId>rabbitmq-jms</artifactId> <version>2.12.0</version> </dependency> </dependencies>
For JMS 3.1:
<dependencies> <dependency> <groupId>com.rabbitmq.jms</groupId> <artifactId>rabbitmq-jms</artifactId> <version>3.5.0</version> </dependency> </dependencies>
Create a producer
The following code example shows how to write to a RabbitMQ queue using JMS:
import jakarta.jms.*; import com.rabbitmq.jms.admin.*; // Setting the connection factory RMQConnectionFactory factory = new RMQConnectionFactory(); factory.setHost(envProps.getProperty("RABBITMQ_HOST", "localhost")); factory.setPort(Integer.parseInt(envProps.getProperty("RABBITMQ_PORT", "5672"))); factory.setUsername(envProps.getProperty("RABBITMQ_USERNAME", "guest")); factory.setPassword(envProps.getProperty("RABBITMQ_PASSWORD", "guest")); factory.setVirtualHost(envProps.getProperty("RABBITMQ_VIRTUAL_HOST", "/")); factory.useSslProtocol(); connection = factory.createConnection(); connection.start(); String queueName = "test-queue-jms"; Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); RMQDestination destination = new RMQDestination(queueName, true, false); // Send the message to the queue MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); String msg_content = "Hello World!!"; TextMessage textMessage = session.createTextMessage(msg_content); producer.send(textMessage); System.out.printf("Published to AMQP queue '%s': %s", queueName, msg_content);
Create a consumer
The following code example shows how to read from a RabbitMQ queue using JMS:
import jakarta.jms.*; import com.rabbitmq.jms.admin.*; // Setting the connection factory RMQConnectionFactory factory = new RMQConnectionFactory(); factory.setHost(envProps.getProperty("RABBITMQ_HOST", "localhost")); factory.setPort(Integer.parseInt(envProps.getProperty("RABBITMQ_PORT", "5672"))); factory.setUsername(envProps.getProperty("RABBITMQ_USERNAME", "guest")); factory.setPassword(envProps.getProperty("RABBITMQ_PASSWORD", "guest")); factory.setVirtualHost(envProps.getProperty("RABBITMQ_VIRTUAL_HOST", "/")); factory.useSslProtocol(); // Establish the connection and session jakarta.jms.Connection connection = factory.createConnection(); String queueName = "test-queue-jms"; Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); RMQDestination destination = new RMQDestination(); destination.setDestinationName(queueName); destination.setAmqp(true); destination.setAmqpQueueName(queueName); // Initialize consumer MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(message -> { try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; System.out.printf("Message: %s%n", textMessage.getText()); } else if (message instanceof BytesMessage) { BytesMessage bytesMessage = (BytesMessage) message; byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; bytesMessage.readBytes(bytes); String content = new String(bytes); System.out.printf("Message: %s%n", content); } else { System.out.printf("Message: [%s]%n", message.getClass().getSimpleName()); } } catch (JMSException e) { System.err.printf("Error processing message: %s%n", e.getMessage()); } }); connection.start();