Amazon SQS Java Messaging Library 入门 - Amazon Simple Queue Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon SQS Java Messaging Library 入门

要开始结合使用 Java Message Service (JMS) 与 Amazon SQS,请使用本节中的代码示例。以下部分介绍如何创建 JMS 连接和会话以及如何发送和接收消息。

Amazon SQS Java Messaging Library 中包含了封装的 Amazon SQS 客户端对象,该对象会检查是否存在 Amazon SQS 队列。如果队列不存在,客户端将创建它。

创建 JMS 连接

  1. 创建连接工厂并对该工厂调用 createConnection 方法。

    // Create a new connection factory with all defaults (credentials and region) set automatically SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), AmazonSQSClientBuilder.defaultClient() ); // Create the connection. SQSConnection connection = connectionFactory.createConnection();

    SQSConnection 类对 javax.jms.Connection 进行了扩展。除了 JMS 标准连接方法,SQSConnection 还提供其他一些方法,例如 getAmazonSQSClientgetWrappedAmazonSQSClient。这两种方法均可让您执行未包含在 JMS 规范中的管理操作,例如创建新队列。不过,getWrappedAmazonSQSClient 方法还提供当前连接使用的 Amazon SQS 客户端的封装版本。该包装程序将客户端中的每个异常转变为 JMSException,从而让预期有 JMSException 发生的现有代码更方便地使用该异常。

  2. 您可以使用从 getAmazonSQSClientgetWrappedAmazonSQSClient 返回的客户端对象来执行未包含在 JMS 规范中的管理操作(例如,可以创建 Amazon SQS 队列)。

    如果您现有的代码需要 JMS 异常,则应使用 getWrappedAmazonSQSClient

    • 如果您使用 getWrappedAmazonSQSClient,则返回的客户端对象会将所有异常转变成 JMS 异常。

    • 如果您使用 getAmazonSQSClient,则这些异常将全部为 Amazon SQS 异常。

创建 Amazon SQS 队列

包装的客户端对象将检查是否存在 Amazon SQS 队列。

如果队列不存在,客户端将创建它。如果队列存在,则该功能不会返回任何值。有关更多信息,请参阅 TextMessageSender.java 示例中的“根据需要创建队列”一节。

创建标准队列

// Get the wrapped client AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient(); // Create an SQS queue named MyQueue, if it doesn't already exist if (!client.queueExists("MyQueue")) { client.createQueue("MyQueue"); }

创建 FIFO 队列

// Get the wrapped client AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient(); // Create an Amazon SQS FIFO queue named MyQueue.fifo, if it doesn't already exist if (!client.queueExists("MyQueue.fifo")) { Map<String, String> attributes = new HashMap<String, String>(); attributes.put("FifoQueue", "true"); attributes.put("ContentBasedDeduplication", "true"); client.createQueue(new CreateQueueRequest().withQueueName("MyQueue.fifo").withAttributes(attributes)); }
注意

FIFO 队列名称必须以 .fifo 后缀结尾。

有关 ContentBasedDeduplication 属性的更多信息,请参阅仅处理一次

同步发送消息

  1. 当连接和基础 Amazon SQS 队列准备就绪时,将创建一个具有 AUTO_ACKNOWLEDGE 模式的非事务性 JMS 会话。

    // Create the nontransacted session with AUTO_ACKNOWLEDGE mode Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  2. 为了向队列发送文本消息,将创建一个 JMS 队列标识和消息创建者。

    // Create a queue identity and specify the queue name to the session Queue queue = session.createQueue("MyQueue"); // Create a producer for the 'MyQueue' MessageProducer producer = session.createProducer(queue);
  3. 创建文本消息并将它发送到队列。

    • 要向标准队列发送消息,您无需设置任何其他参数。

      // Create the text message TextMessage message = session.createTextMessage("Hello World!"); // Send the message producer.send(message); System.out.println("JMS Message " + message.getJMSMessageID());
    • 要向 FIFO 队列发送消息,必须设置消息组 ID。您也可以设置消息重复数据删除 ID。有关更多信息,请参阅关键术语

      // Create the text message TextMessage message = session.createTextMessage("Hello World!"); // Set the message group ID message.setStringProperty("JMSXGroupID", "Default"); // You can also set a custom message deduplication ID // message.setStringProperty("JMS_SQS_DeduplicationId", "hello"); // Here, it's not needed because content-based deduplication is enabled for the queue // Send the message producer.send(message); System.out.println("JMS Message " + message.getJMSMessageID()); System.out.println("JMS Message Sequence Number " + message.getStringProperty("JMS_SQS_SequenceNumber"));

