使用 Python 和 Amazon S3 管理大型 Amazon SQS 消息 - Amazon Simple Queue Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Python 和 Amazon S3 管理大型 Amazon SQS 消息

要管理大型 Amazon SQS 消息,您可以使用适用于 Python 的 Amazon Simple Queue Service 扩展型客户端库和 Amazon Simple Storage Service。这对处理 256KB 至 2GB 的大型消息有效载荷特别有用。该库将消息有效载荷保存到 Amazon S3 存储桶中,并将包含对已存储 Amazon S3 对象的引用的消息发送到 Amazon SQS 队列。

您可以使用适用于 Python 的扩展型客户端库执行以下操作:

  • 指定有效载荷是始终存储在 Amazon S3 中,还是仅当有效载荷大小超过 256KB 时才存储在 Amazon S3 中

  • 发送引用存储在 Amazon S3 存储桶中的单个消息对象的消息

  • 从 Amazon S3 存储桶检索相应的有效载荷对象

  • 从 Amazon S3 存储桶删除相应的有效载荷对象

先决条件

以下是使用适用于 Python 的 Amazon SQS 扩展型客户端库的先决条件:

  • 具有必要凭证的 Amazon 账户。要创建 Amazon 账户,请导航到 Amazon 主页,然后选择创建 Amazon 账户。按照说明进行操作。有关凭证的信息,请参阅 Credentials

  • Amazon SDK:本页面上的示例使用的是适用于 Python 的 Amazon SDK(Boto3)。要安装和设置 SDK,请参阅《适用于 Python 的 Amazon SDK 开发人员指南》中的适用于 Python 的 Amazon SDK 文档

  • Python 3.x(或更高版本)和 pip

  • 适用于 Python 的 Amazon SQS 扩展型客户端库(可以从 PyPI 中获得)

注意

您可以使用适用于 Python 的 Amazon SQS 扩展型客户端库,通过 Amazon S3 管理 Amazon SQS 消息,而这一功能只能通过适用于 Python 的 Amazon SDK 实现。您无法使用 Amazon CLI、Amazon SQS 控制台、Amazon SQS HTTP API 或任何其他 Amazon SDK 做到这一点。

配置消息存储

Amazon SQS 扩展型客户端使用以下消息属性来配置 Amazon S3 消息存储选项:

  • large_payload_support:用于存储大型消息的 Amazon S3 存储桶名称。

  • always_through_s3:如果设置为 True,则所有消息都会存储在 Amazon S3 中。如果设置为 False,则小于 256KB 的消息不会被序列化并存储到 S3 存储桶中。默认为 False

  • use_legacy_attribute:如果设置为 True,则所有已发布的消息都使用旧版保留消息属性(SQSLargePayloadSize),而不是当前的保留消息属性(ExtendedPayloadSize)。

使用适用于 Python 的扩展型客户端库管理大型 Amazon SQS 消息

以下示例创建了具有随机名称的 Amazon S3 存储桶。然后,它会创建一个名为 MyQueue 的 Amazon SQS 队列并向该队列发送一条存储在 S3 存储桶中且大小超过 256KB 的随机消息。最后,代码会检索该消息,返回该消息的相关信息,并删除该消息、队列和存储桶。

import boto3 import sqs_extended_client #Set the Amazon SQS extended client configuration with large payload. sqs_extended_client = boto3.client("sqs", region_name="us-east-1") sqs_extended_client.large_payload_support = "S3_BUCKET_NAME" sqs_extended_client.use_legacy_attribute = False # Create an SQS message queue for this example. Then, extract the queue URL. queue = sqs_extended_client.create_queue( QueueName = "MyQueue" ) queue_url = sqs_extended_client.get_queue_url( QueueName = "MyQueue" )['QueueUrl'] # Create the S3 bucket and allow message objects to be stored in the bucket. sqs_extended_client.s3_client.create_bucket(Bucket=sqs_extended_client.large_payload_support) # Sending a large message small_message = "s" large_message = small_message * 300000 # Shall cross the limit of 256 KB send_message_response = sqs_extended_client.send_message( QueueUrl=queue_url, MessageBody=large_message ) assert send_message_response['ResponseMetadata']['HTTPStatusCode'] == 200 # Receiving the large message receive_message_response = sqs_extended_client.receive_message( QueueUrl=queue_url, MessageAttributeNames=['All'] ) assert receive_message_response['Messages'][0]['Body'] == large_message receipt_handle = receive_message_response['Messages'][0]['ReceiptHandle'] # Deleting the large message # Set to True for deleting the payload from S3 sqs_extended_client.delete_payload_from_s3 = True delete_message_response = sqs_extended_client.delete_message( QueueUrl=queue_url, ReceiptHandle=receipt_handle ) assert delete_message_response['ResponseMetadata']['HTTPStatusCode'] == 200 # Deleting the queue delete_queue_response = sqs_extended_client.delete_queue( QueueUrl=queue_url ) assert delete_queue_response['ResponseMetadata']['HTTPStatusCode'] == 200