使用 Amazon SQS 管理大 Amazon S3 消息 - Amazon Simple Queue Service
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon SQS 管理大 Amazon S3 消息

管理大型 Amazon Simple Queue Service (Amazon SQS)消息,您可以使用 Amazon Simple Storage Service (Amazon S3)和 适用于 Java 的 Amazon SQS 扩展客户端库. 这对存储和使用高达2GB的消息特别有用。除非您的应用程序需要重复创建队列并使其处于非活动状态或在您的队列中存储大量数据,否则请考虑使用 Amazon S3 用于存储数据。

您可以使用 适用于 Java 的 Amazon SQS 扩展客户端库 来执行以下操作:

  • 指定消息是始终存储在 Amazon S3 中还是仅在其大小超过 256 KB 时存储在 中。

  • 发送引用存储在S3bucket中的单个消息对象的消息

  • 从S3存储桶检索消息对象

  • 从S3bucket中删除消息对象

您可以使用 适用于 Java 的 Amazon SQS 扩展客户端库 管理 Amazon SQS 消息使用 Amazon S3 与 AWS SDK for Java. 您不能使用 AWS CLI, Amazon SQS 控制台, Amazon SQS HTTPAPI,或任何 其他 AWS SDK。

适用于 Java 的开发工具包 和适用于 Java 的 Amazon SQS 扩展客户端库需要 J2SE Development Kit 8.0 或更高版本。

Prerequisites

以下示例使用的是 AWS Java 开发工具包。要安装和设置开发工具包,请参阅 AWS SDK for Java Developer Guide 中的设置适用于 Java 的 AWS 开发工具包

运行示例代码之前,请配置您的 AWS 凭证。有关更多信息,请参阅 AWS SDK for Java Developer Guide 中的设置用于开发的 AWS 凭证和区域

示例:使用 Amazon S3 管理大型 Amazon SQS 消息

以下示例创建 Amazon S3 具有随机名称的bucket并添加生命周期规则,以便在14天后永久删除对象。它还创建一个名为 MyQueue 和向队列发送存储在S3bucket中的随机消息。最后,代码检索消息,返回有关消息的信息,然后删除消息、队列和存储桶。

/* * Copyright 2010-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * * http://www.amazonaws.cn/apache2.0 * * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. * */ import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.*; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.*; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import java.util.Arrays; import java.util.List; import java.util.UUID; public class SQSExtendedClientExample { // Create an Amazon S3 bucket with a random name. private final static String S3_BUCKET_NAME = UUID.randomUUID() + "-" + DateTimeFormat.forPattern("yyMMdd-hhmmss").print(new DateTime()); public static void main(String[] args) { /* * Create a new instance of the builder with all defaults (credentials * and region) set automatically. For more information, see * Creating Service Clients in the AWS SDK for Java Developer Guide. */ final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient(); /* * Set the Amazon S3 bucket name, and then set a lifecycle rule on the * bucket to permanently delete objects 14 days after each object's * creation date. */ final BucketLifecycleConfiguration.Rule expirationRule = new BucketLifecycleConfiguration.Rule(); expirationRule.withExpirationInDays(14).withStatus("Enabled"); final BucketLifecycleConfiguration lifecycleConfig = new BucketLifecycleConfiguration().withRules(expirationRule); // Create the bucket and allow message objects to be stored in the bucket. s3.createBucket(S3_BUCKET_NAME); s3.setBucketLifecycleConfiguration(S3_BUCKET_NAME, lifecycleConfig); System.out.println("Bucket created and configured."); /* * Set the Amazon SQS extended client configuration with large payload * support enabled. */ final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration() .withLargePayloadSupportEnabled(s3, S3_BUCKET_NAME); final AmazonSQS sqsExtended = new AmazonSQSExtendedClient(AmazonSQSClientBuilder .defaultClient(), extendedClientConfig); /* * Create a long string of characters for the message object which will * be stored in the bucket. */ int stringLength = 300000; char[] chars = new char[stringLength]; Arrays.fill(chars, 'x'); final String myLongString = new String(chars); // Create a message queue for this example. final String QueueName = "MyQueue" + UUID.randomUUID().toString(); final CreateQueueRequest createQueueRequest = new CreateQueueRequest(QueueName); final String myQueueUrl = sqsExtended .createQueue(createQueueRequest).getQueueUrl(); System.out.println("Queue created."); // Send the message. final SendMessageRequest myMessageRequest = new SendMessageRequest(myQueueUrl, myLongString); sqsExtended.sendMessage(myMessageRequest); System.out.println("Sent the message."); // Receive the message. final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl); List<Message> messages = sqsExtended .receiveMessage(receiveMessageRequest).getMessages(); // Print information about the message. for (Message message : messages) { System.out.println("\nMessage received."); System.out.println(" ID: " + message.getMessageId()); System.out.println(" Receipt handle: " + message.getReceiptHandle()); System.out.println(" Message body (first 5 characters): " + message.getBody().substring(0, 5)); } // Delete the message, the queue, and the bucket. final String messageReceiptHandle = messages.get(0).getReceiptHandle(); sqsExtended.deleteMessage(new DeleteMessageRequest(myQueueUrl, messageReceiptHandle)); System.out.println("Deleted the message."); sqsExtended.deleteQueue(new DeleteQueueRequest(myQueueUrl)); System.out.println("Deleted the queue."); deleteBucketAndAllContents(s3); System.out.println("Deleted the bucket."); } private static void deleteBucketAndAllContents(AmazonS3 client) { ObjectListing objectListing = client.listObjects(S3_BUCKET_NAME); while (true) { for (S3ObjectSummary objectSummary : objectListing .getObjectSummaries()) { client.deleteObject(S3_BUCKET_NAME, objectSummary.getKey()); } if (objectListing.isTruncated()) { objectListing = client.listNextBatchOfObjects(objectListing); } else { break; } } final VersionListing list = client.listVersions( new ListVersionsRequest().withBucketName(S3_BUCKET_NAME)); for (S3VersionSummary s : list.getVersionSummaries()) { client.deleteVersion(S3_BUCKET_NAME, s.getKey(), s.getVersionId()); } client.deleteBucket(S3_BUCKET_NAME); } }