本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Amazon SQS Java Messaging Library
要开始结合使用 Java Message Service (JMS) 与 Amazon SQS,请使用本节中的代码示例。以下部分介绍如何创建 JMS 连接和会话以及如何发送和接收消息。
Amazon SQS Java Messaging Library 中包含了封装的 Amazon SQS 客户端对象,该对象会检查是否存在 Amazon SQS 队列。如果队列不存在,客户端将创建它。
创建 JMS 连接
在开始之前,请先查看使用 JMS 和 Amazon SQS 的先决条件中的先决条件。
-
创建连接工厂并对该工厂调用
createConnection
方法。// Create a new connection factory with all defaults (credentials and region) set automatically SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), AmazonSQSClientBuilder.defaultClient() ); // Create the connection. SQSConnection connection = connectionFactory.createConnection();
SQSConnection
类对javax.jms.Connection
进行了扩展。除了 JMS 标准连接方法,SQSConnection
还提供其他一些方法,例如getAmazonSQSClient
和getWrappedAmazonSQSClient
。这两种方法均可让您执行未包含在 JMS 规范中的管理操作,例如创建新队列。不过,getWrappedAmazonSQSClient
方法还提供当前连接使用的 Amazon SQS 客户端的封装版本。该包装程序将客户端中的每个异常转变为JMSException
,从而让预期有JMSException
发生的现有代码更方便地使用该异常。 -
您可以使用从
getAmazonSQSClient
和getWrappedAmazonSQSClient
返回的客户端对象来执行未包含在 JMS 规范中的管理操作(例如,可以创建 Amazon SQS 队列)。如果您现有的代码需要 JMS 异常,则应使用
getWrappedAmazonSQSClient
:-
如果您使用
getWrappedAmazonSQSClient
,则返回的客户端对象会将所有异常转变成 JMS 异常。 -
如果您使用
getAmazonSQSClient
,则这些异常将全部为 Amazon SQS 异常。
-
创建 Amazon SQS 队列
包装的客户端对象将检查是否存在 Amazon SQS 队列。
如果队列不存在,客户端将创建它。如果队列存在,则该功能不会返回任何值。有关更多信息,请参阅 TextMessageSender.java 示例中的“根据需要创建队列”一节。
创建标准队列
// Get the wrapped client AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient(); // Create an SQS queue named MyQueue, if it doesn't already exist if (!client.queueExists("MyQueue")) { client.createQueue("MyQueue"); }
创建 FIFO 队列
// Get the wrapped client AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient(); // Create an Amazon SQS FIFO queue named MyQueue.fifo, if it doesn't already exist if (!client.queueExists("MyQueue.fifo")) { Map<String, String> attributes = new HashMap<String, String>(); attributes.put("FifoQueue", "true"); attributes.put("ContentBasedDeduplication", "true"); client.createQueue(new CreateQueueRequest().withQueueName("MyQueue.fifo").withAttributes(attributes)); }
注意
FIFO 队列名称必须以 .fifo
后缀结尾。
有关 ContentBasedDeduplication
属性的更多信息,请参阅在 Amazon 中精确处理一次 SQS。
同步发送消息
-
当连接和基础 Amazon SQS 队列准备就绪时,将创建一个具有
AUTO_ACKNOWLEDGE
模式的非事务性 JMS 会话。// Create the nontransacted session with AUTO_ACKNOWLEDGE mode Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
为了向队列发送文本消息,将创建一个 JMS 队列标识和消息创建者。
// Create a queue identity and specify the queue name to the session Queue queue = session.createQueue("MyQueue"); // Create a producer for the 'MyQueue' MessageProducer producer = session.createProducer(queue);
-
创建文本消息并将它发送到队列。
-
要向标准队列发送消息,您无需设置任何其他参数。
// Create the text message TextMessage message = session.createTextMessage("Hello World!"); // Send the message producer.send(message); System.out.println("JMS Message " + message.getJMSMessageID());
-
要向 FIFO 队列发送消息,必须设置消息组 ID。您也可以设置消息重复数据删除 ID。有关更多信息,请参阅Amazon SQS FIFO 队列密钥条款。
// Create the text message TextMessage message = session.createTextMessage("Hello World!"); // Set the message group ID message.setStringProperty("JMSXGroupID", "Default"); // You can also set a custom message deduplication ID // message.setStringProperty("JMS_SQS_DeduplicationId", "hello"); // Here, it's not needed because content-based deduplication is enabled for the queue // Send the message producer.send(message); System.out.println("JMS Message " + message.getJMSMessageID()); System.out.println("JMS Message Sequence Number " + message.getStringProperty("JMS_SQS_SequenceNumber"));
-
同步接收消息
-
要接收消息,可为同一队列创建一个使用者,然后调用
start
方法。您可以随时对连接调用
start
方法。不过,使用者不会开始接收消息,直至调用此方法。// Create a consumer for the 'MyQueue' MessageConsumer consumer = session.createConsumer(queue); // Start receiving incoming messages connection.start();
-
在超时设为 1 秒钟的情况下对该使用者调用
receive
方法,然后输出收到的消息内容。-
收到来自标准队列的消息后,可以访问消息的内容。
// Receive a message from 'MyQueue' and wait up to 1 second Message receivedMessage = consumer.receive(1000); // Cast the received message as TextMessage and display the text if (receivedMessage != null) { System.out.println("Received: " + ((TextMessage) receivedMessage).getText()); }
-
在收到来自 FIFO 队列的消息后,您可以访问消息的内容和其他特定于 FIFO 的消息属性,例如消息组 ID、消息重复数据删除 ID 和序列号。有关更多信息,请参阅 Amazon SQS FIFO 队列密钥条款。
// Receive a message from 'MyQueue' and wait up to 1 second Message receivedMessage = consumer.receive(1000); // Cast the received message as TextMessage and display the text if (receivedMessage != null) { System.out.println("Received: " + ((TextMessage) receivedMessage).getText()); System.out.println("Group id: " + receivedMessage.getStringProperty("JMSXGroupID")); System.out.println("Message deduplication id: " + receivedMessage.getStringProperty("JMS_SQS_DeduplicationId")); System.out.println("Message sequence number: " + receivedMessage.getStringProperty("JMS_SQS_SequenceNumber")); }
-
-
关闭连接和会话。
// Close the connection (and the session). connection.close();
输出看上去类似于以下内容:
JMS Message ID:8example-588b-44e5-bbcf-d816example2
Received: Hello World!
注意
您可以使用 Spring Framework 初始化这些对象。
有关其他信息,请参阅 SpringExampleConfiguration.xml
、SpringExample.java
,以及ExampleConfiguration.java
部分中的 ExampleCommon.java
和 实际可用的 Java 示例:将 JMS 与 Amazon SQS 标准队列相结合使用 中的其他帮助程序类。
有关发送和接收对象的完整示例,请参阅 TextMessageSender.java 和 SyncMessageReceiver.java。
异步接收消息
在 使用 Amazon SQS Java Messaging Library中的示例中,消息发送到 MyQueue
并被同步接收。
以下示例介绍如何通过监听器异步接收消息。
-
实施
MessageListener
接口。class MyListener implements MessageListener { @Override public void onMessage(Message message) { try { // Cast the received message as TextMessage and print the text to screen. System.out.println("Received: " + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
在收到消息时,调用
onMessage
接口的MessageListener
方法。在此监听器实现中,输出保存在消息中的文本。 -
对
receive
实现实例设置使用者消息监听器,而不是对使用者显式调用MyListener
方法。主线程会等待一秒钟。// Create a consumer for the 'MyQueue'. MessageConsumer consumer = session.createConsumer(queue); // Instantiate and set the message listener for the consumer. consumer.setMessageListener(new MyListener()); // Start receiving incoming messages. connection.start(); // Wait for 1 second. The listener onMessage() method is invoked when a message is received. Thread.sleep(1000);
其余步骤与 使用 Amazon SQS Java Messaging Library示例中的步骤相同。有关异步使用者的完整示例,请参阅AsyncMessageReceiver.java
中的 实际可用的 Java 示例:将 JMS 与 Amazon SQS 标准队列相结合使用。
此示例的输出将与以下内容类似:
JMS Message ID:8example-588b-44e5-bbcf-d816example2
Received: Hello World!
使用客户端确认模式
使用 Amazon SQS Java Messaging Library中的示例使用 AUTO_ACKNOWLEDGE
模式,该模式会自动确认收到的每条消息(因此会从基础 Amazon SQS 队列中删除消息)。
-
要在消息处理完毕后显式确认消息,则必须创建具有
CLIENT_ACKNOWLEDGE
模式的会话。// Create the non-transacted session with CLIENT_ACKNOWLEDGE mode. Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
收到消息时,显示消息,然后显式确认消息。
// Cast the received message as TextMessage and print the text to screen. Also acknowledge the message. if (receivedMessage != null) { System.out.println("Received: " + ((TextMessage) receivedMessage).getText()); receivedMessage.acknowledge(); System.out.println("Acknowledged: " + message.getJMSMessageID()); }
注意
在此模式下,当确认某条消息时,也会隐式确认在该消息之前收到的所有消息。例如,如果收到 10 条消息,则仅确认第 10 条消息(按接收消息的顺序),然后还会确认先前的所有 9 条消息。
其余步骤与 使用 Amazon SQS Java Messaging Library示例中的步骤相同。有关具有客户端确认模式的同步使用者的完整示例,请参阅SyncMessageReceiverClientAcknowledge.java
中的 实际可用的 Java 示例:将 JMS 与 Amazon SQS 标准队列相结合使用。
此示例的输出将与以下内容类似:
JMS Message ID:4example-aa0e-403f-b6df-5e02example5
Received: Hello World!
Acknowledged: ID:4example-aa0e-403f-b6df-5e02example5
使用无序确认模式
在使用 CLIENT_ACKNOWLEDGE
模式时,将自动确认在显式确认的消息之前收到的所有消息。有关更多信息,请参阅 使用客户端确认模式。
Amazon SQS Java Messaging Library 提供另一种确认模式。在使用 UNORDERED_ACKNOWLEDGE
模式时,客户端必须单独显式确认收到的所有消息,不管消息的接收顺序如何。为此,请使用 UNORDERED_ACKNOWLEDGE
模式创建会话。
// Create the non-transacted session with UNORDERED_ACKNOWLEDGE mode. Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);
其余步骤与 使用客户端确认模式示例中的步骤相同。有关具有 UNORDERED_ACKNOWLEDGE
模式的同步使用者的完整示例,请参阅 SyncMessageReceiverUnorderedAcknowledge.java
。
此示例中的输出将与以下内容类似:
JMS Message ID:dexample-73ad-4adb-bc6c-4357example7
Received: Hello World!
Acknowledged: ID:dexample-73ad-4adb-bc6c-4357example7