Amazon Simple Queue Service
开发人员指南
AWS 服务或AWS文档中描述的功能,可能因地区/位置而异。点 击 Getting Started with Amazon AWS to see specific differences applicable to the China (Beijing) Region.

使用 Amazon S3 管理大 Amazon SQS 消息;

您可以使用 Amazon S3 管理 Amazon SQS 消息。这在存储和使用大小高达 2 GB 的消息时特别有用。要使用 Amazon S3 管理 Amazon SQS 消息,请使用适用于 Java 的 Amazon SQS 扩展客户端库。具体来说,您可使用该库执行以下操作:

  • 指定消息是始终存储在 Amazon S3 中还是仅在其大小超过 256 KB 时存储在 中。

  • 发送引用存储在 Amazon S3 存储桶中的单个消息对象的消息。

  • 从 Amazon S3 存储桶中获取相应的消息对象。

  • 从 Amazon S3 存储桶中删除相应的消息对象。

注意

您可以使用适用于 Java 的 Amazon SQS 扩展客户端库 以通过 Amazon S3 管理 Amazon SQS 消息。但您无法通过 AWS CLI、Amazon SQS 控制台、Amazon SQS HTTP API 或除 适用于 Java 的开发工具包 之外的任何 AWS SDK 执行此操作。

先决条件

要使用 Amazon S3 管理 Amazon SQS 消息,您需要:

  • AWS SDK for Java – 可通过两种不同的方式将 适用于 Java 的开发工具包 包含在您的项目中。您可以下载并安装 适用于 Java 的开发工具包,或者,如果您使用 Maven 获取适用于 Java 的 Amazon SQS 扩展客户端库,则可以包含 适用于 Java 的开发工具包 作为依赖项。适用于 Java 的开发工具包 和适用于 Java 的 Amazon SQS 扩展客户端库需要 J2SE Development Kit 7.0 或更高版本。有关下载 适用于 Java 的开发工具包 的信息,请参阅 适用于 Java 的开发工具包。有关使用 Maven 的更多信息,请参阅此列表后面的注释。

  • 适用于 Java 的 Amazon SQS 扩展客户端库 - 如果您不使用 Maven,则必须将包文件 amazon-sqs-java-extended-client-lib.jar 添加到 Java 生成类路径。有关下载库的信息,请参阅适用于 Java 的 Amazon SQS 扩展客户端库

  • Amazon S3 存储桶 – 您必须创建新的 Amazon S3 存储桶或使用现有的存储桶来存储消息。建议您创建新的存储桶来实现此目的。要控制存储桶空间和 AWS 账户产生的费用,您还应在存储桶上设置生命周期配置规则,以便在消息对象创建日期之后的一段特定时间后永久删除该对象。有关说明,请参阅管理生命周期配置或本节后面的示例

注意

适用于 Java 的 Amazon SQS 扩展客户端库包含对 Maven 的支持,如下所示:

Copy
<dependency> <groupId>com.amazonaws</groupId> <artifactId>amazon-sqs-java-extended-client-lib</artifactId> <version>1.0.1</version> </dependency>

使用适用于 Java 的 Amazon SQS 扩展客户端库

在满足先决条件后,使用以下 Java 代码示例开始通过 Amazon S3 管理 Amazon SQS 消息。

该示例创建具有随机名称的 Amazon S3 存储桶并添加生命周期规则以便在创建日期之后的 14 天后永久删除该存储桶。然后,该示例会创建一个队列并向该队列发送 256 KB 以上的随机消息。此消息将存储在 Amazon S3 存储桶中。然后,该示例会使用此消息并打印有关使用的消息的信息。然后,该示例会删除该消息、队列和存储桶。

