Amazon Simple Queue Service
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。请点击 Amazon AWS 入门,可查看中国地区的具体差异

代码示例

以下代码示例说明如何将 JMS 与 Amazon SQS 标准队列结合使用。有关使用 FIFO 队列的更多信息,请参阅创建 FIFO 队列同步发送消息同步接收消息。(同步接收消息同时适用于标准队列和 FIFO 队列。但是,FIFO 队列中的消息包含更多属性。)

ExampleConfiguration.java

以下 Java 代码示例设置将用于其他 Java 示例的默认队列名称、区域和凭证。

public class ExampleConfiguration { public static final String DEFAULT_QUEUE_NAME = "SQSJMSClientExampleQueue"; public static final Region DEFAULT_REGION = Region.getRegion(Regions.US_EAST_2); private static String getParameter( String args[], int i ) { if( i + 1 >= args.length ) { throw new IllegalArgumentException( "Missing parameter for " + args[i] ); } return args[i+1]; } /** * Parse the command line and return the resulting config. If the config parsing fails * print the error and the usage message and then call System.exit * * @param app the app to use when printing the usage string * @param args the command line arguments * @return the parsed config */ public static ExampleConfiguration parseConfig(String app, String args[]) { try { return new ExampleConfiguration(args); } catch (IllegalArgumentException e) { System.err.println( "ERROR: " + e.getMessage() ); System.err.println(); System.err.println( "Usage: " + app + " [--queue <queue>] [--region <region>] [--credentials <credentials>] "); System.err.println( " or" ); System.err.println( " " + app + " <spring.xml>" ); System.exit(-1); return null; } } private ExampleConfiguration(String args[]) { for( int i = 0; i < args.length; ++i ) { String arg = args[i]; if( arg.equals( "--queue" ) ) { setQueueName(getParameter(args, i)); i++; } else if( arg.equals( "--region" ) ) { String regionName = getParameter(args, i); try { setRegion(Region.getRegion(Regions.fromName(regionName))); } catch( IllegalArgumentException e ) { throw new IllegalArgumentException( "Unrecognized region " + regionName ); } i++; } else if( arg.equals( "--credentials" ) ) { String credsFile = getParameter(args, i); try { setCredentialsProvider( new PropertiesFileCredentialsProvider(credsFile) ); } catch (AmazonClientException e) { throw new IllegalArgumentException("Error reading credentials from " + credsFile, e ); } i++; } else { throw new IllegalArgumentException("Unrecognized option " + arg); } } } private String queueName = DEFAULT_QUEUE_NAME; private Region region = DEFAULT_REGION; private AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); public String getQueueName() { return queueName; } public void setQueueName(String queueName) { this.queueName = queueName; } public Region getRegion() { return region; } public void setRegion(Region region) { this.region = region; } public AWSCredentialsProvider getCredentialsProvider() { return credentialsProvider; } public void setCredentialsProvider(AWSCredentialsProvider credentialsProvider) { // Make sure they're usable first credentialsProvider.getCredentials(); this.credentialsProvider = credentialsProvider; } }

TextMessageSender.java

以下 Java 代码示例创建文本消息创建者。

public class TextMessageSender { public static void main(String args[]) throws JMSException { ExampleConfiguration config = ExampleConfiguration.parseConfig("TextMessageSender", args); ExampleCommon.setupLogging(); // Create the connection factory based on the config SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), AmazonSQSClientBuilder.standard() .withRegion(config.getRegion().getName()) .withCredentials(config.getCredentialsProvider()) ); // Create the connection SQSConnection connection = connectionFactory.createConnection(); // Create the queue if needed ExampleCommon.ensureQueueExists(connection, config.getQueueName()); // Create the session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer( session.createQueue( config.getQueueName() ) ); sendMessages(session, producer); // Close the connection. This closes the session automatically connection.close(); System.out.println( "Connection closed" ); } private static void sendMessages( Session session, MessageProducer producer ) { BufferedReader inputReader = new BufferedReader( new InputStreamReader( System.in, Charset.defaultCharset() ) ); try { String input; while( true ) { System.out.print( "Enter message to send (leave empty to exit): " ); input = inputReader.readLine(); if( input == null || input.equals("" ) ) break; TextMessage message = session.createTextMessage(input); producer.send(message); System.out.println( "Send message " + message.getJMSMessageID() ); } } catch (EOFException e) { // Just return on EOF } catch (IOException e) { System.err.println( "Failed reading input: " + e.getMessage() ); } catch (JMSException e) { System.err.println( "Failed sending message: " + e.getMessage() ); e.printStackTrace(); } } }

SyncMessageReceiver.java

以下 Java 代码示例创建同步消息使用者。

public class SyncMessageReceiver { public static void main(String args[]) throws JMSException { ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiver", args); ExampleCommon.setupLogging(); // Create the connection factory based on the config SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), AmazonSQSClientBuilder.standard() .withRegion(config.getRegion().getName()) .withCredentials(config.getCredentialsProvider()) ); // Create the connection SQSConnection connection = connectionFactory.createConnection(); // Create the queue if needed ExampleCommon.ensureQueueExists(connection, config.getQueueName()); // Create the session Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) ); connection.start(); receiveMessages(session, consumer); // Close the connection. This closes the session automatically connection.close(); System.out.println( "Connection closed" ); } private static void receiveMessages( Session session, MessageConsumer consumer ) { try { while( true ) { System.out.println( "Waiting for messages"); // Wait 1 minute for a message Message message = consumer.receive(TimeUnit.MINUTES.toMillis(1)); if( message == null ) { System.out.println( "Shutting down after 1 minute of silence" ); break; } ExampleCommon.handleMessage(message); message.acknowledge(); System.out.println( "Acknowledged message " + message.getJMSMessageID() ); } } catch (JMSException e) { System.err.println( "Error receiving from SQS: " + e.getMessage() ); e.printStackTrace(); } } }

AsyncMessageReceiver.java

以下 Java 代码示例创建异步消息使用者。

public class AsyncMessageReceiver { public static void main(String args[]) throws JMSException, InterruptedException { ExampleConfiguration config = ExampleConfiguration.parseConfig("AsyncMessageReceiver", args); ExampleCommon.setupLogging(); // Create the connection factory based on the config SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), AmazonSQSClientBuilder.standard() .withRegion(config.getRegion().getName()) .withCredentials(config.getCredentialsProvider()) ); // Create the connection SQSConnection connection = connectionFactory.createConnection(); // Create the queue if needed ExampleCommon.ensureQueueExists(connection, config.getQueueName()); // Create the session Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) ); ReceiverCallback callback = new ReceiverCallback(); consumer.setMessageListener( callback ); // No messages are processed until this is called connection.start(); callback.waitForOneMinuteOfSilence(); System.out.println( "Returning after one minute of silence" ); // Close the connection. This closes the session automatically connection.close(); System.out.println( "Connection closed" ); } private static class ReceiverCallback implements MessageListener { // Used to listen for message silence private volatile long timeOfLastMessage = System.nanoTime(); public void waitForOneMinuteOfSilence() throws InterruptedException { for(;;) { long timeSinceLastMessage = System.nanoTime() - timeOfLastMessage; long remainingTillOneMinuteOfSilence = TimeUnit.MINUTES.toNanos(1) - timeSinceLastMessage; if( remainingTillOneMinuteOfSilence < 0 ) { break; } TimeUnit.NANOSECONDS.sleep(remainingTillOneMinuteOfSilence); } } @Override public void onMessage(Message message) { try { ExampleCommon.handleMessage(message); message.acknowledge(); System.out.println( "Acknowledged message " + message.getJMSMessageID() ); timeOfLastMessage = System.nanoTime(); } catch (JMSException e) { System.err.println( "Error processing message: " + e.getMessage() ); e.printStackTrace(); } } } }

SyncMessageReceiverClientAcknowledge.java

以下 Java 代码示例创建具有客户端确认模式的同步使用者。

/** * An example class to demonstrate the behavior of CLIENT_ACKNOWLEDGE mode for received messages. This example * complements the example given in {@link SyncMessageReceiverUnorderedAcknowledge} for UNORDERED_ACKNOWLEDGE mode. * * First, a session, a message producer, and a message consumer are created. Then, two messages are sent. Next, two messages * are received but only the second one is acknowledged. After waiting for the visibility time out period, an attempt to * receive another message is made. It's shown that no message is returned for this attempt since in CLIENT_ACKNOWLEDGE mode, * as expected, all the messages prior to the acknowledged messages are also acknowledged. * * This ISN'T the behavior for UNORDERED_ACKNOWLEDGE mode. Please see {@link SyncMessageReceiverUnorderedAcknowledge} * for an example. */ public class SyncMessageReceiverClientAcknowledge { // Visibility time-out for the queue. It must match to the one set for the queue for this example to work. private static final long TIME_OUT_SECONDS = 1; public static void main(String args[]) throws JMSException, InterruptedException { // Create the configuration for the example ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiverClientAcknowledge", args); // Setup logging for the example ExampleCommon.setupLogging(); // Create the connection factory based on the config SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), AmazonSQSClientBuilder.standard() .withRegion(config.getRegion().getName()) .withCredentials(config.getCredentialsProvider()) ); // Create the connection SQSConnection connection = connectionFactory.createConnection(); // Create the queue if needed ExampleCommon.ensureQueueExists(connection, config.getQueueName()); // Create the session with client acknowledge mode Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // Create the producer and consume MessageProducer producer = session.createProducer(session.createQueue(config.getQueueName())); MessageConsumer consumer = session.createConsumer(session.createQueue(config.getQueueName())); // Open the connection connection.start(); // Send two text messages sendMessage(producer, session, "Message 1"); sendMessage(producer, session, "Message 2"); // Receive a message and don't acknowledge it receiveMessage(consumer, false); // Receive another message and acknowledge it receiveMessage(consumer, true); // Wait for the visibility time out, so that unacknowledged messages reappear in the queue System.out.println("Waiting for visibility timeout..."); Thread.sleep(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS)); // Attempt to receive another message and acknowledge it. This results in receiving no messages since // we have acknowledged the second message. Although we did not explicitly acknowledge the first message, // in the CLIENT_ACKNOWLEDGE mode, all the messages received prior to the explicitly acknowledged message // are also acknowledged. Therefore, we have implicitly acknowledged the first message. receiveMessage(consumer, true); // Close the connection. This closes the session automatically connection.close(); System.out.println("Connection closed."); } /** * Sends a message through the producer. * * @param producer Message producer * @param session Session * @param messageText Text for the message to be sent * @throws JMSException */ private static void sendMessage(MessageProducer producer, Session session, String messageText) throws JMSException { // Create a text message and send it producer.send(session.createTextMessage(messageText)); } /** * Receives a message through the consumer synchronously with the default timeout (TIME_OUT_SECONDS). * If a message is received, the message is printed. If no message is received, "Queue is empty!" is * printed. * * @param consumer Message consumer * @param acknowledge If true and a message is received, the received message is acknowledged. * @throws JMSException */ private static void receiveMessage(MessageConsumer consumer, boolean acknowledge) throws JMSException { // Receive a message Message message = consumer.receive(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS)); if (message == null) { System.out.println("Queue is empty!"); } else { // Since this queue has only text messages, cast the message object and print the text System.out.println("Received: " + ((TextMessage) message).getText()); // Acknowledge the message if asked if (acknowledge) message.acknowledge(); } } }

SyncMessageReceiverUnorderedAcknowledge.java

以下 Java 代码示例创建具有无序确认模式的同步使用者。

/** * An example class to demonstrate the behavior of UNORDERED_ACKNOWLEDGE mode for received messages. This example * complements the example given in {@link SyncMessageReceiverClientAcknowledge} for CLIENT_ACKNOWLEDGE mode. * * First, a session, a message producer, and a message consumer are created. Then, two messages are sent. Next, two messages * are received but only the second one is acknowledged. After waiting for the visibility time out period, an attempt to * receive another message is made. It's shown that the first message received in the prior attempt is returned again * for the second attempt. In UNORDERED_ACKNOWLEDGE mode, all the messages must be explicitly acknowledged no matter what * the order they're received. * * This ISN'T the behavior for CLIENT_ACKNOWLEDGE mode. Please see {@link SyncMessageReceiverClientAcknowledge} * for an example. */ public class SyncMessageReceiverUnorderedAcknowledge { // Visibility time-out for the queue. It must match to the one set for the queue for this example to work. private static final long TIME_OUT_SECONDS = 1; public static void main(String args[]) throws JMSException, InterruptedException { // Create the configuration for the example ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiverUnorderedAcknowledge", args); // Setup logging for the example ExampleCommon.setupLogging(); // Create the connection factory based on the config SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), AmazonSQSClientBuilder.standard() .withRegion(config.getRegion().getName()) .withCredentials(config.getCredentialsProvider()) ); // Create the connection SQSConnection connection = connectionFactory.createConnection(); // Create the queue if needed ExampleCommon.ensureQueueExists(connection, config.getQueueName()); // Create the session with unordered acknowledge mode Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE); // Create the producer and consume MessageProducer producer = session.createProducer(session.createQueue(config.getQueueName())); MessageConsumer consumer = session.createConsumer(session.createQueue(config.getQueueName())); // Open the connection connection.start(); // Send two text messages sendMessage(producer, session, "Message 1"); sendMessage(producer, session, "Message 2"); // Receive a message and don't acknowledge it receiveMessage(consumer, false); // Receive another message and acknowledge it receiveMessage(consumer, true); // Wait for the visibility time out, so that unacknowledged messages reappear in the queue System.out.println("Waiting for visibility timeout..."); Thread.sleep(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS)); // Attempt to receive another message and acknowledge it. This results in receiving the first message since // we have acknowledged only the second message. In the UNORDERED_ACKNOWLEDGE mode, all the messages must // be explicitly acknowledged. receiveMessage(consumer, true); // Close the connection. This closes the session automatically connection.close(); System.out.println("Connection closed."); } /** * Sends a message through the producer. * * @param producer Message producer * @param session Session * @param messageText Text for the message to be sent * @throws JMSException */ private static void sendMessage(MessageProducer producer, Session session, String messageText) throws JMSException { // Create a text message and send it producer.send(session.createTextMessage(messageText)); } /** * Receives a message through the consumer synchronously with the default timeout (TIME_OUT_SECONDS). * If a message is received, the message is printed. If no message is received, "Queue is empty!" is * printed. * * @param consumer Message consumer * @param acknowledge If true and a message is received, the received message is acknowledged. * @throws JMSException */ private static void receiveMessage(MessageConsumer consumer, boolean acknowledge) throws JMSException { // Receive a message Message message = consumer.receive(TimeUnit.SECONDS.toMillis(TIME_OUT_SECONDS)); if (message == null) { System.out.println("Queue is empty!"); } else { // Since this queue has only text messages, cast the message object and print the text System.out.println("Received: " + ((TextMessage) message).getText()); // Acknowledge the message if asked if (acknowledge) message.acknowledge(); } } }

SpringExampleConfiguration.xml

以下 XML 代码示例是 SpringExample.java 的 Bean 配置文件。

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd "> <bean id="CredentialsProviderBean" class="com.amazonaws.auth.DefaultAWSCredentialsProviderChain"/> <bean id="ClientBuilder" class="com.amazonaws.services.sqs.AmazonSQSClientBuilder" factory-method="standard"> <property name="region" value="us-east-2"/> <property name="credentials" ref="CredentialsProviderBean"/> </bean> <bean id="ProviderConfiguration" class="com.amazon.sqs.javamessaging.ProviderConfiguration"> <property name="numberOfMessagesToPrefetch" value="5"/> </bean> <bean id="ConnectionFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"> <constructor-arg ref="ProviderConfiguration" /> <constructor-arg ref="ClientBuilder" /> </bean> <bean id="Connection" class="javax.jms.Connection" factory-bean="ConnectionFactory" factory-method="createConnection" init-method="start" destroy-method="close" /> <bean id="QueueName" class="java.lang.String"> <constructor-arg value="SQSJMSClientExampleQueue"/> </bean> </beans>

SpringExample.java

以下 Java 代码示例使用 Bean 配置文件来初始化您的对象。

public class SpringExample { public static void main(String args[]) throws JMSException { if( args.length != 1 || !args[0].endsWith(".xml")) { System.err.println( "Usage: " + SpringExample.class.getName() + " <spring config.xml>" ); System.exit(1); } File springFile = new File( args[0] ); if( !springFile.exists() || !springFile.canRead() ) { System.err.println( "File " + args[0] + " does not exist or isn't readable."); System.exit(2); } ExampleCommon.setupLogging(); FileSystemXmlApplicationContext context = new FileSystemXmlApplicationContext( "file://" + springFile.getAbsolutePath() ); Connection connection; try { connection = context.getBean(Connection.class); } catch( NoSuchBeanDefinitionException e ) { System.err.println( "Can't find the JMS connection to use: " + e.getMessage() ); System.exit(3); return; } String queueName; try { queueName = context.getBean("QueueName", String.class); } catch( NoSuchBeanDefinitionException e ) { System.err.println( "Can't find the name of the queue to use: " + e.getMessage() ); System.exit(3); return; } if( connection instanceof SQSConnection ) { ExampleCommon.ensureQueueExists( (SQSConnection) connection, queueName ); } // Create the session Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer( session.createQueue( queueName) ); receiveMessages(session, consumer); // The context can be setup to close the connection for us context.close(); System.out.println( "Context closed" ); } private static void receiveMessages( Session session, MessageConsumer consumer ) { try { while( true ) { System.out.println( "Waiting for messages"); // Wait 1 minute for a message Message message = consumer.receive(TimeUnit.MINUTES.toMillis(1)); if( message == null ) { System.out.println( "Shutting down after 1 minute of silence" ); break; } ExampleCommon.handleMessage(message); message.acknowledge(); System.out.println( "Acknowledged message" ); } } catch (JMSException e) { System.err.println( "Error receiving from SQS: " + e.getMessage() ); e.printStackTrace(); } } }

ExampleCommon.java

以下 Java 代码示例首先查看 Amazon SQS 队列是否存在,如果不存在,则创建一个。它还包含示例日历记录代码。

public class ExampleCommon { /** * A utility function to check the queue exists and create it if needed. For most * use cases this is usually done by an administrator before the application is run. */ public static void ensureQueueExists(SQSConnection connection, String queueName) throws JMSException { AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient(); /** * In most cases, you can do this with just a createQueue call, but GetQueueUrl * (called by queueExists) is a faster operation for the common case where the queue * already exists. Also many users and roles have permission to call GetQueueUrl * but do not have permission to call CreateQueue. */ if( !client.queueExists(queueName) ) { client.createQueue( queueName ); } } public static void setupLogging() { // Setup logging BasicConfigurator.configure(); Logger.getRootLogger().setLevel(Level.WARN); } public static void handleMessage(Message message) throws JMSException { System.out.println( "Got message " + message.getJMSMessageID() ); System.out.println( "Content: "); if( message instanceof TextMessage ) { TextMessage txtMessage = ( TextMessage ) message; System.out.println( "\t" + txtMessage.getText() ); } else if( message instanceof BytesMessage ){ BytesMessage byteMessage = ( BytesMessage ) message; // Assume the length fits in an int - SQS only supports sizes up to 256k so that // should be true byte[] bytes = new byte[(int)byteMessage.getBodyLength()]; byteMessage.readBytes(bytes); System.out.println( "\t" + Base64.encodeAsString( bytes ) ); } else if( message instanceof ObjectMessage ) { ObjectMessage objMessage = (ObjectMessage) message; System.out.println( "\t" + objMessage.getObject() ); } } }