使用 Python 和 Amazon S3 管理大型亚马逊 SQS 消息 - Amazon Simple Queue Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Python 和 Amazon S3 管理大型亚马逊 SQS 消息

您可以使用适用于 Python 的亚马逊简单队列服务扩展客户端库和亚马逊简单存储服务来管理大型 Amazon SQS 消息。这对于消耗从 256 KB 到 2 GB 的大型消息负载特别有用。该库将消息负载保存到 Amazon S3 存储桶,并将包含存储的 Amazon S3 对象引用的消息发送到亚马逊 SQS 队列。

您可以使用适用于 Python 的扩展客户端库来执行以下操作:

  • 指定有效负载是始终存储在 Amazon S3 中,还是仅在有效负载大小超过 256 KB 时存储在 S3 中

  • 发送一条引用 Amazon S3 存储桶中存储的单个消息对象的消息

  • 从 Amazon S3 存储桶中检索相应的负载对象

  • 从 Amazon S3 存储桶中删除相应的负载对象

先决条件

以下是使用适用于 Python 的 Amazon SQS 扩展客户端库的先决条件:

注意

只有使用 Amazon 适用于 Python 的软件开发工具包,您才能使用亚马逊 SQS Python 扩展客户端库通过亚马逊 S3 管理亚马逊 SQS 消息。 Amazon 使用 CLI、亚马逊 SQS 控制台、亚马逊 SQS HTTP API 或任何其他软件开发工具包都无法执行此操作。 Amazon

配置消息存储

Amazon SQS 扩展客户端使用以下消息属性来配置 Amazon S3 消息存储选项:

  • large_payload_support: 用于存储大型消息的 Amazon S3 存储桶名称。

  • always_through_s3: 如果True,则所有消息都存储在 Amazon S3 中。如果是False,则小于 256 KB 的消息将不会序列化到 s3 存储桶。默认值为 False

  • use_legacy_attribute:如果是True,则所有已发布的消息都使用旧版保留消息属性 (SQSLargePayloadSize),而不是当前的保留消息属性 (ExtendedPayloadSize)。

使用适用于 Python 的扩展客户端库管理大型 Amazon SQS 消息

以下示例使用随机名称创建一个 Amazon S3 存储桶。然后,它会创建一个名为的 Amazon SQS 队列,MyQueue并向该队列发送一条存储在 S3 存储桶中且超过 256 KB 的消息。最后,代码会检索该消息,返回该消息的相关信息,并删除该消息、队列和存储桶。

import boto3 import sqs_extended_client #Set the Amazon SQS extended client configuration with large payload. sqs_extended_client = boto3.client("sqs", region_name="us-east-1") sqs_extended_client.large_payload_support = "S3_BUCKET_NAME" sqs_extended_client.use_legacy_attribute = False # Create an SQS message queue for this example. Then, extract the queue URL. queue = sqs_extended_client.create_queue( QueueName = "MyQueue" ) queue_url = sqs_extended_client.get_queue_url( QueueName = "MyQueue" )['QueueUrl'] # Create the S3 bucket and allow message objects to be stored in the bucket. sqs_extended_client.s3_client.create_bucket(Bucket=sqs_extended_client.large_payload_support) # Sending a large message small_message = "s" large_message = small_message * 300000 # Shall cross the limit of 256 KB send_message_response = sqs_extended_client.send_message( QueueUrl=queue_url, MessageBody=large_message ) assert send_message_response['ResponseMetadata']['HTTPStatusCode'] == 200 # Receiving the large message receive_message_response = sqs_extended_client.receive_message( QueueUrl=queue_url, MessageAttributeNames=['All'] ) assert receive_message_response['Messages'][0]['Body'] == large_message receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle'] # Deleting the large message # Set to True for deleting the payload from S3 sqs_extended_client.delete_payload_from_s3 = True delete_message_response = sqs_extended_client.delete_message( QueueUrl=queue_url, ReceiptHandle=receipt_handle ) assert delete_message_response['ResponseMetadata']['HTTPStatusCode'] == 200 # Deleting the queue delete_queue_response = sqs_extended_client.delete_queue( QueueUrl=queue_url ) assert delete_queue_response['ResponseMetadata']['HTTPStatusCode'] == 200