使用 Amazon S3 管理大型 Amazon SQS 消息 - Amazon Simple Queue Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon S3 管理大型 Amazon SQS 消息

要管理大型 Amazon Simple Queue Service (Amazon SQS) 消息,您可以使用 Amazon Simple Storage Service (Amazon S3) 和 Amazon SQS 适用于 Java 的 Amazon SQS) 和 Amazon SQS Java 的 Amazon SQS 扩展客户端库。这在存储和使用高达 2 GB 的消息时特别有用。除非您的应用程序要求反复创建队列,并将其保持不活动状态或在队列中存储大量数据,否则请考虑使用 Amazon S3 来存储您的数据。

您可以使用适用于 Java 的 Amazon SQS 扩展客户端库执行以下操作:

  • 指定消息是始终存储在 Amazon S3 中还是仅在其大小超过 256 KB 时存储在 Amazon S3 中

  • 发送引用 S3 存储桶中的单个消息对象的消息

  • 从 S3 存储桶检索消息对象

  • 从 S3 存储桶中删除该消息对象

您可以使用适用于 Java 的 Amazon SQS 扩展客户端库使用 Amazon S3 管理 Amazon SQS 消息仅限使用Amazon SDK for Java. 您无法使用Amazon CLI、Amazon SQS 控制台、Amazon SQS HTTP API 或任何其他 Amazon开发工具包。

这些区域有:适用于 Java 的开发工具包和适用于 Java 的 Amazon SQS 扩展客户端库要求使用 J2SE 开发工具包 8.0 或更高版本。

Prerequisites

以下示例使用的是 Amazon Java 开发工具包。要安装并设置开发工具包,请参阅设置Amazon适用于 Java 的开发工具包中的Amazon SDK for Java开发人员指南.

运行示例代码之前,请配置您的 Amazon 凭证。有关更多信息,请参阅 。设置Amazon发展证书和区域中的Amazon SDK for Java开发人员指南.

示例:使用 Amazon S3 管理大型 Amazon SQS 消息

以下示例创建具有随机名称的 Amazon S3 存储桶并添加生命周期规则以便在创建日期之后的 14 天后永久删除这些对象。它还会创建一个名为MyQueue并向该队列发送存储在 S3 存储桶中并且大于 256 KB 的随机消息。最后,代码检索该消息,返回有关该消息的信息,然后删除该消息、队列和存储桶。

/* * Copyright 2010-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * https://aws.amazon.com/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.*; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.*; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import java.util.Arrays; import java.util.List; import java.util.UUID; public class SQSExtendedClientExample { // Create an Amazon S3 bucket with a random name. private final static String S3_BUCKET_NAME = UUID.randomUUID() + "-" + DateTimeFormat.forPattern("yyMMdd-hhmmss").print(new DateTime()); public static void main(String[] args) { /* * Create a new instance of the builder with all defaults (credentials * and region) set automatically. For more information, see * Creating Service Clients in the Amazon SDK for Java Developer Guide. */ final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient(); /* * Set the Amazon S3 bucket name, and then set a lifecycle rule on the * bucket to permanently delete objects 14 days after each object's * creation date. */ final BucketLifecycleConfiguration.Rule expirationRule = new BucketLifecycleConfiguration.Rule(); expirationRule.withExpirationInDays(14).withStatus("Enabled"); final BucketLifecycleConfiguration lifecycleConfig = new BucketLifecycleConfiguration().withRules(expirationRule); // Create the bucket and allow message objects to be stored in the bucket. s3.createBucket(S3_BUCKET_NAME); s3.setBucketLifecycleConfiguration(S3_BUCKET_NAME, lifecycleConfig); System.out.println("Bucket created and configured."); /* * Set the Amazon SQS extended client configuration with large payload * support enabled. */ final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration() .withLargePayloadSupportEnabled(s3, S3_BUCKET_NAME); final AmazonSQS sqsExtended = new AmazonSQSExtendedClient(AmazonSQSClientBuilder .defaultClient(), extendedClientConfig); /* * Create a long string of characters for the message object which will * be stored in the bucket. */ int stringLength = 300000; char[] chars = new char[stringLength]; Arrays.fill(chars, 'x'); final String myLongString = new String(chars); // Create a message queue for this example. final String QueueName = "MyQueue" + UUID.randomUUID().toString(); final CreateQueueRequest createQueueRequest = new CreateQueueRequest(QueueName); final String myQueueUrl = sqsExtended .createQueue(createQueueRequest).getQueueUrl(); System.out.println("Queue created."); // Send the message. final SendMessageRequest myMessageRequest = new SendMessageRequest(myQueueUrl, myLongString); sqsExtended.sendMessage(myMessageRequest); System.out.println("Sent the message."); // Receive the message. final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl); List<Message> messages = sqsExtended .receiveMessage(receiveMessageRequest).getMessages(); // Print information about the message. for (Message message : messages) { System.out.println("\nMessage received."); System.out.println(" ID: " + message.getMessageId()); System.out.println(" Receipt handle: " + message.getReceiptHandle()); System.out.println(" Message body (first 5 characters): " + message.getBody().substring(0, 5)); } // Delete the message, the queue, and the bucket. final String messageReceiptHandle = messages.get(0).getReceiptHandle(); sqsExtended.deleteMessage(new DeleteMessageRequest(myQueueUrl, messageReceiptHandle)); System.out.println("Deleted the message."); sqsExtended.deleteQueue(new DeleteQueueRequest(myQueueUrl)); System.out.println("Deleted the queue."); deleteBucketAndAllContents(s3); System.out.println("Deleted the bucket."); } private static void deleteBucketAndAllContents(AmazonS3 client) { ObjectListing objectListing = client.listObjects(S3_BUCKET_NAME); while (true) { for (S3ObjectSummary objectSummary : objectListing .getObjectSummaries()) { client.deleteObject(S3_BUCKET_NAME, objectSummary.getKey()); } if (objectListing.isTruncated()) { objectListing = client.listNextBatchOfObjects(objectListing); } else { break; } } final VersionListing list = client.listVersions( new ListVersionsRequest().withBucketName(S3_BUCKET_NAME)); for (S3VersionSummary s : list.getVersionSummaries()) { client.deleteVersion(S3_BUCKET_NAME, s.getKey(), s.getVersionId()); } client.deleteBucket(S3_BUCKET_NAME); } }