通过 Amazon SNS 和 Amazon S3 发布大型消息 - Amazon Simple Notification Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

通过 Amazon SNS 和 Amazon S3 发布大型消息

要发布大型 Amazon SNS 消息,您可以使用适用于 Java 的 Amazon SNS 扩展客户端库。对于大于当前最大值 256 KB(最大为 2 GB)的邮件,此库非常有用。该库将实际负载保存到 Amazon S3 存储桶,并将存储的 Amazon S3 对象的引用发布到主题。订阅的 Amazon SQS 队列可以使用适用于 Java 的 Amazon SQS 扩展客户端库从 Amazon S3 中取消引用并检索有效负载。其他端点(如 Lambda)可以使用有效负载卸载Amazon来取消引用并检索有效载荷。

Prerequisites

以下是使用适用于 Java 的 Amazon SNS 扩展客户端库的先决条件:

  • 网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的Amazon开发工具包。

    本页上的示例使用AmazonJava 开发工具包。要安装和设置开发工具包,请参阅。设置Amazon适用于 Java 的开发工具包中的Amazon SDK for Java开发人员指南

  • 网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的 Amazon Web Services 账户 使用适当的凭据。

    创建 Amazon Web Services 账户 中,导航到Amazon主页,然后选择创建Amazon账户。按照说明进行操作。

    有关凭证的信息,请参阅。设置Amazon凭证和发展区域中的Amazon SDK for Java开发人员指南

  • Java 8 或更高版本。

  • 适用于 Java 的 Amazon SNS 扩展客户端库(也可用Maven)。

配置消息存储

Amazon SNS 扩展负载库在有效负载卸载 Java 公用库上使用Amazon用于消息存储和检索。您可以配置以下 Amazon S3消息存储选项

  • 自定义消息大小阈值 — 具有超过此大小的有效负载和属性的邮件将自动存储在 Amazon S3 中。

  • alwaysThroughS3标志 — 将此值设置为true强制将所有消息负载存储在 Amazon S3 中。例如:

    SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME).withAlwaysThroughS3(true);
  • 自定义 KMS 密钥 — 用于 Amazon S3 存储桶中服务器端加密的密钥。

  • 存储桶名称 — 用于存储消息负载的 Amazon S3 存储桶的名称。

例如:将消息发布到 Amazon SNS,并存储在 Amazon S3 中的负载存储

下面介绍了如何执行以下操作的示例:

  • 创建示例主题和队列。

  • 为队列订阅以接收来自主题的消息。

  • 发布测试消息。

消息负载存储在 Amazon S3 中,并发布对该负载的引用。Amazon SQS 扩展客户端用于接收消息。

import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sns.AmazonSNS; import com.amazonaws.services.sns.AmazonSNSClientBuilder; import com.amazonaws.services.sns.model.CreateTopicRequest; import com.amazonaws.services.sns.model.PublishRequest; import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest; import com.amazonaws.services.sns.util.Topics; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; import software.amazon.sns.AmazonSNSExtendedClient; import software.amazon.sns.SNSExtendedClientConfiguration; public class Example { public static void main(String[] args) { final String BUCKET_NAME = "extended-client-bucket"; final String TOPIC_NAME = "extended-client-topic"; final String QUEUE_NAME = "extended-client-queue"; final Regions region = Regions.DEFAULT_REGION; //Message threshold controls the maximum message size that will be allowed to be published //through SNS using the extended client. Payload of messages exceeding this value will be stored in //S3. The default value of this parameter is 256 KB which is the maximum message size in SNS (and SQS). final int EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD = 32; //Initialize SNS, SQS and S3 clients final AmazonSNS snsClient = AmazonSNSClientBuilder.standard().withRegion(region).build(); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard().withRegion(region).build(); final AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region).build(); //Create bucket, topic, queue and subscription s3Client.createBucket(BUCKET_NAME); final String topicArn = snsClient.createTopic( new CreateTopicRequest().withName(TOPIC_NAME) ).getTopicArn(); final String queueUrl = sqsClient.createQueue( new CreateQueueRequest().withQueueName(QUEUE_NAME) ).getQueueUrl(); final String subscriptionArn = Topics.subscribeQueue( snsClient, sqsClient, topicArn, queueUrl ); //To read message content stored in S3 transparently through SQS extended client, //set the RawMessageDelivery subscription attribute to TRUE final SetSubscriptionAttributesRequest subscriptionAttributesRequest = new SetSubscriptionAttributesRequest(); subscriptionAttributesRequest.setSubscriptionArn(subscriptionArn); subscriptionAttributesRequest.setAttributeName("RawMessageDelivery"); subscriptionAttributesRequest.setAttributeValue("TRUE"); snsClient.setSubscriptionAttributes(subscriptionAttributesRequest); //Initialize SNS extended client //PayloadSizeThreshold triggers message content storage in S3 when the threshold is exceeded //To store all messages content in S3, use AlwaysThroughS3 flag final SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME) .withPayloadSizeThreshold(EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD); final AmazonSNSExtendedClient snsExtendedClient = new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration); //Publish message via SNS with storage in S3 final String message = "This message is stored in S3 as it exceeds the threshold of 32 bytes set above."; snsExtendedClient.publish(topicArn, message); //Initialize SQS extended client final ExtendedClientConfiguration sqsExtendedClientConfiguration = new ExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME); final AmazonSQSExtendedClient sqsExtendedClient = new AmazonSQSExtendedClient(sqsClient, sqsExtendedClientConfiguration); //Read the message from the queue final ReceiveMessageResult result = sqsExtendedClient.receiveMessage(queueUrl); System.out.println("Received message is " + result.getMessages().get(0).getBody()); } }

其他终端协议

Amazon SNS 和 Amazon SQS 库都使用有效负载卸载Amazon通过 Amazon S3 存储和检索消息有效负载。任何启用 Java 的终端节点(例如,在 Java 中实现的 HTTPS 终端节点)都可以使用相同的库来取消引用消息内容。

无法使用有效负载卸载 Java 公用库的终端节点Amazon仍可以发布存储在 Amazon S3 中的消息。以下是由上面的代码示例发布的 Amazon S3 引用示例:

[ "software.amazon.payloadoffloading.PayloadS3Pointer", { "s3BucketName": "extended-client-bucket", "s3Key": "xxxx-xxxxx-xxxxx-xxxxxx" } ]