获取 Amazon S3 Glacier 档案内容并使用 Amazon 软件开发工具包删除档案 - Amazon S3 Glacier
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

如果您不熟悉 Amazon Simple Storage Service (Amazon S3) 中的归档存储功能,建议您先详细了解 Amazon S3 中的 S3 Glacier 存储类、S3 Glacier 即时检索S3 Glacier 灵活检索S3 Glacier 深度归档。有关更多信息,请参阅 Amazon S3 用户指南中的 S3 Glacier 存储类和用于存档对象的存储类。


获取 Amazon S3 Glacier 档案内容并使用 Amazon 软件开发工具包删除档案


  • 列出 Amazon S3 Glacier 文件库的任务并获取任务状态。

  • 获取已完成的档案检索任务的输出。

  • 删除档案。

  • 删除文件库。

SDK for Python (Boto3)

还有更多相关信息 GitHub。在 Amazon 代码示例存储库中查找完整示例,了解如何进行设置和运行。

创建一个包装 S3 Glacier 操作的类。

import argparse import logging import os import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) class GlacierWrapper: """Encapsulates Amazon S3 Glacier API operations.""" def __init__(self, glacier_resource): """ :param glacier_resource: A Boto3 Amazon S3 Glacier resource. """ self.glacier_resource = glacier_resource @staticmethod def list_jobs(vault, job_type): """ Lists jobs by type for the specified vault. :param vault: The vault to query. :param job_type: The type of job to list. :return: The list of jobs of the requested type. """ job_list = [] try: if job_type == "all": jobs = vault.jobs.all() elif job_type == "in_progress": jobs = vault.jobs_in_progress.all() elif job_type == "completed": jobs = vault.completed_jobs.all() elif job_type == "succeeded": jobs = vault.succeeded_jobs.all() elif job_type == "failed": jobs = vault.failed_jobs.all() else: jobs = [] logger.warning("%s isn't a type of job I can get.", job_type) for job in jobs: job_list.append(job) logger.info("Got %s %s job %s.", job_type, job.action, job.id) except ClientError: logger.exception("Couldn't get %s jobs from %s.", job_type, vault.name) raise else: return job_list @staticmethod def get_job_output(job): """ Gets the output of a job, such as a vault inventory or the contents of an archive. :param job: The job to get output from. :return: The job output, in bytes. """ try: response = job.get_output() out_bytes = response["body"].read() logger.info("Read %s bytes from job %s.", len(out_bytes), job.id) if "archiveDescription" in response: logger.info( "These bytes are described as '%s'", response["archiveDescription"] ) except ClientError: logger.exception("Couldn't get output for job %s.", job.id) raise else: return out_bytes @staticmethod def delete_archive(archive): """ Deletes an archive from a vault. :param archive: The archive to delete. """ try: archive.delete() logger.info( "Deleted archive %s from vault %s.", archive.id, archive.vault_name ) except ClientError: logger.exception("Couldn't delete archive %s.", archive.id) raise @staticmethod def delete_vault(vault): """ Deletes a vault. :param vault: The vault to delete. """ try: vault.delete() logger.info("Deleted vault %s.", vault.name) except ClientError: logger.exception("Couldn't delete vault %s.", vault.name) raise


def retrieve_demo(glacier, vault_name): """ Shows how to: * List jobs for a vault and get job status. * Get the output of a completed archive retrieval job. * Delete an archive. * Delete a vault. :param glacier: A Boto3 Amazon S3 Glacier resource. :param vault_name: The name of the vault to query for jobs. """ vault = glacier.glacier_resource.Vault("-", vault_name) try: vault.load() except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": print( f"\nVault {vault_name} doesn't exist. You must first run this script " f"with the --upload flag to create the vault." ) return else: raise print(f"\nGetting completed jobs for {vault.name}.") jobs = glacier.list_jobs(vault, "completed") if not jobs: print("\nNo completed jobs found. Give it some time and try again later.") return retrieval_job = None for job in jobs: if job.action == "ArchiveRetrieval" and job.status_code == "Succeeded": retrieval_job = job break if retrieval_job is None: print( "\nNo ArchiveRetrieval jobs found. Give it some time and try again " "later." ) return print(f"\nGetting output from job {retrieval_job.id}.") archive_bytes = glacier.get_job_output(retrieval_job) archive_str = archive_bytes.decode("utf-8") print("\nGot archive data. Printing the first 10 lines.") print(os.linesep.join(archive_str.split(os.linesep)[:10])) print(f"\nDeleting the archive from {vault.name}.") archive = glacier.glacier_resource.Archive( "-", vault.name, retrieval_job.archive_id ) glacier.delete_archive(archive) print(f"\nDeleting {vault.name}.") glacier.delete_vault(vault)

有关 S Amazon DK 开发者指南和代码示例的完整列表,请参阅将 S3 Glacier 与 Amazon SDK 结合使用。本主题还包括有关入门的信息以及有关先前的 SDK 版本的详细信息。