Copy
import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.UUID; import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClient; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.BucketLifecycleConfiguration; import com.amazonaws.services.s3.model.ListVersionsRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.S3VersionSummary; import com.amazonaws.services.s3.model.VersionListing; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.DeleteMessageRequest; import com.amazonaws.services.sqs.model.DeleteQueueRequest; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; public class SQSExtendedClientExample { private static final String s3BucketName = UUID.randomUUID() + "-" + DateTimeFormat.forPattern("yyMMdd-hhmmss").print(new DateTime()); public static void main(String[] args) { AWSCredentials credentials = null; try { credentials = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load the AWS credentials from the expected AWS credential profiles file. " + "Make sure that your credentials file is at the correct " + "location (/home/$USER/.aws/credentials) and is in a valid format.", e); } AmazonS3 s3 = new AmazonS3Client(credentials); Region s3Region = Region.getRegion(Regions.US_WEST_2); s3.setRegion(s3Region); // Set the Amazon S3 bucket name, and set a lifecycle rule on the bucket to // permanently delete objects a certain number of days after // each object's creation date. // Then create the bucket, and enable message objects to be stored in the bucket. BucketLifecycleConfiguration.Rule expirationRule = new BucketLifecycleConfiguration.Rule(); expirationRule.withExpirationInDays(14).withStatus("Enabled"); BucketLifecycleConfiguration lifecycleConfig = new BucketLifecycleConfiguration().withRules(expirationRule); s3.createBucket(s3BucketName); s3.setBucketLifecycleConfiguration(s3BucketName, lifecycleConfig); System.out.println("Bucket created and configured."); // Set the SQS extended client configuration with large payload support enabled. ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration() .withLargePayloadSupportEnabled(s3, s3BucketName); AmazonSQS sqsExtended = new AmazonSQSExtendedClient(new AmazonSQSClient(credentials), extendedClientConfig); Region sqsRegion = Region.getRegion(Regions.US_WEST_2); sqsExtended.setRegion(sqsRegion); // Create a long string of characters for the message object to be stored in the bucket. int stringLength = 300000; char[] chars = new char[stringLength]; Arrays.fill(chars, 'x'); String myLongString = new String(chars); // Create a message queue for this example. String QueueName = "QueueName" + UUID.randomUUID().toString(); CreateQueueRequest createQueueRequest = new CreateQueueRequest(QueueName); String myQueueUrl = sqsExtended.createQueue(createQueueRequest).getQueueUrl(); System.out.println("Queue created."); // Send the message. SendMessageRequest myMessageRequest = new SendMessageRequest(myQueueUrl, myLongString); sqsExtended.sendMessage(myMessageRequest); System.out.println("Sent the message."); // Receive messages, and then print general information about them. ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl); List<Message> messages = sqsExtended.receiveMessage(receiveMessageRequest).getMessages(); for (Message message : messages) { System.out.println("\nMessage received:"); System.out.println(" ID: " + message.getMessageId()); System.out.println(" Receipt handle: " + message.getReceiptHandle()); System.out.println(" Message body (first 5 characters): " + message.getBody().substring(0, 5)); } // Delete the message, the queue, and the bucket. String messageReceiptHandle = messages.get(0).getReceiptHandle(); sqsExtended.deleteMessage(new DeleteMessageRequest(myQueueUrl, messageReceiptHandle)); System.out.println("Deleted the message."); sqsExtended.deleteQueue(new DeleteQueueRequest(myQueueUrl)); System.out.println("Deleted the queue."); deleteBucketAndAllContents(s3); System.out.println("Deleted the bucket."); } private static void deleteBucketAndAllContents(AmazonS3 client) { ObjectListing objectListing = client.listObjects(s3BucketName); while (true) { for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext(); ) { S3ObjectSummary objectSummary = (S3ObjectSummary) iterator.next(); client.deleteObject(s3BucketName, objectSummary.getKey()); } if (objectListing.isTruncated()) { objectListing = client.listNextBatchOfObjects(objectListing); } else { break; } }; VersionListing list = client.listVersions(new ListVersionsRequest().withBucketName(s3BucketName)); for (Iterator<?> iterator = list.getVersionSummaries().iterator(); iterator.hasNext(); ) { S3VersionSummary s = (S3VersionSummary) iterator.next(); client.deleteVersion(s3BucketName, s.getKey(), s.getVersionId()); } client.deleteBucket(s3BucketName); } }