本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Python 和 Amazon S3 管理大型亚马逊 SQS 消息
您可以使用适用于 Python 的亚马逊简单队列服务扩展客户端库
您可以使用适用于 Python 的扩展客户端库来执行以下操作:
-
指定有效负载是始终存储在 Amazon S3 中,还是仅在有效负载大小超过 256 KB 时存储在 S3 中
-
发送一条引用 Amazon S3 存储桶中存储的单个消息对象的消息
-
从 Amazon S3 存储桶中检索相应的负载对象
-
从 Amazon S3 存储桶中删除相应的负载对象
先决条件
以下是使用适用于 Python 的 Amazon SQS 扩展客户端库的先决条件:
-
具有必要凭证的 Amazon 账户。要创建 Amazon 帐户,请导航到Amazon 主页
,然后选择创建 Amazon 帐户。按照说明进行操作。有关证书的信息,请参阅凭证 。 -
Amazon 软件开发工具包:本页上的示例使用 Amazon Python SDK Boto3。要安装和设置软件开发工具包,请参阅适用于 Python 的开发Amazon 工具包开发者指南中的适用于
Python 的开发Amazon 工具包文档 -
Python 3.x(或更高版本)和。
pip
注意
只有使用 Amazon 适用于 Python 的软件开发工具包,您才能使用亚马逊 SQS Python 扩展客户端库通过亚马逊 S3 管理亚马逊 SQS 消息。 Amazon 使用 CLI、亚马逊 SQS 控制台、亚马逊 SQS HTTP API 或任何其他软件开发工具包都无法执行此操作。 Amazon
配置消息存储
Amazon SQS 扩展客户端使用以下消息属性来配置 Amazon S3 消息存储选项:
-
large_payload_support
: 用于存储大型消息的 Amazon S3 存储桶名称。 -
always_through_s3
: 如果True
,则所有消息都存储在 Amazon S3 中。如果是False
,则小于 256 KB 的消息将不会序列化到 s3 存储桶。默认值为False
。 -
use_legacy_attribute
:如果是True
,则所有已发布的消息都使用旧版保留消息属性 (SQSLargePayloadSize
),而不是当前的保留消息属性 (ExtendedPayloadSize
)。
使用适用于 Python 的扩展客户端库管理大型 Amazon SQS 消息
以下示例使用随机名称创建一个 Amazon S3 存储桶。然后,它会创建一个名为的 Amazon SQS 队列,MyQueue
并向该队列发送一条存储在 S3 存储桶中且超过 256 KB 的消息。最后,代码会检索该消息,返回该消息的相关信息,并删除该消息、队列和存储桶。
import boto3 import sqs_extended_client #Set the Amazon SQS extended client configuration with large payload. sqs_extended_client = boto3.client("sqs", region_name="us-east-1") sqs_extended_client.large_payload_support = "S3_BUCKET_NAME" sqs_extended_client.use_legacy_attribute = False # Create an SQS message queue for this example. Then, extract the queue URL. queue = sqs_extended_client.create_queue( QueueName = "MyQueue" ) queue_url = sqs_extended_client.get_queue_url( QueueName = "MyQueue" )['QueueUrl'] # Create the S3 bucket and allow message objects to be stored in the bucket. sqs_extended_client.s3_client.create_bucket(Bucket=sqs_extended_client.large_payload_support) # Sending a large message small_message = "s" large_message = small_message * 300000 # Shall cross the limit of 256 KB send_message_response = sqs_extended_client.send_message( QueueUrl=queue_url, MessageBody=large_message ) assert send_message_response['ResponseMetadata']['HTTPStatusCode'] == 200 # Receiving the large message receive_message_response = sqs_extended_client.receive_message( QueueUrl=queue_url, MessageAttributeNames=['All'] ) assert receive_message_response['Messages'][0]['Body'] == large_message receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle'] # Deleting the large message # Set to True for deleting the payload from S3 sqs_extended_client.delete_payload_from_s3 = True delete_message_response = sqs_extended_client.delete_message( QueueUrl=queue_url, ReceiptHandle=receipt_handle ) assert delete_message_response['ResponseMetadata']['HTTPStatusCode'] == 200 # Deleting the queue delete_queue_response = sqs_extended_client.delete_queue( QueueUrl=queue_url ) assert delete_queue_response['ResponseMetadata']['HTTPStatusCode'] == 200