Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
此页面仅适用于使用 Vaults 和 2012 年原始 REST API 的 Amazon Glacier 服务的现有客户。
如果您正在寻找档案存储解决方案,我们建议您在亚马逊 S3、S3 Glacier 即时检索、S3 Glacier 灵活检索和 S3 Glacier Deep Archive Deep Archive 中使用 Amazon Glacier 存储类。要了解有关这些存储选项的更多信息,请参阅 Amazon Glacier 存储类别。
从 2025 年 12 月 15 日起,Amazon Glacier(最初基于保管库的独立服务)将不再接受新客户,对现有客户不产生任何影响。Amazon Glacier 是一项独立的服务 APIs ,拥有自己的服务,可将数据存储在文件库中,不同于亚马逊 S3 和 Amazon S3 Glacier 存储类别。在 Amazon Glacier 中,您的现有数据将保持安全且可以无限期地访问。无需迁移。对于低成本、长期的存档存储, Amazon 建议使用 Amazon S3 Glacier 存储类别,这些存储类别基于S3存储桶 APIs、完全 Amazon Web Services 区域 可用性、更低的成本和 Amazon 服务集成,可提供卓越的客户体验。如果您想要增强功能,可以考虑使用我们的Amazon 解决方案指南迁移到 Amazon S3 Glacier 存储类别,将数据从 Amazon Glacier 文件库传输到 Amazon S3 Glacier 存储类。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
获取 Amazon Glacier 档案内容并使用 Amazon 软件开发工具包删除档案
以下代码示例展示了如何:
- Python
-
- 适用于 Python 的 SDK (Boto3)
-
创建一个封装 Amazon Glacier 操作的类。
import argparse
import logging
import os
import boto3
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
class GlacierWrapper:
"""Encapsulates Amazon S3 Glacier API operations."""
def __init__(self, glacier_resource):
"""
:param glacier_resource: A Boto3 Amazon S3 Glacier resource.
"""
self.glacier_resource = glacier_resource
@staticmethod
def list_jobs(vault, job_type):
"""
Lists jobs by type for the specified vault.
:param vault: The vault to query.
:param job_type: The type of job to list.
:return: The list of jobs of the requested type.
"""
job_list = []
try:
if job_type == "all":
jobs = vault.jobs.all()
elif job_type == "in_progress":
jobs = vault.jobs_in_progress.all()
elif job_type == "completed":
jobs = vault.completed_jobs.all()
elif job_type == "succeeded":
jobs = vault.succeeded_jobs.all()
elif job_type == "failed":
jobs = vault.failed_jobs.all()
else:
jobs = []
logger.warning("%s isn't a type of job I can get.", job_type)
for job in jobs:
job_list.append(job)
logger.info("Got %s %s job %s.", job_type, job.action, job.id)
except ClientError:
logger.exception("Couldn't get %s jobs from %s.", job_type, vault.name)
raise
else:
return job_list
@staticmethod
def get_job_output(job):
"""
Gets the output of a job, such as a vault inventory or the contents of an
archive.
:param job: The job to get output from.
:return: The job output, in bytes.
"""
try:
response = job.get_output()
out_bytes = response["body"].read()
logger.info("Read %s bytes from job %s.", len(out_bytes), job.id)
if "archiveDescription" in response:
logger.info(
"These bytes are described as '%s'", response["archiveDescription"]
)
except ClientError:
logger.exception("Couldn't get output for job %s.", job.id)
raise
else:
return out_bytes
@staticmethod
def delete_archive(archive):
"""
Deletes an archive from a vault.
:param archive: The archive to delete.
"""
try:
archive.delete()
logger.info(
"Deleted archive %s from vault %s.", archive.id, archive.vault_name
)
except ClientError:
logger.exception("Couldn't delete archive %s.", archive.id)
raise
@staticmethod
def delete_vault(vault):
"""
Deletes a vault.
:param vault: The vault to delete.
"""
try:
vault.delete()
logger.info("Deleted vault %s.", vault.name)
except ClientError:
logger.exception("Couldn't delete vault %s.", vault.name)
raise
调用包装器类上的函数,以从已完成的任务中获取档案内容,然后删除档案。
def retrieve_demo(glacier, vault_name):
"""
Shows how to:
* List jobs for a vault and get job status.
* Get the output of a completed archive retrieval job.
* Delete an archive.
* Delete a vault.
:param glacier: A Boto3 Amazon S3 Glacier resource.
:param vault_name: The name of the vault to query for jobs.
"""
vault = glacier.glacier_resource.Vault("-", vault_name)
try:
vault.load()
except ClientError as err:
if err.response["Error"]["Code"] == "ResourceNotFoundException":
print(
f"\nVault {vault_name} doesn't exist. You must first run this script "
f"with the --upload flag to create the vault."
)
return
else:
raise
print(f"\nGetting completed jobs for {vault.name}.")
jobs = glacier.list_jobs(vault, "completed")
if not jobs:
print("\nNo completed jobs found. Give it some time and try again later.")
return
retrieval_job = None
for job in jobs:
if job.action == "ArchiveRetrieval" and job.status_code == "Succeeded":
retrieval_job = job
break
if retrieval_job is None:
print(
"\nNo ArchiveRetrieval jobs found. Give it some time and try again "
"later."
)
return
print(f"\nGetting output from job {retrieval_job.id}.")
archive_bytes = glacier.get_job_output(retrieval_job)
archive_str = archive_bytes.decode("utf-8")
print("\nGot archive data. Printing the first 10 lines.")
print(os.linesep.join(archive_str.split(os.linesep)[:10]))
print(f"\nDeleting the archive from {vault.name}.")
archive = glacier.glacier_resource.Archive(
"-", vault.name, retrieval_job.archive_id
)
glacier.delete_archive(archive)
print(f"\nDeleting {vault.name}.")
glacier.delete_vault(vault)
有关 S Amazon DK 开发者指南和代码示例的完整列表,请参阅使用带有 Amazon SDK 的 Amazon Glacier。本主题还包括有关入门的信息以及有关先前的 SDK 版本的详细信息。