

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 通过 Amazon SNS 和 Amazon S3 发布大型消息
大型消息负载

要发布很大的 Amazon SNS 消息，您可以使用[适用于 Java 的 Amazon SNS 扩展型客户端库](https://github.com/awslabs/amazon-sns-java-extended-client-lib/)或[适用于 Python 的 Amazon SNS 扩展型客户端库](https://github.com/awslabs/amazon-sns-python-extended-client-lib)。对于大于当前最大值 256KB（最大为 2GB）的消息，这些库非常有用。这两个库将实际有效负载保存到 Amazon S3 桶，并将存储的 Amazon S3 对象的引用发布到 Amazon SNS 主题。订阅的 Amazon SQS 队列可以使用[适用于 Java 的 Amazon SQS 扩展客户端库](https://github.com/awslabs/amazon-sqs-java-extended-client-lib)从 Amazon S3 中取消引用并检索负载。其他端点（如 Lambda）可以使用 [Amazon的有效负载卸载 Java 公共库](https://github.com/awslabs/payload-offloading-java-common-lib-for-aws)来取消引用并检索有效负载。

**注意**  
Amazon SNS 扩展型客户端库与标准主题和 FIFO 主题都兼容。

# 适用于 Java 的 Amazon SNS 扩展型客户端库


## 先决条件


以下是使用[适用于 Java 的 Amazon SNS 扩展型客户端库](https://github.com/awslabs/amazon-sns-java-extended-client-lib)的先决条件：
+ 一个 Amazon 软件开发工具包。本页上的示例使用 Amazon Java 开发工具包。要安装和设置 SDK，请参阅《*适用于 Java 的 Amazon SDK 开发者指南*[》中的 “设置 Amazon 适用于 Java 的 SDK](https://docs.amazonaws.cn/sdk-for-java/latest/developer-guide/setup-install.html)”。
+ 并 Amazon Web Services 账户 具有正确的凭据。要创建 Amazon Web Services 账户，请导航到[Amazon 主页](https://www.amazonaws.cn/)，然后选择**创建 Amazon 帐户**。按照说明进行操作。

  有关证书的信息，请参阅《*适用于 Java 的 Amazon SDK 开发人员指南》中的 “设置用于开发*[的 Amazon 凭证和区域](https://docs.amazonaws.cn/sdk-for-java/latest/developer-guide/setup-credentials.html)”。
+ Java 8 或更高版本。
+ 适用于 Java 的 Amazon SNS 扩展型客户端库（也可从 [Maven](https://maven.apache.org/) 中获得）。

## 配置消息存储


Amazon SNS 扩展客户端库使用负载卸载 Java 公共库 Amazon 进行消息存储和检索。您可以配置以下 Amazon S3 [消息存储选项](https://github.com/awslabs/amazon-sns-java-extended-client-lib/blob/main/src/main/java/software/amazon/sns/SNSExtendedClientConfiguration.java)：
+ **自定义消息大小阈值** – 具有超过此大小的有效载荷和属性的消息将自动存储在 Amazon S3 中。
+ **`alwaysThroughS3` 标志** – 将此值设置为 `true` 以强制将所有消息有效载荷存储在 Amazon S3 中。例如：

  ```
  SNSExtendedClientConfiguration snsExtendedClientConfiguration = new
  SNSExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME).withAlwaysThroughS3(true);
  ```
+ **自定义 KMS 密钥** – 用于 Amazon S3 存储桶中的服务器端加密的密钥。
+ **存储桶名称** – 用于存储消息有效载荷的 Amazon S3 存储桶的名称。

## 示例：使用存储在 Amazon S3 中的负载将消息发布到 Amazon SNS


以下代码示例展示了如何：
+ 创建示例主题和队列。
+ 订阅队列以接收来自主题的消息。
+ 发布测试消息。

消息负载存储在 Amazon S3，以及发布到的引用中。Amazon SQS 扩展客户端用于接收消息。

**适用于 Java 的 SDK 1.x**  
 还有更多相关信息 GitHub。在 [Amazon 代码示例存储库](https://github.com/awsdocs/aws-doc-sdk-examples/tree/main/java/example_code/sns#code-examples)中查找完整示例，了解如何进行设置和运行。
要发布大型消息，请使用适用于 Java 的 Amazon SNS 扩展客户端库。您发送的消息将引用包含实际消息内容的 Amazon S3 对象。  

```
import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest;
import com.amazonaws.services.sns.util.Topics;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import software.amazon.sns.AmazonSNSExtendedClient;
import software.amazon.sns.SNSExtendedClientConfiguration;

public class Example {

        public static void main(String[] args) {
                final String BUCKET_NAME = "extended-client-bucket";
                final String TOPIC_NAME = "extended-client-topic";
                final String QUEUE_NAME = "extended-client-queue";
                final Regions region = Regions.DEFAULT_REGION;

                // Message threshold controls the maximum message size that will be allowed to
                // be published
                // through SNS using the extended client. Payload of messages exceeding this
                // value will be stored in
                // S3. The default value of this parameter is 256 KB which is the maximum
                // message size in SNS (and SQS).
                final int EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD = 32;

                // Initialize SNS, SQS and S3 clients
                final AmazonSNS snsClient = AmazonSNSClientBuilder.standard().withRegion(region).build();
                final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard().withRegion(region).build();
                final AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region).build();

                // Create bucket, topic, queue and subscription
                s3Client.createBucket(BUCKET_NAME);
                final String topicArn = snsClient.createTopic(
                                new CreateTopicRequest().withName(TOPIC_NAME)).getTopicArn();
                final String queueUrl = sqsClient.createQueue(
                                new CreateQueueRequest().withQueueName(QUEUE_NAME)).getQueueUrl();
                final String subscriptionArn = Topics.subscribeQueue(
                                snsClient, sqsClient, topicArn, queueUrl);

                // To read message content stored in S3 transparently through SQS extended
                // client,
                // set the RawMessageDelivery subscription attribute to TRUE
                final SetSubscriptionAttributesRequest subscriptionAttributesRequest = new SetSubscriptionAttributesRequest();
                subscriptionAttributesRequest.setSubscriptionArn(subscriptionArn);
                subscriptionAttributesRequest.setAttributeName("RawMessageDelivery");
                subscriptionAttributesRequest.setAttributeValue("TRUE");
                snsClient.setSubscriptionAttributes(subscriptionAttributesRequest);

                // Initialize SNS extended client
                // PayloadSizeThreshold triggers message content storage in S3 when the
                // threshold is exceeded
                // To store all messages content in S3, use AlwaysThroughS3 flag
                final SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration()
                                .withPayloadSupportEnabled(s3Client, BUCKET_NAME)
                                .withPayloadSizeThreshold(EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD);
                final AmazonSNSExtendedClient snsExtendedClient = new AmazonSNSExtendedClient(snsClient,
                                snsExtendedClientConfiguration);

                // Publish message via SNS with storage in S3
                final String message = "This message is stored in S3 as it exceeds the threshold of 32 bytes set above.";
                snsExtendedClient.publish(topicArn, message);

                // Initialize SQS extended client
                final ExtendedClientConfiguration sqsExtendedClientConfiguration = new ExtendedClientConfiguration()
                                .withPayloadSupportEnabled(s3Client, BUCKET_NAME);
                final AmazonSQSExtendedClient sqsExtendedClient = new AmazonSQSExtendedClient(sqsClient,
                                sqsExtendedClientConfiguration);

                // Read the message from the queue
                final ReceiveMessageResult result = sqsExtendedClient.receiveMessage(queueUrl);
                System.out.println("Received message is " + result.getMessages().get(0).getBody());
        }
}
```

## 其他终端节点协议


Amazon SNS 和 Amazon SQS 库都使用 [Amazon的负载卸载 Java 公共库](https://github.com/awslabs/payload-offloading-java-common-lib-for-aws)通过 Amazon S3 存储和检索消息负载。任何启用 Java 的终端节点（例如，在 Java 中实施的 HTTPS 终端节点）都可以使用相同的库来取消引用消息内容。

无法使用有效负载卸载 Java 公共库的终端节点仍然 Amazon 可以发布存储在 Amazon S3 中的有效负载的消息。以下是由上面的代码示例发布的 Amazon S3 引用的示例：

```
[
  "software.amazon.payloadoffloading.PayloadS3Pointer",
  {
    "s3BucketName": "extended-client-bucket",
    "s3Key": "xxxx-xxxxx-xxxxx-xxxxxx"
  }
]
```

# 适用于 Python 的 Amazon SNS 扩展型客户端库


## 先决条件


以下是使用[适用于 Python 的 Amazon SNS 扩展型客户端库](https://github.com/awslabs/amazon-sns-python-extended-client-lib)的先决条件：
+ 一个 Amazon 软件开发工具包。本页上的示例使用 Amazon Python SDK Boto3。要安装和设置 SDK，请参阅 [https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html) 文档。
+ 并 Amazon Web Services 账户 具有正确的凭据。要创建 Amazon Web Services 账户，请导航到[Amazon 主页](https://www.amazonaws.cn/)，然后选择**创建 Amazon 帐户**。按照说明进行操作。

  有关凭证的信息，请参阅《Amazon SDK for Python 开发人员指南》**中的[凭证](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html)。
+ Python 3.x（或更高版本）和 pip。
+ 适用于 Python 的 Amazon SNS 扩展型客户端库（也可从 [PyPI](https://pypi.org/project/amazon-sns-extended-client/) 中获得）。

## 配置消息存储


Boto3 Amazon [SN](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns.html#client) S 客户端[、主题[PlatformEndpoint](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/platformendpoint/index.html)和对象上提供了以下属性，](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/topic/index.html)用于配置 Amazon S3 消息存储选项。
+ **`large_payload_support`** – 用于存储大型消息的 Amazon S3 存储桶名称。
+ **`use_legacy_attribute`** – 如果设置为 `True`，则所有已发布的消息都使用旧版保留消息属性（`SQSLargePayloadSize`），而不是当前的保留消息属性（`ExtendedPayloadSize`）。
+ **`message_size_threshold`** – 在大型消息桶中存储消息的阈值。不能低于 `0` 或超过 `262144`。默认值为 `262144`。
+ **`always_through_s3`** – 如果为 `True`，则所有消息都存储在 Amazon S3 中。默认值为 `False`。
+ **`s3_client`** – 用于在 Amazon S3 中存储对象的 Boto3 Amazon S3 `client` 对象。如果您想要控制 Amazon S3 客户端（例如自定义 Amazon S3 配置或凭证），请使用此选项。首次使用时，若未预先设置，默认值为 `boto3.client("s3")`。

## 示例：使用存储在 Amazon S3 中的有效负载将消息发布到 Amazon SNS


以下代码示例展示了如何：
+ 创建示例 Amazon SNS 主题和 Amazon SQS 队列。
+ 为 Amazon SQS 队列附加策略以接收来自 Amazon SNS 主题的消息。
+ 订阅队列以接收来自主题的消息。
+ 使用 Amazon SNS 扩展客户端、主题资源和 PlatformEndpoint 资源发布测试消息。
+ 消息有效负载存储在 Amazon S3 中，并发布对它的引用。
+ 打印队列中已发布的消息以及从 Amazon S3 检索到的原始消息。

要发布大型消息，请使用适用于 Python 的 Amazon SNS 扩展型客户端库。您发送的消息将引用包含实际消息内容的 Amazon S3 对象。

```
import boto3
from sns_extended_client import SNSExtendedClientSession
from json import loads

s3_extended_payload_bucket = "extended-client-bucket-store"  # S3 bucket with the given bucket name is a resource which is created and accessible with the given AWS credentials
TOPIC_NAME = "---TOPIC-NAME---"
QUEUE_NAME = "---QUEUE-NAME---"

def allow_sns_to_write_to_sqs(topicarn, queuearn):
    policy_document = """{{
        "Version": "2012-10-17",		 	 	 
        "Statement":[
            {{
            "Sid":"MyPolicy",
            "Effect":"Allow",
            "Principal" : {{"AWS" : "*"}},
            "Action":"SQS:SendMessage",
            "Resource": "{}",
            "Condition":{{
                "ArnEquals":{{
                "aws:SourceArn": "{}"
                }}
            }}
            }}
        ]
        }}""".format(queuearn, topicarn)

    return policy_document

def get_msg_from_s3(body,sns_extended_client):
    """Handy Helper to fetch message from S3"""
    json_msg = loads(body)
    s3_object = sns_extended_client.s3_client.get_object(
        Bucket=json_msg[1].get("s3BucketName"), Key=json_msg[1].get("s3Key")
    )
    msg = s3_object.get("Body").read().decode()
    return msg


def fetch_and_print_from_sqs(sqs, queue_url,sns_extended_client):
    sqs_msg = sqs.receive_message(
        QueueUrl=queue_url,
        AttributeNames=['All'],
        MessageAttributeNames=['All'],
        VisibilityTimeout=0,
        WaitTimeSeconds=0,
        MaxNumberOfMessages=1
    ).get("Messages")[0]
    
    message_body = sqs_msg.get("Body")
    print("Published Message: {}".format(message_body))
    print("Message Stored in S3 Bucket is: {}\n".format(get_msg_from_s3(message_body,sns_extended_client)))

    # Delete the Processed Message
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=sqs_msg['ReceiptHandle']
    )


sns_extended_client = boto3.client("sns", region_name="us-east-1")
create_topic_response = sns_extended_client.create_topic(Name=TOPIC_NAME)
sns_topic_arn = create_topic_response.get("TopicArn")

# create and subscribe an sqs queue to the sns client
sqs = boto3.client("sqs",region_name="us-east-1")
demo_queue_url = sqs.create_queue(QueueName=QUEUE_NAME).get("QueueUrl")
sqs_queue_arn = sqs.get_queue_attributes(
    QueueUrl=demo_queue_url, AttributeNames=["QueueArn"]
)["Attributes"].get("QueueArn")

# Adding policy to SQS queue such that SNS topic can send msg to SQS queue
policy_json = allow_sns_to_write_to_sqs(sns_topic_arn, sqs_queue_arn)
response = sqs.set_queue_attributes(
    QueueUrl = demo_queue_url,
    Attributes = {
        'Policy' : policy_json
    }
)

# Set the RawMessageDelivery subscription attribute to TRUE if you want to use
# SQSExtendedClient to help with retrieving msg from S3
sns_extended_client.subscribe(TopicArn=sns_topic_arn, Protocol="sqs", 
Endpoint=sqs_queue_arn
, Attributes={"RawMessageDelivery":"true"}
)

sns_extended_client.large_payload_support = s3_extended_payload_bucket

# Change default s3_client attribute of sns_extended_client to use 'us-east-1' region
sns_extended_client.s3_client = boto3.client("s3", region_name="us-east-1")


# Below is the example that all the messages will be sent to the S3 bucket
sns_extended_client.always_through_s3 = True
sns_extended_client.publish(
    TopicArn=sns_topic_arn, Message="This message should be published to S3"
)
print("\n\nPublished using SNS extended client:")
fetch_and_print_from_sqs(sqs, demo_queue_url,sns_extended_client)  # Prints message stored in s3

# Below is the example that all the messages larger than 32 bytes will be sent to the S3 bucket
print("\nUsing decreased message size threshold:")

sns_extended_client.always_through_s3 = False
sns_extended_client.message_size_threshold = 32
sns_extended_client.publish(
    TopicArn=sns_topic_arn,
    Message="This message should be published to S3 as it exceeds the limit of the 32 bytes",
)

fetch_and_print_from_sqs(sqs, demo_queue_url,sns_extended_client)  # Prints message stored in s3


# Below is the example to publish message using the SNS.Topic resource
sns_extended_client_resource = SNSExtendedClientSession().resource(
    "sns", region_name="us-east-1"
)

topic = sns_extended_client_resource.Topic(sns_topic_arn)
topic.large_payload_support = s3_extended_payload_bucket

# Change default s3_client attribute of topic to use 'us-east-1' region
topic.s3_client = boto3.client("s3", region_name="us-east-1")

topic.always_through_s3 = True
# Can Set custom S3 Keys to be used to store objects in S3
topic.publish(
    Message="This message should be published to S3 using the topic resource",
    MessageAttributes={
        "S3Key": {
            "DataType": "String",
            "StringValue": "347c11c4-a22c-42e4-a6a2-9b5af5b76587",
        }
    },
)
print("\nPublished using Topic Resource:")
fetch_and_print_from_sqs(sqs, demo_queue_url,topic)

# Below is the example to publish message using the SNS.PlatformEndpoint resource
sns_extended_client_resource = SNSExtendedClientSession().resource(
    "sns", region_name="us-east-1"
)

platform_endpoint = sns_extended_client_resource.PlatformEndpoint(sns_topic_arn)
platform_endpoint.large_payload_support = s3_extended_payload_bucket

# Change default s3_client attribute of platform_endpoint to use 'us-east-1' region
platform_endpoint.s3_client = boto3.client("s3", region_name="us-east-1")

platform_endpoint.always_through_s3 = True
# Can Set custom S3 Keys to be used to store objects in S3
platform_endpoint.publish(
    Message="This message should be published to S3 using the PlatformEndpoint resource",
    MessageAttributes={
        "S3Key": {
            "DataType": "String",
            "StringValue": "247c11c4-a22c-42e4-a6a2-9b5af5b76587",
        }
    },
)
print("\nPublished using PlatformEndpoint Resource:")
fetch_and_print_from_sqs(sqs, demo_queue_url,platform_endpoint)
```

**输出**

```
Published using SNS extended client:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3

Using decreased message size threshold:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3 as it exceeds the limit of the 32 bytes

Published using Topic Resource:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3 using the topic resource

Published using PlatformEndpoint Resource:
Published Message: ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": "extended-client-bucket-store", "s3Key": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"}]
Message Stored in S3 Bucket is: This message should be published to S3 using the PlatformEndpoint resource
```