

 **此页面仅适用于使用文件库和 2012 年原始 REST API 的 Amazon Glacier 服务的现有客户。**

如果您正在寻找归档存储解决方案，建议使用 Amazon S3 中的 Amazon Glacier 存储类别 S3 Glacier Instant Retrieval、S3 Glacier Flexible Retrieval 和 S3 Glacier Deep Archive。要了解有关这些存储选项的更多信息，请参阅 [Amazon Glacier 存储类别](https://www.amazonaws.cn/s3/storage-classes/glacier/)。

Amazon Glacier（最初基于保管库的独立服务）不再接受新客户。Amazon Glacier 是一项独立的服务 APIs ，拥有自己的服务，可将数据存储在文件库中，不同于亚马逊 S3 和 Amazon S3 Glacier 存储类别。在 Amazon Glacier 中，您现有的数据将确保安全，并且可以无限期地访问。无需进行迁移。对于低成本、长期的存档存储， Amazon 建议[使用 Amazon S3 Glacier 存储类别，这些存储类别](https://www.amazonaws.cn/s3/storage-classes/glacier/)基于S3存储桶 APIs、完全 Amazon Web Services 区域 可用性、更低的成本和 Amazon 服务集成，可提供卓越的客户体验。如果您希望加强功能，可以考虑使用我们的 [Amazon 将数据从 Amazon Glacier 文件库传输到 Amazon S3 Glacier 存储类别的解决方案指南](https://www.amazonaws.cn/solutions/guidance/data-transfer-from-amazon-s3-glacier-vaults-to-amazon-s3/)，迁移到 Amazon S3 Glacier 存储类别。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Python 并行处理下载大型档案
使用 Python 下载大型档案

本主题介绍如何使用 Python 并行处理从 Amazon S3 Glacier（S3 Glacier）下载大型档案。通过此方法，您可以将任意大小的档案分解为可独立处理的较小片段，从而可靠地下载这些档案。

## 概述


本示例中提供的 Python 脚本将执行以下任务：

1. 为通知设置必要的 Amazon 资源（亚马逊 SNS 主题和亚马逊 SQS 队列）

1. 使用 Amazon Glacier 启动档案检索任务

1. 监控 Amazon SQS 队列，获取任务完成通知

1. 将大型档案拆分为可管理的分块

1. 使用多个工作线程并行下载分块

1. 将每个分块保存到磁盘，以备日后重新组装

## 先决条件


开始之前，请确保您已具备以下条件：
+ 已安装 Python 3.6 或更高版本。
+ Amazon 已安装适用于 Python 的 SDK (Boto3)
+ Amazon 为亚马逊 Glacier、Amazon SNS 和亚马逊 SQS 配置了相应权限的证书
+ 有足够的磁盘空间存储下载的档案分块

## 示例：使用 Python 并行处理下载档案


以下 Python 脚本演示了如何使用并行处理从 Amazon Glacier 下载大型档案：

```
import boto3
import time
import json
import jmespath
import re
import concurrent.futures
import os

output_file_path = "output_directory_path"
vault_name = "vault_name"

chunk_size = 1000000000 #1gb - size of chunks for parallel download.
notify_queue_name = 'GlacierJobCompleteNotifyQueue' # SQS queue for Glacier recall notification
chunk_download_queue_name='GlacierChunkReadyNotifyQueue' # SQS queue for chunks
sns_topic_name = 'GlacierRecallJobCompleted' # the SNS topic to be notified when Glacier archive is restored.
chunk_queue_visibility_timeout = 7200 # 2 hours - this may need to be adjusted.
region = 'us-east-1'
archive_id = "archive_id_to_restore"
retrieve_archive = True # set to false if you do not want to restore from Glacier - useful for restarting or parallel processing of the chunk queue.
workers = 12 # the number of parallel worker threads for downloading chunks. 

def setup_queues_and_topic():
    sqs = boto3.client('sqs')
    sns = boto3.client('sns')

    # Create the SNS topic
    topic_response = sns.create_topic(
        Name=sns_topic_name
    )
    topic_arn = topic_response['TopicArn']
    print("Creating the SNS topic " + topic_arn)

    # Create the notification queue
    notify_queue_response = sqs.create_queue(
        QueueName=notify_queue_name,
        Attributes={
            'VisibilityTimeout': '300',  # 5 minutes
            'ReceiveMessageWaitTimeSeconds': '20'  # Enable long polling
        }
    )
    notify_queue_url = notify_queue_response['QueueUrl']
    print("Creating the archive-retrieval notification queue " + notify_queue_url)

    # Create the chunk download queue
    chunk_queue_response = sqs.create_queue(
        QueueName=chunk_download_queue_name,
        Attributes={
            'VisibilityTimeout': str(chunk_queue_visibility_timeout),  # 5 minutes
            'ReceiveMessageWaitTimeSeconds': '0'
        }
    )
    chunk_queue_url = chunk_queue_response['QueueUrl']

    print("Creating the chunk ready notification queue " + chunk_queue_url)


   # Get the ARN for the notification queue
    notify_queue_attributes = sqs.get_queue_attributes(
        QueueUrl=notify_queue_url,
        AttributeNames=['QueueArn']
    )
    notify_queue_arn = notify_queue_attributes['Attributes']['QueueArn']

    # Set up the SNS topic policy on the notification queue
    queue_policy = {
        "Version": "2012-10-17",		 	 	 
        "Statement": [{
            "Sid": "allow-sns-messages",
            "Effect": "Allow",
            "Principal": {"AWS": "*"},
            "Action": "SQS:SendMessage",
            "Resource": notify_queue_arn,
            "Condition": {
                "ArnEquals": {
                    "aws:SourceArn": topic_arn
                }
            }
        }]
    }

    # Set the queue policy
    sqs.set_queue_attributes(
        QueueUrl=notify_queue_url,
        Attributes={
            'Policy': json.dumps(queue_policy)
        }
    )

    # Subscribe the notification queue to the SNS topic
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=notify_queue_arn
    )

    return {
        'topic_arn': topic_arn,
        'notify_queue_url': notify_queue_url,
        'chunk_queue_url': chunk_queue_url
    }


def split_and_send_chunks(archive_size, job_id,chunk_queue_url):
    ranges = []
    current = 0
    chunk_number = 0

    while current < archive_size:
        chunk_number += 1
        next_range = min(current + chunk_size - 1, archive_size - 1)
        ranges.append((current, next_range, chunk_number))
        current = next_range + 1

    # Send messages to SQS queue
    for start, end, chunk_number in ranges:
        body = {"start": start, "end": end, "job_id": job_id, "chunk_number": chunk_number}
        body = json.dumps(body)
        print("Sending SQS message for range:" + str(body))
        response = sqs.send_message(
            QueueUrl=chunk_queue_url,
            MessageBody=str(body)
        )

def GetJobOutputChunks(job_id, byterange, chunk_number):
    glacier = boto3.client('glacier')
    response = glacier.get_job_output(
        vaultName=vault_name,
        jobId=job_id,
        range=byterange,

    )

    with open(os.path.join(output_file_path,str(chunk_number)+".chunk"), 'wb') as output_file:
        output_file.write(response['body'].read())

    return response

def ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url):

    response = sqs.receive_message(
        QueueUrl=notify_queue_url,
        AttributeNames=['All'],
        MaxNumberOfMessages=1,
        WaitTimeSeconds=20,
        MessageAttributeNames=['Message']
    )
    print("Polling archive retrieval job ready queue...")
    # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty

    if 'Messages' in response:
        print("Received a message from the archive retrieval job queue")
        jsonresponse = response
        # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing.
        jsonresponse=json.loads(jsonresponse['Messages'][0]['Body'])
        jsonresponse=json.loads(jsonresponse['Message'])
        if 'ArchiveSizeInBytes' in jsonresponse:
            receipt_handle = response['Messages'][0]['ReceiptHandle']    
            if jsonresponse['ArchiveSizeInBytes']:
                archive_size = jsonresponse['ArchiveSizeInBytes']

                print(f'Received message: {response}')      
                if archive_size > chunk_size:
                    split_and_send_chunks(archive_size, jsonresponse['JobId'],chunk_queue_url)

                    sqs.delete_message(
                    QueueUrl=notify_queue_url,
                    ReceiptHandle=receipt_handle)

            else:
                print("No ArchiveSizeInBytes value found in message")
                print(response)

    else:
        print('No messages available in the queue at this time.')

    time.sleep(1)

def ReceiveArchiveChunkMessages(chunk_queue_url):
    response = sqs.receive_message(
        QueueUrl=chunk_queue_url,
        AttributeNames=['All'],
        MaxNumberOfMessages=1,
        WaitTimeSeconds=0,
        MessageAttributeNames=['Message']
    )
    print("Polling archive chunk queue...")
    print(response)
    # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty
    if 'Messages' in response:
        jsonresponse = response
        # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing.
        jsonresponse=json.loads(jsonresponse['Messages'][0]['Body'])
        if 'job_id' in jsonresponse: #checking that there is a job id before continuing
            job_id = jsonresponse['job_id']
            byterange = "bytes="+str(jsonresponse['start']) + '-' + str(jsonresponse['end'])
            chunk_number = jsonresponse['chunk_number']
            receipt_handle = response['Messages'][0]['ReceiptHandle']
            if jsonresponse['job_id']:
                print(f'Received message: {response}')
                GetJobOutputChunks(job_id,byterange,chunk_number)
                sqs.delete_message(
                QueueUrl=chunk_queue_url,
                ReceiptHandle=receipt_handle)
    else:
        print('No messages available in the chunk queue at this time.')

def initiate_archive_retrieval(archive_id, topic_arn):
    glacier = boto3.client('glacier')

    job_parameters = {
        "Type": "archive-retrieval",
        "ArchiveId": archive_id,
        "Description": "Archive retrieval job",
        "SNSTopic": topic_arn,
        "Tier": "Bulk"  # You can change this to "Standard" or "Expedited" based on your needs
    }

    try:
        response = glacier.initiate_job(
            vaultName=vault_name,
            jobParameters=job_parameters
        )

        print("Archive retrieval job initiated:")
        print(f"Job ID: {response['jobId']}")
        print(f"Job parameters: {job_parameters}")
        print(f"Complete response: {json.dumps(response, indent=2)}")

        return response['jobId']

    except Exception as e:
        print(f"Error initiating archive retrieval job: {str(e)}")
        raise

def run_async_tasks(chunk_queue_url, workers):
    max_workers = workers  # Set the desired maximum number of concurrent tasks
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for _ in range(max_workers):
            executor.submit(ReceiveArchiveChunkMessages, chunk_queue_url)

# One time setup of the necessary queues and topics. 
queue_and_topic_atts = setup_queues_and_topic()

topic_arn = queue_and_topic_atts['topic_arn']
notify_queue_url = queue_and_topic_atts['notify_queue_url']
chunk_queue_url = queue_and_topic_atts['chunk_queue_url']

if retrieve_archive:
    print("Retrieving the defined archive... The topic arn we will notify when recalling the archive is: "+topic_arn)
    job_id = initiate_archive_retrieval(archive_id, topic_arn)
else:
    print("Retrieve archive is false, polling queues and downloading only.")

while True:
   ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url)
   run_async_tasks(chunk_queue_url,workers)
```

## 使用脚本


要使用此脚本，请按照下列步骤操作：

1. 将脚本中的占位符值替换为您的具体信息：
   + *output\$1file\$1path*: 用于保存区块文件的目录
   + *vault\$1name*: 你的 S3 Glacier 保管库的名称
   + *notify\$1queue\$1name*: 作业通知队列的名称
   + *chunk\$1download\$1queue\$1name*: 区块下载队列的名称
   + *sns\$1topic\$1name*: SNS 主题的名称
   + *region*: Amazon 您的保管库所在的地区
   + *archive\$1id*: 要检索的档案的 ID

1. 运行脚本：

   ```
   python download_large_archive.py
   ```

1. 下载完所有分块后，您可以使用如下命令将它们合并为一个文件：

   ```
   cat /path/to/chunks/*.chunk > complete_archive.file
   ```

## 重要注意事项


使用此脚本时，请注意以下几点：
+ S3 Glacier 档案检索可能需要几个小时才能完成，具体取决于所选的检索层级。
+ 此脚本无限期运行，持续轮询队列。您可能需要根据自己的特定需求添加终止条件。
+ 确保您有足够的磁盘空间来存储所有档案分块。
+ 如果脚本中断，您可以通过 `retrieve_archive=False` 重新启动脚本，以继续下载分块，而无需启动新的检索任务。
+ 根据您的网络带宽*chunk\$1size*和系统资源调整和*workers*参数。
+ 亚马逊 S3 检索、亚马逊 SNS 和亚马逊 SQS 的使用收取标准 Amazon 费用。