适用于 Python 的扩展型客户端库 - Amazon Simple Notification Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

适用于 Python 的扩展型客户端库

先决条件

以下是使用适用于 Python 的 Amazon SNS 扩展型客户端库的先决条件:

  • 一个 Amazon 软件开发工具包。

    本页上的示例使用 Amazon Python SDK Boto3。要安装和设置 SDK,请参阅 Amazon SDK for Python 文档。

  • 并 Amazon Web Services 账户 具有正确的凭据。

    要创建 Amazon Web Services 账户,请导航到Amazon 主页,然后选择创建 Amazon 帐户。按照说明进行操作。

    有关凭证的信息,请参阅《Amazon SDK for Python 开发人员指南》中的凭证

  • Python 3.x(或更高版本)和 pip。

  • 适用于 Python 的 Amazon SNS 扩展型客户端库(也可从 PyPI 中获得)。

配置消息存储

Boto3 Amazon SN S 客户端、主题PlatformEndpoint和对象上提供了以下属性,用于配置 Amazon S3 消息存储选项。

  • large_payload_support – 用于存储大型消息的 Amazon S3 桶名称。

  • message_size_threshold – 在大型消息桶中存储消息的阈值。不能低于 0 或超过 262144。原定设置值为 262144。

  • always_through_s3 – 如果为 True,则所有消息都存储在 Amazon S3 中。默认值为 False

  • s3 – 用于在 Amazon S3 中存储对象的 Boto3 Amazon S3 resource 对象。如果您想要控制 Amazon S3 资源(例如,自定义 Amazon S3 配置或凭证),请使用此选项。如果之前在首次使用时未设置,则原定设置值为 boto3.resource("s3")

示例:使用存储在 Amazon S3 中的有效负载将消息发布到 Amazon SNS

以下代码示例展示了如何:

  • 创建示例 Amazon SNS 主题和 Amazon SQS 队列。

  • 订阅队列以接收来自主题的消息。

  • 发布测试消息。

  • 消息有效负载存储在 Amazon S3 中,并发布对它的引用。

  • 打印队列中已发布的消息以及从 Amazon S3 检索到的原始消息。

要发布大型消息,请使用适用于 Python 的 Amazon SNS 扩展型客户端库。您发送的消息将引用包含实际消息内容的 Amazon S3 对象。

import boto3 import sns_extended_client from json import loads s3_extended_payload_bucket = "extended-client-bucket-store" TOPIC_NAME = "---TOPIC-NAME---" QUEUE_NAME = "---QUEUE-NAME---" # Create an helper to fetch message from S3 def get_msg_from_s3(body): json_msg = loads(body) s3_client = boto3.client("s3") s3_object = s3_client.get_object( Bucket=json_msg[1].get("s3BucketName"), Key=json_msg[1].get("s3Key") ) msg = s3_object.get("Body").read().decode() return msg # Create an helper to fetch and print message SQS queue and S3 def fetch_and_print_from_sqs(sqs, queue_url): """Handy Helper to fetch and print message from SQS queue and S3""" message = sqs.receive_message( QueueUrl=queue_url, MessageAttributeNames=["All"], MaxNumberOfMessages=1 ).get("Messages")[0] message_body = message.get("Body") print("Published Message: {}".format(message_body)) print("Message Stored in S3 Bucket is: {}\n".format(get_msg_from_s3(message_body))) # Initialize the SNS client and create SNS Topic sns_extended_client = boto3.client("sns", region_name="us-east-1") create_topic_response = sns_extended_client.create_topic(Name=TOPIC_NAME) demo_topic_arn = create_topic_response.get("TopicArn") # Create and subscribe an SQS queue to the SNS client sqs = boto3.client("sqs") demo_queue_url = sqs.create_queue(QueueName=QUEUE_NAME).get("QueueUrl") demo_queue_arn = sqs.get_queue_attributes(QueueUrl=demo_queue_url, AttributeNames=["QueueArn"])["Attributes"].get("QueueArn") # Set the RawMessageDelivery subscription attribute to TRUE sns_extended_client.subscribe(TopicArn=demo_topic_arn, Protocol="sqs", Endpoint=demo_queue_arn, Attributes={"RawMessageDelivery":"true"}) sns_extended_client.large_payload_support = s3_extended_payload_bucket # To store all messages content in S3, set always_through_s3 to True # In the example, we set message size threshold as 32 bytes, adjust this threshold as per your usecase # Message will only be uploaded to S3 when its payload size exceeded threshold sns_extended_client.message_size_threshold = 32 sns_extended_client.publish( TopicArn=demo_topic_arn, Message="This message should be published to S3 as it exceeds the message_size_threshold limit", ) # Print message stored in s3 fetch_and_print_from_sqs(sqs, demo_queue_url)

输出

Published Message: [ "software.amazon.payloadoffloading.PayloadS3Pointer", { "s3BucketName": "extended-client-bucket-store", "s3Key": "xxxx-xxxxx-xxxxx-xxxxxx" } ] Message Stored in S3 Bucket is: This message should be published to S3 as it exceeds the message_size_threshold limit