通过 Amazon SNS 和 Amazon S3 发布大型消息 - Amazon Simple Notification Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

通过 Amazon SNS 和 Amazon S3 发布大型消息

要发布大型 Amazon SNS 消息,您可以使用适用于 Java 的 Amazon SNS 扩展客户端库。对于大于当前最大值 256KB(最大为 2GB)的消息,此库非常有用。该库将实际负载保存到 Amazon S3 存储桶,并将存储的 Amazon S3 对象的引用发布到主题。订阅的 Amazon SQS 队列可以使用适用于 Java 的 Amazon SQS 扩展客户端库从 Amazon S3 中取消引用并检索负载。其他终端节点(如 Lambda)可以使用 Amazon 的负载卸载 Java 公共库来取消引用并检索负载。

Prerequisites

以下是使用适用于 Java 的 Amazon SNS 扩展客户端库的先决条件:

配置消息存储

Amazon SNS 扩展负载库在 Amazon 的负载卸载 Java 公共库上使用,以进行消息存储和检索。您可以配置以下 Amazon S3 消息存储选项

  • 自定义消息大小阈值 – 具有超过此大小的负载和属性的消息将自动存储在 Amazon S3 中。

  • alwaysThroughS3 标志 – 将此值设置为 true 以强制将所有消息负载存储在 Amazon S3 中。例如:

    SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME).withAlwaysThroughS3(true);
  • 自定义 KMS 密钥 – 用于 Amazon S3 存储桶中的服务器端加密的密钥。

  • 存储桶名称 – 用于存储消息负载的 Amazon S3 存储桶的名称。

示例:使用存储在 Amazon S3 中的负载将消息发布到 Amazon SNS

下面介绍如何执行以下操作的示例:

  • 创建示例主题和队列。

  • 订阅队列以接收来自主题的消息。

  • 发布测试消息。

消息负载存储在 Amazon S3,以及发布到的引用中。Amazon SQS 扩展客户端用于接收消息。

SDK for Java 1.x

要发布大型消息,请使用适用于 Java 的 Amazon SNS 扩展客户端库。您发送的消息将引用包含实际消息内容的 Amazon S3 对象。

import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sns.AmazonSNS; import com.amazonaws.services.sns.AmazonSNSClientBuilder; import com.amazonaws.services.sns.model.CreateTopicRequest; import com.amazonaws.services.sns.model.PublishRequest; import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest; import com.amazonaws.services.sns.util.Topics; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; import software.amazon.sns.AmazonSNSExtendedClient; import software.amazon.sns.SNSExtendedClientConfiguration; public class Example { public static void main(String[] args) { final String BUCKET_NAME = "extended-client-bucket"; final String TOPIC_NAME = "extended-client-topic"; final String QUEUE_NAME = "extended-client-queue"; final Regions region = Regions.DEFAULT_REGION; //Message threshold controls the maximum message size that will be allowed to be published //through SNS using the extended client. Payload of messages exceeding this value will be stored in //S3. The default value of this parameter is 256 KB which is the maximum message size in SNS (and SQS). final int EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD = 32; //Initialize SNS, SQS and S3 clients final AmazonSNS snsClient = AmazonSNSClientBuilder.standard().withRegion(region).build(); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard().withRegion(region).build(); final AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region).build(); //Create bucket, topic, queue and subscription s3Client.createBucket(BUCKET_NAME); final String topicArn = snsClient.createTopic( new CreateTopicRequest().withName(TOPIC_NAME) ).getTopicArn(); final String queueUrl = sqsClient.createQueue( new CreateQueueRequest().withQueueName(QUEUE_NAME) ).getQueueUrl(); final String subscriptionArn = Topics.subscribeQueue( snsClient, sqsClient, topicArn, queueUrl ); //To read message content stored in S3 transparently through SQS extended client, //set the RawMessageDelivery subscription attribute to TRUE final SetSubscriptionAttributesRequest subscriptionAttributesRequest = new SetSubscriptionAttributesRequest(); subscriptionAttributesRequest.setSubscriptionArn(subscriptionArn); subscriptionAttributesRequest.setAttributeName("RawMessageDelivery"); subscriptionAttributesRequest.setAttributeValue("TRUE"); snsClient.setSubscriptionAttributes(subscriptionAttributesRequest); //Initialize SNS extended client //PayloadSizeThreshold triggers message content storage in S3 when the threshold is exceeded //To store all messages content in S3, use AlwaysThroughS3 flag final SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME) .withPayloadSizeThreshold(EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD); final AmazonSNSExtendedClient snsExtendedClient = new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration); //Publish message via SNS with storage in S3 final String message = "This message is stored in S3 as it exceeds the threshold of 32 bytes set above."; snsExtendedClient.publish(topicArn, message); //Initialize SQS extended client final ExtendedClientConfiguration sqsExtendedClientConfiguration = new ExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME); final AmazonSQSExtendedClient sqsExtendedClient = new AmazonSQSExtendedClient(sqsClient, sqsExtendedClientConfiguration); //Read the message from the queue final ReceiveMessageResult result = sqsExtendedClient.receiveMessage(queueUrl); System.out.println("Received message is " + result.getMessages().get(0).getBody()); } }
  • GitHub 中查找说明和更多代码。

其他终端节点协议

Amazon SNS 和 Amazon SQS 库都使用 Amazon 的负载卸载 Java 公共库通过 Amazon S3 存储和检索消息负载。任何启用 Java 的终端节点(例如,在 Java 中实施的 HTTPS 终端节点)都可以使用相同的库来取消引用消息内容。

无法使用 Amazon 的负载卸载 Java 公共库的终端节点仍可以通过存储在 Amazon S3 中的负载发布消息。以下是由上面的代码示例发布的 Amazon S3 引用的示例:

[ "software.amazon.payloadoffloading.PayloadS3Pointer", { "s3BucketName": "extended-client-bucket", "s3Key": "xxxx-xxxxx-xxxxx-xxxxxx" } ]