Amazon Simple Queue Service
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。点 击 Getting Started with Amazon AWS to see specific differences applicable to the China (Beijing) Region.

使用 Amazon S3 管理大 Amazon SQS 消息;

您可以使用 Amazon S3 管理 Amazon SQS 消息。这在存储和检索大小高达 2 GB 的消息时特别有用。要使用 Amazon S3 管理 Amazon SQS 消息,请使用适用于 Java 的 Amazon SQS 扩展客户端库。具体来说,您可使用该库执行以下操作:

  • 指定消息是始终存储在 Amazon S3 中还是仅在其大小超过 256 KB 时存储在 中。

  • 发送引用存储在 Amazon S3 存储桶中的单个消息对象的消息。

  • 从 Amazon S3 存储桶中获取相应的消息对象。

  • 从 Amazon S3 存储桶中删除相应的消息对象。

注意

您可以使用适用于 Java 的 Amazon SQS 扩展客户端库来通过 Amazon S3 管理 Amazon SQS 消息。但您无法通过 AWS CLI、Amazon SQS 控制台、Amazon SQS HTTP API 或除 适用于 Java 的开发工具包 之外的任何 AWS SDK 执行此操作。

适用于 Java 的 Amazon SQS 扩展客户端库当前不支持 FIFO 队列。

先决条件

要使用 Amazon S3 管理 Amazon SQS 消息,您需要:

  • AWS SDK for Java – 可通过两种不同的方式将 适用于 Java 的开发工具包 包含在您的项目中。您可以下载并安装 适用于 Java 的开发工具包,或者,如果您使用 Maven 获取适用于 Java 的 Amazon SQS 扩展客户端库,则可以包含 适用于 Java 的开发工具包 作为依赖项。适用于 Java 的开发工具包 和适用于 Java 的 Amazon SQS 扩展客户端库要求有 J2SE Development Kit 7.0 或更高版本。有关下载 适用于 Java 的开发工具包 的信息,请参阅 适用于 Java 的开发工具包。有关使用 Maven 的更多信息,请参阅此列表后面的注释。

  • 适用于 Java 的 Amazon SQS 扩展客户端库 – 如果您不使用 Maven,则必须将包文件 amazon-sqs-java-extended-client-lib.jar 添加到 Java 生成类路径。有关下载的信息,请参阅适用于 Java 的 Amazon SQS 扩展客户端库

  • Amazon S3 存储桶 – 您必须创建新的 Amazon S3 存储桶或使用现有的存储桶来存储消息。建议您创建新的存储桶来实现此目的。要控制存储桶空间和 AWS 账户产生的费用,您还应在存储桶上设置生命周期配置规则,以便在消息对象创建日期之后的一段特定时间后永久删除该对象。有关说明,请参阅管理生命周期配置或本节后面的示例

注意

适用于 Java 的 Amazon SQS 扩展客户端库包含对 Maven 的支持,如下所示:

Copy
<dependency> <groupId>com.amazonaws</groupId> <artifactId>amazon-sqs-java-extended-client-lib</artifactId> <version>1.0.0</version> </dependency>

使用适用于 Java 的 Amazon SQS 扩展客户端库

在满足先决条件后,使用以下 Java 代码示例开始通过 Amazon S3 管理 Amazon SQS 消息。

该示例创建具有随机名称的 Amazon S3 存储桶并添加生命周期规则以便在创建日期之后的 14 天后永久删除该存储桶。然后,该示例会创建一个队列并向该队列发送 256 KB 以上的随机消息。此消息将存储在 Amazon S3 存储桶中。然后,该示例会检索此消息并打印有关检索到的消息的信息。然后,该示例会删除该消息、队列和存储桶。