同步接收消息

  1. 要接收消息,可为同一队列创建一个使用者,然后调用 start 方法。

    您可以随时对连接调用 start 方法。不过,使用者不会开始接收消息,直至调用此方法。

    // Create a consumer for the 'MyQueue' MessageConsumer consumer = session.createConsumer(queue); // Start receiving incoming messages connection.start();
  2. 在超时设为 1 秒钟的情况下对该使用者调用 receive 方法,然后输出收到的消息内容。

    • 收到来自标准队列的消息后,可以访问消息的内容。

      // Receive a message from 'MyQueue' and wait up to 1 second Message receivedMessage = consumer.receive(1000); // Cast the received message as TextMessage and display the text if (receivedMessage != null) { System.out.println("Received: " + ((TextMessage) receivedMessage).getText()); }
    • 在收到来自 FIFO 队列的消息后,您可以访问消息的内容和其他特定于 FIFO 的消息属性,例如消息组 ID、消息重复数据删除 ID 和序列号。有关更多信息,请参阅关键术语

      // Receive a message from 'MyQueue' and wait up to 1 second Message receivedMessage = consumer.receive(1000); // Cast the received message as TextMessage and display the text if (receivedMessage != null) { System.out.println("Received: " + ((TextMessage) receivedMessage).getText()); System.out.println("Group id: " + receivedMessage.getStringProperty("JMSXGroupID")); System.out.println("Message deduplication id: " + receivedMessage.getStringProperty("JMS_SQS_DeduplicationId")); System.out.println("Message sequence number: " + receivedMessage.getStringProperty("JMS_SQS_SequenceNumber")); }
  3. 关闭连接和会话。

    // Close the connection (and the session). connection.close();

输出看上去类似于以下内容:

JMS Message ID:8example-588b-44e5-bbcf-d816example2 Received: Hello World!
注意

您可以使用 Spring Framework 初始化这些对象。

有关其他信息,请参阅 SpringExampleConfiguration.xmlSpringExample.java,以及ExampleConfiguration.java部分中的 ExampleCommon.java实际可用的 Java 示例:结合使用 JMS 与 Amazon SQS 标准队列 中的其他帮助程序类。

有关发送和接收对象的完整示例,请参阅 TextMessageSender.javaSyncMessageReceiver.java

异步接收消息

Amazon SQS Java Messaging Library 入门中的示例中,消息发送到 MyQueue 并被同步接收。

以下示例介绍如何通过监听器异步接收消息。

  1. 实施 MessageListener 接口。

    class MyListener implements MessageListener { @Override public void onMessage(Message message) { try { // Cast the received message as TextMessage and print the text to screen. System.out.println("Received: " + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }

    在收到消息时,调用 onMessage 接口的 MessageListener 方法。在此监听器实现中,输出保存在消息中的文本。

  2. receive 实现实例设置使用者消息监听器,而不是对使用者显式调用 MyListener 方法。主线程会等待一秒钟。

    // Create a consumer for the 'MyQueue'. MessageConsumer consumer = session.createConsumer(queue); // Instantiate and set the message listener for the consumer. consumer.setMessageListener(new MyListener()); // Start receiving incoming messages. connection.start(); // Wait for 1 second. The listener onMessage() method is invoked when a message is received. Thread.sleep(1000);

其余步骤与 Amazon SQS Java Messaging Library 入门示例中的步骤相同。有关异步使用者的完整示例,请参阅AsyncMessageReceiver.java中的 实际可用的 Java 示例:结合使用 JMS 与 Amazon SQS 标准队列

此示例的输出将与以下内容类似:

JMS Message ID:8example-588b-44e5-bbcf-d816example2 Received: Hello World!

使用客户端确认模式

Amazon SQS Java Messaging Library 入门中的示例使用 AUTO_ACKNOWLEDGE 模式,该模式会自动确认收到的每条消息(因此会从基础 Amazon SQS 队列中删除消息)。

  1. 要在消息处理完毕后显式确认消息,则必须创建具有 CLIENT_ACKNOWLEDGE 模式的会话。

    // Create the non-transacted session with CLIENT_ACKNOWLEDGE mode. Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
  2. 收到消息时,显示消息,然后显式确认消息。

    // Cast the received message as TextMessage and print the text to screen. Also acknowledge the message. if (receivedMessage != null) { System.out.println("Received: " + ((TextMessage) receivedMessage).getText()); receivedMessage.acknowledge(); System.out.println("Acknowledged: " + message.getJMSMessageID()); }
    注意

    在此模式下,当确认某条消息时,也会隐式确认在该消息之前收到的所有消息。例如,如果收到 10 条消息,则仅确认第 10 条消息(按接收消息的顺序),然后还会确认先前的所有 9 条消息。

其余步骤与 Amazon SQS Java Messaging Library 入门示例中的步骤相同。有关具有客户端确认模式的同步使用者的完整示例,请参阅SyncMessageReceiverClientAcknowledge.java中的 实际可用的 Java 示例:结合使用 JMS 与 Amazon SQS 标准队列

此示例的输出将与以下内容类似:

JMS Message ID:4example-aa0e-403f-b6df-5e02example5 Received: Hello World! Acknowledged: ID:4example-aa0e-403f-b6df-5e02example5

使用无序确认模式

在使用 CLIENT_ACKNOWLEDGE 模式时,将自动确认在显式确认的消息之前收到的所有消息。有关更多信息,请参阅使用客户端确认模式

Amazon SQS Java Messaging Library 提供另一种确认模式。在使用 UNORDERED_ACKNOWLEDGE 模式时,客户端必须单独显式确认收到的所有消息,不管消息的接收顺序如何。为此,请使用 UNORDERED_ACKNOWLEDGE 模式创建会话。

// Create the non-transacted session with UNORDERED_ACKNOWLEDGE mode. Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);

其余步骤与 使用客户端确认模式示例中的步骤相同。有关具有 UNORDERED_ACKNOWLEDGE 模式的同步使用者的完整示例,请参阅 SyncMessageReceiverUnorderedAcknowledge.java

此示例中的输出将与以下内容类似:

JMS Message ID:dexample-73ad-4adb-bc6c-4357example7 Received: Hello World! Acknowledged: ID:dexample-73ad-4adb-bc6c-4357example7