本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Python 和 Amazon S3 管理大型 Amazon SQS 消息
要管理大型 Amazon SQS 消息,您可以使用适用于 Python 的 Amazon Simple Queue Service 扩展型客户端库
您可以使用适用于 Python 的扩展型客户端库执行以下操作:
-
指定有效载荷是始终存储在 Amazon S3 中,还是仅当有效载荷大小超过 256KB 时才存储在 Amazon S3 中
-
发送引用存储在 Amazon S3 存储桶中的单个消息对象的消息
-
从 Amazon S3 存储桶检索相应的有效载荷对象
-
从 Amazon S3 存储桶删除相应的有效载荷对象
先决条件
以下是使用适用于 Python 的 Amazon SQS 扩展型客户端库的先决条件:
-
具有必要凭证的 Amazon 账户。要创建 Amazon 账户,请导航到 Amazon 主页
,然后选择创建 Amazon 账户。按照说明进行操作。有关凭证的信息,请参阅 Credentials 。 -
Amazon SDK:本页面上的示例使用的是适用于 Python 的 Amazon SDK(Boto3)。要安装和设置 SDK,请参阅《适用于 Python 的 Amazon SDK 开发人员指南》中的适用于 Python 的 Amazon SDK
文档 -
Python 3.x(或更高版本)和
pip
。 -
适用于 Python 的 Amazon SQS 扩展型客户端库(可以从 PyPI
中获得)
注意
您可以使用适用于 Python 的 Amazon SQS 扩展型客户端库,通过 Amazon S3 管理 Amazon SQS 消息,而这一功能只能通过适用于 Python 的 Amazon SDK 实现。您无法使用 Amazon CLI、Amazon SQS 控制台、Amazon SQS HTTP API 或任何其他 Amazon SDK 做到这一点。
配置消息存储
Amazon SQS 扩展型客户端使用以下消息属性来配置 Amazon S3 消息存储选项:
-
large_payload_support
:用于存储大型消息的 Amazon S3 存储桶名称。 -
always_through_s3
:如果设置为True
,则所有消息都会存储在 Amazon S3 中。如果设置为False
,则小于 256KB 的消息不会被序列化并存储到 S3 存储桶中。默认为False
。 -
use_legacy_attribute
:如果设置为True
,则所有已发布的消息都使用旧版保留消息属性(SQSLargePayloadSize
),而不是当前的保留消息属性(ExtendedPayloadSize
)。
使用适用于 Python 的扩展型客户端库管理大型 Amazon SQS 消息
以下示例创建了具有随机名称的 Amazon S3 存储桶。然后,它会创建一个名为 MyQueue
的 Amazon SQS 队列并向该队列发送一条存储在 S3 存储桶中且大小超过 256KB 的随机消息。最后,代码会检索该消息,返回该消息的相关信息,并删除该消息、队列和存储桶。
import boto3 import sqs_extended_client #Set the Amazon SQS extended client configuration with large payload. sqs_extended_client = boto3.client("sqs", region_name="us-east-1") sqs_extended_client.large_payload_support = "S3_BUCKET_NAME" sqs_extended_client.use_legacy_attribute = False # Create an SQS message queue for this example. Then, extract the queue URL. queue = sqs_extended_client.create_queue( QueueName = "MyQueue" ) queue_url = sqs_extended_client.get_queue_url( QueueName = "MyQueue" )['QueueUrl'] # Create the S3 bucket and allow message objects to be stored in the bucket. sqs_extended_client.s3_client.create_bucket(Bucket=sqs_extended_client.large_payload_support) # Sending a large message small_message = "s" large_message = small_message * 300000 # Shall cross the limit of 256 KB send_message_response = sqs_extended_client.send_message( QueueUrl=queue_url, MessageBody=large_message ) assert send_message_response['ResponseMetadata']['HTTPStatusCode'] == 200 # Receiving the large message receive_message_response = sqs_extended_client.receive_message( QueueUrl=queue_url, MessageAttributeNames=['All'] ) assert receive_message_response['Messages'][0]['Body'] == large_message receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle'] # Deleting the large message # Set to True for deleting the payload from S3 sqs_extended_client.delete_payload_from_s3 = True delete_message_response = sqs_extended_client.delete_message( QueueUrl=queue_url, ReceiptHandle=receipt_handle ) assert delete_message_response['ResponseMetadata']['HTTPStatusCode'] == 200 # Deleting the queue delete_queue_response = sqs_extended_client.delete_queue( QueueUrl=queue_url ) assert delete_queue_response['ResponseMetadata']['HTTPStatusCode'] == 200