Copy
import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.UUID; import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClient; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.BucketLifecycleConfiguration; import com.amazonaws.services.s3.model.ListVersionsRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.S3VersionSummary; import com.amazonaws.services.s3.model.VersionListing; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.DeleteMessageRequest; import com.amazonaws.services.sqs.model.DeleteQueueRequest; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; public class SQSExtendedClientExample { private static final String s3BucketName = UUID.randomUUID() + "-" + DateTimeFormat.forPattern("yyMMdd-hhmmss").print(new DateTime()); public static void main(String[] args) { AWSCredentials credentials = null; try { credentials = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load the AWS credentials from the expected AWS credential profiles file. " + "Make sure that your credentials file is at the correct " + "location (/home/$USER/.aws/credentials) and is in a valid format.", e); } AmazonS3 s3 = new AmazonS3Client(credentials); Region s3Region = Region.getRegion(Regions.US_WEST_2); s3.setRegion(s3Region); // Set the Amazon S3 bucket name, and set a lifecycle rule on the bucket to // permanently delete objects a certain number of days after // each object's creation date. // Then create the bucket, and enable message objects to be stored in the bucket. BucketLifecycleConfiguration.Rule expirationRule = new BucketLifecycleConfiguration.Rule(); expirationRule.withExpirationInDays(14).withStatus("Enabled"); BucketLifecycleConfiguration lifecycleConfig = new BucketLifecycleConfiguration().withRules(expirationRule); s3.createBucket(s3BucketName); s3.setBucketLifecycleConfiguration(s3BucketName, lifecycleConfig); System.out.println("Bucket created and configured."); // Set the SQS extended client configuration with large payload support enabled. ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration() .withLargePayloadSupportEnabled(s3, s3BucketName); AmazonSQS sqsExtended = new AmazonSQSExtendedClient(new AmazonSQSClient(credentials), extendedClientConfig); Region sqsRegion = Region.getRegion(Regions.US_WEST_2); sqsExtended.setRegion(sqsRegion); // Create a long string of characters for the message object to be stored in the bucket. int stringLength = 300000; char[] chars = new char[stringLength]; Arrays.fill(chars, 'x'); String myLongString = new String(chars); // Create a message queue for this example. String QueueName = "QueueName" + UUID.randomUUID().toString(); CreateQueueRequest createQueueRequest = new CreateQueueRequest(QueueName); String myQueueUrl = sqsExtended.createQueue(createQueueRequest).getQueueUrl(); System.out.println("Queue created."); // Send the message. SendMessageRequest myMessageRequest = new SendMessageRequest(myQueueUrl, myLongString); sqsExtended.sendMessage(myMessageRequest); System.out.println("Sent the message."); // Receive messages, and then print general information about them. ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl); List<Message> messages = sqsExtended.receiveMessage(receiveMessageRequest).getMessages(); for (Message message : messages) { System.out.println("\nMessage received:"); System.out.println(" ID: " + message.getMessageId()); System.out.println(" Receipt handle: " + message.getReceiptHandle()); System.out.println(" Message body (first 5 characters): " + message.getBody().substring(0, 5)); } // Delete the message, the queue, and the bucket. String messageReceiptHandle = messages.get(0).getReceiptHandle(); sqsExtended.deleteMessage(new DeleteMessageRequest(myQueueUrl, messageReceiptHandle)); System.out.println("Deleted the message."); sqsExtended.deleteQueue(new DeleteQueueRequest(myQueueUrl)); System.out.println("Deleted the queue."); deleteBucketAndAllContents(s3); System.out.println("Deleted the bucket."); } private static void deleteBucketAndAllContents(AmazonS3 client) { ObjectListing objectListing = client.listObjects(s3BucketName); while (true) { for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext(); ) { S3ObjectSummary objectSummary = (S3ObjectSummary) iterator.next(); client.deleteObject(s3BucketName, objectSummary.getKey()); } if (objectListing.isTruncated()) { objectListing = client.listNextBatchOfObjects(objectListing); } else { break; } }; VersionListing list = client.listVersions(new ListVersionsRequest().withBucketName(s3BucketName)); for (Iterator<?> iterator = list.getVersionSummaries().iterator(); iterator.hasNext(); ) { S3VersionSummary s = (S3VersionSummary) iterator.next(); client.deleteVersion(s3BucketName, s.getKey(), s.getVersionId()); } client.deleteBucket(s3BucketName); } }