事务数据操作 - Amazon Lake Formation
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

事务数据操作

Lake Formation ACID 事务提供快照隔离功能,这使多个作业能够通过添加和删除 Amazon S3 对象同时可靠地更新受管理的表,同时保持针对数据湖的每个查询的读取一致性。

ACID 事务提供的读取一致性的一个示例是用户在中启动查询时Amazon Athena它会扫描表格的所有分区以汇总销售数字。在扫描进行过程中以及在扫描完成之前,ETL 作业会向表中添加一个分区。如果用户在事务中执行查询,他们将看到反映查询开始时表状态的查询结果。结果不包括作业创建的新分区中的数据。

参与交易的 Lake Formation 操作,访问和修改Amazon Glue Data Catalog. 一个事务可以涉及多个受管辖的表。

事务还为涉及元数据的更改提供 ACID 属性,这些变更定义了组成受管理表的 Amazon S3 对象(表现)以及表架构。有关受管理表的更多信息,请参见Lake Formation 中受管理的表.

集成Amazon当涉及受管理的表时,像 Athena 这样的服务会自动在事务中执行查询。在你的Amazon GlueETL 作业,作业脚本在对数据湖执行任何读取或写入之前开始事务,引用事务中任何操作的事务 ID,并在完成时提交事务。如果读取或写入操作失败,作业脚本可以重试该操作,或者可以选择取消事务。您还可以通过指定要读取的过去时间点在查询中使用交易。

如果 Lake Formation 检测到某些失败,例如冲突写入,Lake Formation 可能会自动取消交易。取消事务会导致所有操作都被回退,如中所述在交易中读取数据湖并向数据湖写入.

为了使 Lake Formation 能够区分长时间运行的事务(例如,运行多个小时的 Spark ETL 作业)和因崩溃而放弃的事务,长时间运行的写入事务应调用心跳 API 操作ExtendTransaction定期。这对于只读交易不是必需的。Lake Formation 会自动取消闲置太长时间的交易。

注意

必须在交易的上下文中对受管理的表进行修改。如果 ETL 任务在受管理的表上执行操作,但没有明确提供交易 ID,那么 Lake Formation 会在操作结束时自动启动交易并提交(或取消)该交易。这被称为单一对账单交易。

对于具有写入操作的事务,请调用CommitTransaction将将事务移动到 COMM_IN_PRORESS 状态。后续的读操作可能反映也可能不反映写操作的结果。为了确定性地读取写入操作的结果,客户应该等到事务状态更改为已承诺。可以通过打电话来检查这一点CommitTransaction要么DescribteTransactionAPI。单一对账单交易的读取操作也表明了同样的行为。

回滚 Amazon S3 写入

当交易被取消时,无论是自动取消还是通过致电取消Transaction,Lake Formation 永远不会删除未经您许可写入 Amazon S3 的数据。要向 Lake Formation 授予回滚交易期间写入的权限,您的代码必须调用取消 API 操作时删除对象,其中列出了在取消交易后可以删除的 Amazon S3 对象。建议你打电话给DeleteObjectsOnCancel在写之前。

这些区域有:Amazon GlueETL 库函数write_dynamic_frame.from_catalog()包括自动呼叫的选项DeleteObjectsOnCancel在写之前。在下面的示例中,callDeleteObjectsOnCancel选项包含在additional_options参数。因为价值False被传递给read_only参数start_transaction,事务不是只读事务。

transactionId = glueContext.start_transaction(False) try: datasink0 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database="MyDatabase", table_name="MyGovernedTable", additional_options={ "partitionKeys":["key1", "key2"], "transactionId":transactionId, "callDeleteObjectsOnCancel":"true" } ) glueContext.commit_transaction(transactionId) except: glueContext.cancel_transaction(transactionId)