Amazon SQS Java Messaging Library入门 - Amazon Simple Queue Service
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon SQS Java Messaging Library入门

开始使用JavaMessageService(JMS) Amazon SQS,请使用本节中的代码示例。以下部分介绍如何创建 JMS 连接和会话以及如何发送和接收消息。

Amazon SQS Java Messaging Library 中包装提供的 Amazon SQS 客户端对象还会检查是否存在 Amazon SQS 队列。如果该队列不存在,客户端将创建它。

创建 JMS 连接

  1. 创建连接工厂并对该工厂调用 createConnection 方法。

    // Create a new connection factory with all defaults (credentials and region) set automatically SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), AmazonSQSClientBuilder.defaultClient() ); // Create the connection. SQSConnection connection = connectionFactory.createConnection();

    SQSConnection 类扩展 javax.jms.Connection。结合JMS标准连接方法, SQSConnection 提供其他方法,例如 getAmazonSQSClientgetWrappedAmazonSQSClient。两种方法都允许您执行JMS规范中未包含的管理操作,例如创建新队列。不过,getWrappedAmazonSQSClient 方法还提供当前连接使用的 Amazon SQS 客户端的包装版本。该包装程序将客户端中的每个异常转变为 JMSException,从而让预期有 JMSException 发生的现有代码更方便地使用该异常。

  2. 可以使用从 getAmazonSQSClientgetWrappedAmazonSQSClient 返回的客户端对象来执行未包含在 JMS 规范中的管理操作(例如,可以创建 Amazon SQS 队列)。

    如果您现有的代码需要 JMS 异常,则应使用 getWrappedAmazonSQSClient

    • 如果您使用 getWrappedAmazonSQSClient,则返回的客户端对象会将所有异常转变成 JMS 异常。

    • 如果使用 getAmazonSQSClient,则这些异常将全部为 Amazon SQS 异常。

创建 Amazon SQS 队列

包装的客户端对象将检查是否存在 Amazon SQS 队列。

如果队列不存在,客户端将创建它。如果队列存在,则该功能不会返回任何值。有关更多信息,请参阅 TextMessageSender.java 示例中的“根据需要创建队列”一节。

创建标准队列

// Get the wrapped client AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient(); // Create an SQS queue named MyQueue, if it doesn't already exist if (!client.queueExists("MyQueue")) { client.createQueue("MyQueue"); }

创建FIFO队列

// Get the wrapped client AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient(); // Create an Amazon SQS FIFO queue named MyQueue.fifo, if it doesn't already exist if (!client.queueExists("MyQueue.fifo")) { Map<String, String> attributes = new HashMap<String, String>(); attributes.put("FifoQueue", "true"); attributes.put("ContentBasedDeduplication", "true"); client.createQueue(new CreateQueueRequest().withQueueName("MyQueue.fifo").withAttributes(attributes)); }
注意

FIFO 队列的名称必须以 .fifo 后缀结尾。

有关 ContentBasedDeduplication 属性的更多信息,请参阅 确切一次处理

同步发送消息

  1. 当连接和基础 Amazon SQS 队列准备就绪时,将创建一个具有 AUTO_ACKNOWLEDGE 模式的非事务性 JMS 会话。

    // Create the nontransacted session with AUTO_ACKNOWLEDGE mode Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  2. 为了向队列发送文本消息,将创建一个 JMS 队列标识和消息创建者。

    // Create a queue identity and specify the queue name to the session Queue queue = session.createQueue("MyQueue"); // Create a producer for the 'MyQueue' MessageProducer producer = session.createProducer(queue);
  3. 创建文本消息并将它发送到队列。

    • 要将消息发送到标准队列,您无需设置任何其他参数。

      // Create the text message TextMessage message = session.createTextMessage("Hello World!"); // Send the message producer.send(message); System.out.println("JMS Message " + message.getJMSMessageID());
    • 要将消息发送到FIFO队列,您必须设置消息组ID。您也可以设置消息重复数据删除 ID。有关更多信息,请参阅 关键术语。)

      // Create the text message TextMessage message = session.createTextMessage("Hello World!"); // Set the message group ID message.setStringProperty("JMSXGroupID", "Default"); // You can also set a custom message deduplication ID // message.setStringProperty("JMS_SQS_DeduplicationId", "hello"); // Here, it's not needed because content-based deduplication is enabled for the queue // Send the message producer.send(message); System.out.println("JMS Message " + message.getJMSMessageID()); System.out.println("JMS Message Sequence Number " + message.getStringProperty("JMS_SQS_SequenceNumber"));

