使用作业书签来跟踪已处理的数据 - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

如果我们为英文版本指南提供翻译,那么如果存在任何冲突,将以英文版本指南为准。在提供翻译时使用机器翻译。

使用作业书签来跟踪已处理的数据

AWS Glue 通过保存作业运行的状态信息来跟踪上次运行 ETL 作业期间已处理的数据。此持久状态信息称为作业书签。作业书签可帮助 AWS Glue 维护状态信息,并可防止重新处理旧数据。有了作业书签,您可以在按照计划的时间间隔重新运行时处理新数据。作业书签包含作业的各种元素的状态,如源、转换和目标。例如,您的 ETL 作业可能会读取 Amazon S3 文件中的新分区。AWS Glue 跟踪作业已成功处理哪些分区,以防止作业的目标数据存储中出现重复处理和重复数据。

为 JDBC 数据源、关系化转换和一些 Amazon Simple Storage Service (Amazon S3) 源实施了作业书签。下表列出了 AWS Glue 支持作业书签的 Amazon S3 源格式。

AWS Glue version Amazon S3 源格式
版本 0.9 JSON、CSV、Apache Avro、XML
版本 1.0 及更高版本 JSON、CSV、Apache Avro、XML、Parquet、ORC

有关 Glue 版本的信息,请参阅 定义作业属性

对于 JDBC 源,以下规则将适用:

  • 对于每个表,AWS Glue 使用一个或多个列作为书签键来确定新数据和处理过的数据。书签键将合并成一个复合键。

  • 可以指定要用作书签键的列。如果不指定书签键,则默认情况下,AWS Glue 将使用主键作为书签键,前提是它按顺序递增或递减(没有间隙)。

  • 如果使用用户定义的书签键,则它们必须严格单调递增或递减。允许有间隙。

在 AWS Glue 中使用作业书签

在启动作业时,作业书签选项作为参数传递。下表描述了在 AWS Glue 控制台中用于设置作业书签的选项。

作业书签 Description
启用 。 使作业在运行后更新状态,以跟踪之前处理的数据。如果作业的源支持作业书签,它将跟踪已处理的数据,当作业运行时,它将处理自上一检查点以来的新数据。
禁用 不使用作业书签,作业始终处理整个数据集。您负责管理上一个作业运行的输出。(这是默认值。)
Pause

处理上次成功运行以来的增量数据或以下子选项标识的范围内的数据,无需更新最后一个书签的状态。您负责管理上一个作业运行的输出。这两个子选项是:

  • job-bookmark-from <from-value> 是运行 ID,它表示直到最后一次成功运行之前处理的所有输入,包括指定的运行 ID。对应的输入将被忽略。

  • job-bookmark-to <to-value> 是运行 ID,它表示直到最后一次成功运行之前处理的所有输入,包括指定的运行 ID。由作业处理对应的输入,不包括 <from-value> 标识的输入。任何晚于此输入的输入也会被排除在外,不进行处理。

指定此选项集时,作业书签状态不更新。

子选项是可选的,但是使用时,需要提供两个子选项。

有关命令行上传递给作业的参数的详细信息(特别是关于作业书签的详细信息),请参阅由 使用的特殊参数AWS Glue

对于 Amazon S3 输入源,AWS Glue 作业书签将通过检查对象的上次修改时间来验证哪些对象需要重新处理。如果您的输入源数据在上次作业运行后已修改,则再次运行作业时将重新处理这些文件。

您可以将您的 Glue Spark ETL 作业的作业书签倒回之前的任何作业运行。您可以通过将您的作业书签倒回之前的任何作业运行来更好的支持数据回填场景,从而使后续作业运行只再处理已做上标签的作业运行的数据。

如果您打算使用相同的作业重新处理所有数据,请重置作业书签。要重置作业书签状态,请使用 AWS Glue 控制台、ResetJobBookmark 操作 (Python:reset_job_bookmark) API 操作或 AWS CLI。例如,使用 AWS CLI 输入以下命令:

aws glue reset-job-bookmark --job-name my-job-name

当您回卷或重置书签时, AWS Glue 不清洁目标文件,因为可能存在多个目标和目标,因为工作书签没有跟踪。只有工作书签跟踪源文件。在重新绕组和重新处理源文件时,可以创建不同的输出目标,以避免输出中的重复数据。

AWS Glue 按作业跟踪作业书签。如果您删除作业,作业书签也会被删除。

有些情况下,您可能已启用 AWS Glue 作业书签,但您的 ETL 作业还在重新处理早期运行中已处理的数据。有关解决该错误的常见原因的信息,请参阅纠正 AWS Glue 中的错误

转换上下文

许多 AWS Glue PySpark 动态帧方法都包括一个名为 transformation_ctx 的可选参数,这是 ETL 运算符实例的唯一标识符。该 transformation_ctx 参数用于在作业书签中标识给定运算符的状态信息。具体来说,AWS Glue 使用 transformation_ctx 来为书签状态键建立索引。

为了使作业书签正常使用,请启用作业书签参数并设置 transformation_ctx 参数。如果您未传入 transformation_ctx 参数,则不会为方法中使用的动态帧或表启用作业书签。例如,如果您有一个用于读取和连接两个 Amazon S3 源的 ETL 作业,您可能会选择仅将 transformation_ctx 参数传递给要启用书签的方法。如果您重置作业的作业书签,它会重置与作业相关联的所有转换,而不考虑所使用的 transformation_ctx

有关 DynamicFrameReader 类的更多信息,请参阅DynamicFrameReader 类。有关 PySpark 扩展的更多信息,请参阅 AWS Glue PySpark 扩展参考