同步接收消息

  1. 要接收消息,可为同一队列创建一个使用器,然后调用 start 方法。

    您可以随时对连接调用 start 方法。不过,使用者不会开始接收消息,直至调用此方法。

    // Create a consumer for the 'MyQueue' MessageConsumer consumer = session.createConsumer(queue); // Start receiving incoming messages connection.start();
  2. 在超时设为 1 秒钟的情况下对该使用器调用 receive 方法,然后输出收到的消息内容。

    • 从标准队列接收消息后,您可以访问消息的内容。

      // Receive a message from 'MyQueue' and wait up to 1 second Message receivedMessage = consumer.receive(1000); // Cast the received message as TextMessage and display the text if (receivedMessage != null) { System.out.println("Received: " + ((TextMessage) receivedMessage).getText()); }
    • 从FIFO队列收到报文后,您可以访问报文内容以及其他FIFO特定报文属性,如报文组ID、报文重删ID、序列号等。有关更多信息,请参阅 关键术语。)

      // Receive a message from 'MyQueue' and wait up to 1 second Message receivedMessage = consumer.receive(1000); // Cast the received message as TextMessage and display the text if (receivedMessage != null) { System.out.println("Received: " + ((TextMessage) receivedMessage).getText()); System.out.println("Group id: " + receivedMessage.getStringProperty("JMSXGroupID")); System.out.println("Message deduplication id: " + receivedMessage.getStringProperty("JMS_SQS_DeduplicationId")); System.out.println("Message sequence number: " + receivedMessage.getStringProperty("JMS_SQS_SequenceNumber")); }
  3. 关闭连接和会话。

    // Close the connection (and the session). connection.close();

输出看上去类似于以下内容:

JMS Message ID:8example-588b-44e5-bbcf-d816example2 Received: Hello World!
注意

您可以使用 Spring Framework 初始化这些对象。

有关其他信息,请参阅 SpringExampleConfiguration.xmlSpringExample.java,以及ExampleConfiguration.java部分中的 ExampleCommon.java使用JMS的Java示例 Amazon SQS 标准队列 中的其他帮助程序类。

有关发送和接收对象的完整示例,请参阅 TextMessageSender.javaSyncMessageReceiver.java

异步接收消息

Amazon SQS Java Messaging Library入门中的示例中,消息发送到 MyQueue 并被同步接收。

以下示例介绍如何通过监听器异步接收消息。

  1. 实施 MessageListener 接口。

    class MyListener implements MessageListener { @Override public void onMessage(Message message) { try { // Cast the received message as TextMessage and print the text to screen. System.out.println("Received: " + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }

    在收到消息时,调用 onMessage 接口的 MessageListener 方法。在此监听器实现中,输出保存在消息中的文本。

  2. receive 实现实例设置使用者消息监听器,而不是对使用者显式调用 MyListener 方法。主线程会等待一秒钟。

    // Create a consumer for the 'MyQueue'. MessageConsumer consumer = session.createConsumer(queue); // Instantiate and set the message listener for the consumer. consumer.setMessageListener(new MyListener()); // Start receiving incoming messages. connection.start(); // Wait for 1 second. The listener onMessage() method is invoked when a message is received. Thread.sleep(1000);

其余步骤与 Amazon SQS Java Messaging Library入门示例中的步骤相同。有关异步使用者的完整示例,请参阅AsyncMessageReceiver.java中的 使用JMS的Java示例 Amazon SQS 标准队列

此示例的输出将与以下内容类似:

JMS Message ID:8example-588b-44e5-bbcf-d816example2 Received: Hello World!

使用客户端确认模式

Amazon SQS Java Messaging Library入门中的示例使用 AUTO_ACKNOWLEDGE 模式,该模式会自动确认收到的每条消息(因此会从 Amazon SQS 队列中删除消息)。

  1. 要在消息处理完毕后显式确认消息,则必须创建具有 CLIENT_ACKNOWLEDGE 模式的会话。

    // Create the non-transacted session with CLIENT_ACKNOWLEDGE mode. Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
  2. 收到消息时,显示消息,然后显式确认消息。

    // Cast the received message as TextMessage and print the text to screen. Also acknowledge the message. if (receivedMessage != null) { System.out.println("Received: " + ((TextMessage) receivedMessage).getText()); receivedMessage.acknowledge(); System.out.println("Acknowledged: " + message.getJMSMessageID()); }
    注意

    在此模式下,当确认某条消息时,也会隐式确认在该消息之前收到的所有消息。例如,如果收到 10 条消息,则仅确认第 10 条消息 (按接收消息的顺序),然后还会确认先前的所有 9 条消息。

其余步骤与 Amazon SQS Java Messaging Library入门示例中的步骤相同。有关具有客户端确认模式的同步使用者的完整示例,请参阅SyncMessageReceiverClientAcknowledge.java中的 使用JMS的Java示例 Amazon SQS 标准队列

此示例的输出将与以下内容类似:

JMS Message ID:4example-aa0e-403f-b6df-5e02example5 Received: Hello World! Acknowledged: ID:4example-aa0e-403f-b6df-5e02example5

使用无序确认模式

在使用 CLIENT_ACKNOWLEDGE 模式时,将自动确认在显式确认的消息之前收到的所有消息。有关更多信息,请参阅 使用客户端确认模式。)

Amazon SQS Java Messaging Library 提供另一种确认模式。在使用 UNORDERED_ACKNOWLEDGE 模式时,客户端必须单独显式确认收到的所有消息,不管消息的接收顺序如何。为此,请使用 UNORDERED_ACKNOWLEDGE 模式创建会话。

// Create the non-transacted session with UNORDERED_ACKNOWLEDGE mode. Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);

其余步骤与 使用客户端确认模式示例中的步骤相同。有关具有 UNORDERED_ACKNOWLEDGE 模式的同步使用者的完整示例,请参阅 SyncMessageReceiverUnorderedAcknowledge.java

此示例中的输出将与以下内容类似:

JMS Message ID:dexample-73ad-4adb-bc6c-4357example7 Received: Hello World! Acknowledged: ID:dexample-73ad-4adb-bc6c-4357example7