将作业书签用于 AWS Glue 生成的脚本

本节将介绍有关使用作业书签的更多操作详情。本节还提供了一个脚本示例,当您选择源和目标并运行作业时,可从 AWS Glue 生成该脚本。

作业书签存储作业的状态。每个状态实例的键由作业名称和版本号组成。当脚本调用 job.init 时,它将检索其状态并始终获取最新版本。在一个状态中,有多个状态元素,这些元素特定于脚本中的每个源、转换和接收器实例。这些状态元素由附加至脚本中相应元素(源、转换或接收器)的转换上下文来标识。从用户脚本中调用 job.commit 时,将自动保存状态元素。脚本将从参数中获取作业书签的作业名称和控制选项。

作业书签中的状态元素是特定于源、转换或接收器的数据。例如,假设您要从上游作业或进程不断写入数据的 Amazon S3 位置读取增量数据。在这种情况下,脚本必须确定目前已处理的内容。Amazon S3 源的作业书签实现将保存信息,这样,当作业再次运行时,它可以使用保存的信息仅筛选新对象并重新计算作业的下次运行状态。时间戳用于筛选新文件。

除了状态元素外,作业书签还有运行编号尝试次数版本号。运行编号跟踪作业的运行,尝试次数记录作业运行的尝试次数。作业运行编号是随着每次成功运行单调增加的数字。尝试次数跟踪每次运行的尝试,仅在失败尝试后有运行时才增加。版本号单调增加并跟踪作业书签的更新。

以下是为 Amazon S3 数据源生成的脚本的示例。使用作业书签所需的脚本部分以粗体和斜体显示。有关这些元素的更多信息,请参阅 GlueContext 类 API 和 DynamicFrameWriter 类 API。

# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2") job.commit()

以下是为 JDBC 源生成的脚本的示例。源表是一个将 empno 列作为主键的员工表。尽管默认情况下,如果未指定书签键,则作业会使用顺序主键作为书签键,但由于 empno 不一定是顺序键—值中可能会有间隙—它不能作为默认书签键。因此,脚本会将 empno 显式指定为书签键。代码的此部分以粗体和斜体显示。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "hr", table_name = "emp", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarksKeysSortOrder":"asc"}) ## @type: ApplyMapping ## @args: [mapping = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2") job.commit()

有关作业书签相关连接选项的更多信息,请参阅 JDBC connectionType Values.

使用修改时间戳跟踪文件

对于 Amazon S3 输入源,AWS Glue 作业书签将通过检查文件的上次修改时间来验证哪些对象需要重新处理。

考虑以下 示例。在此图中,X 轴是时间轴,方向从左到右,最左侧的点为 T0。Y 轴是在时间 T 观测到的文件的列表。表示列表的元素根据其修改时间放置在图形中。


        在时间 T 观测到的文件的列表,基于其修改时间。

在本示例中,当作业在修改时间戳 1 (T1) 开始时,它会查找修改时间大于 T0 且小于或等于 T1 的文件。这些文件为 F2、F3、F4 和 F5。作业书签分别将时间戳 T0 和 T1 存储为低和高时间戳。

当作业在 T2 重新运行时,它将筛选修改时间大于 T1 且小于等于 T2 的文件。这些文件为 F7、F8、F9 和 F10。因此缺少文件 F3'、F4' 和 F5'。由于 Amazon S3 列表一致性,修改时间小于或等于 T1 的文件 F3'、F4' 和 F5' 会在 T1 后显示。

为考虑 Amazon S3 最终一致性,AWS Glue 在作业书签中包括文件列表(或路径哈希)。AWS Glue 假定 Amazon S3 文件列表仅在当前时间之前的有限时间段内 (dt) 不一致。也就是说,在 T1 执行列出时,修改时间在 T1 – dt 与 T1 之间的文件的文件列表不一致。但是,修改时间小于或等于 T1 – d1 的文件的列表在大于或等于 T1 的时间一致。

您指定一个时间段,在这段时间内,AWS Glue 将使用 AWS Glue 连接选项中的 MaxBand 选项保存文件(以及文件可能保持一致的位置)。默认值为 900 秒 (15 分钟)。有关此属性的更多信息,请参阅Connection Types and Options for ETL in AWS Glue

当作业在时间戳 2 (T2) 重新运行时,它会列出以下范围的文件:

  • T1 - dt(不含)至 T1(含)。此列表包括 F4、F5、F4' 和 F5'。此列表是个一致的范围。但是,此范围对于 T1 的列表不一致,并且已保存文件 F3、F4 和 F5 的列表。为获取要在 T2 处理的文件,将删除文件 F3、F4 和 F5。

  • T1(不含)至 T2 - dt(含)。此列表包括 F7 和 F8。此列表是个一致的范围。

  • T2 - dt(不含)- T2(含)。此列表包括 F9 和 F10。此列表是个不一致的范围。

生成的文件列表是 F3'、F4'、F5'、F7、F8、F9 和 F10。

不一致列表中的新文件为 F9 和 F10,这些文件将保存在下次运行的筛选器中。

有关 Amazon S3 最终一致性,请参阅 AmazonS3简介Amazon Simple Storage Service 开发人员指南.

作业运行失败

作业失败时,作业运行版本将递增。例如,如果在时间戳 1 (T1) 处的作业运行失败,并且该作业在 T2 处重新运行,它会将高时间戳推进至 T2。然后,当作业在后面的点 T3 运行时,它会将高时间戳推进至 T3。

如果作业在 job.commit()(在 T1 处)之前运行失败,将在后续运行中处理文件,其中 AWS Glue 将处理 T0 至 T2 之间的文件。