本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用作业书签跟踪已处理的数据
Amazon Glue 通过保存作业运行的状态信息来跟踪上次运行 ETL 作业期间已处理的数据。此持久状态信息称为作业书签。作业书签可帮助 Amazon Glue 维护状态信息,并可防止重新处理旧数据。有了作业书签,您可以在按照计划的时间间隔重新运行时处理新数据。作业书签包含作业的各种元素的状态,如源、转换和目标。例如,您的 ETL 任务可能会读取 Amazon S3 文件中的新分区。Amazon Glue 跟踪任务已成功处理哪些分区,以防止任务的目标数据存储中出现重复处理和重复数据。
为 JDBC 数据源、关系化转换和一些 Amazon Simple Storage Service(Amazon S3)源实施了任务书签。下表列出了 Amazon Glue 支持任务书签的 Amazon S3 源格式。
Amazon Glue 版本 | Amazon S3 源格式 |
---|---|
版本 0.9 | JSON、CSV、Apache Avro、XML |
版本 1.0 及更高版本 | JSON、CSV、Apache Avro、XML、Parquet、ORC |
有关 Amazon Glue 版本的信息,请参阅定义 Spark 作业的作业属性。
对于 JDBC 源,以下规则将适用:
-
对于每个表,Amazon Glue 使用一个或多个列作为书签键来确定新数据和处理过的数据。书签键将合并成一个复合键。
-
可以指定要用作书签键的列。如果不指定书签键,则默认情况下,Amazon Glue 将使用主键作为书签键,前提是它按顺序递增或递减(没有间隙)。
-
如果使用用户定义的书签键,则它们必须严格单调递增或递减。允许有间隙。
-
Amazon Glue 不支持使用区分大小写的列作为任务书签键。
在 Amazon Glue 中使用作业书签
在启动作业时,作业书签选项作为参数传递。下表描述了在 Amazon Glue 控制台中用于设置任务书签的选项。
作业书签 | 描述 |
---|---|
Enable | 使作业在运行后更新状态,以跟踪之前处理的数据。如果作业的源支持作业书签,它将跟踪已处理的数据,当作业运行时,它将处理自上一检查点以来的新数据。 |
禁用 | 不使用作业书签,作业始终处理整个数据集。您负责管理上一个作业运行的输出。这是默认模式。 |
Pause |
处理上次成功运行以来的增量数据或以下子选项标识的范围内的数据,无需更新最后一个书签的状态。您负责管理上一个作业运行的输出。这两个子选项是:
指定此选项集时,作业书签状态不更新。 子选项是可选的,但是使用时,需要提供两个子选项。 |
有关命令行上传递给作业的参数的详细信息(特别是关于作业书签的详细信息),请参阅Amazon Glue 作业参数。
对于 Amazon S3 输入源,Amazon Glue 任务书签将通过检查对象的上次修改时间来验证哪些对象需要重新处理。如果您的输入源数据在上次作业运行后已修改,则再次运行作业时将重新处理这些文件。
您可以将您的 Amazon Glue Spark ETL 任务的任务书签倒回之前的任何任务运行。您可以通过将您的作业书签倒回之前的任何作业运行来更好的支持数据回填场景,从而使后续作业运行只再处理已做上标签的作业运行的数据。
如果您打算使用相同的作业重新处理所有数据,请重置作业书签。要重置作业书签状态,请使用 Amazon Glue 控制台、ResetJobBookmark 操作(Python:reset_bookmark) API 操作或 Amazon CLI。例如,使用 Amazon CLI 输入以下命令:
aws glue reset-job-bookmark --job-name
my-job-name
在回放或重置书签时,Amazon Glue 不会清除目标文件,因为可能有多个目标,并且不会使用任务书签跟踪目标。仅使用任务书签跟踪源文件。在回卷和重新处理源文件时,您可以创建不同的输出目标,以避免输出中出现重复数据。
Amazon Glue 按作业跟踪作业书签。如果您删除作业,作业书签也会被删除。
有些情况下,您可能已启用 Amazon Glue 作业书签,但您的 ETL 作业还在重新处理早期运行中已处理的数据。有关解决该错误的常见原因的信息,请参阅纠正 Amazon Glue 中的错误。
转换上下文
许多Amazon Glue PySpark 动态帧方法都包含名为的可选参数transformation_ctx
,该参数是 ETL 运算符实例的唯一标识符。该 transformation_ctx
参数用于在作业书签中标识给定运算符的状态信息。具体来说,Amazon Glue 使用 transformation_ctx
来为书签状态键建立索引。
为了使作业书签正常使用,请启用作业书签参数并设置 transformation_ctx
参数。如果您未传入 transformation_ctx
参数,则不会为方法中使用的动态帧或表启用作业书签。例如,如果您有一个用于读取和连接两个 Amazon S3 源的 ETL 任务,您可能会选择仅将 transformation_ctx
参数传递给要启用书签的方法。如果您重置作业的作业书签,它会重置与作业相关联的所有转换,而不考虑所使用的 transformation_ctx
。
有关 DynamicFrameReader
类的更多信息,请参阅DynamicFrameReader 阶级。有关 PySpark 扩展的更多信息,请参阅Amazon Glue PySpark 扩展引用。
将作业书签用于 Amazon Glue 生成的脚本
本节将介绍有关使用作业书签的更多操作详情。本节还提供了一个脚本示例,当您选择源和目标并运行作业时,可从 Amazon Glue 生成该脚本。
作业书签存储作业的状态。每个状态实例的键由作业名称和版本号组成。当脚本调用 job.init
时,它将检索其状态并始终获取最新版本。在一个状态中,有多个状态元素,这些元素特定于脚本中的每个源、转换和接收器实例。这些状态元素由附加至脚本中相应元素(源、转换或接收器)的转换上下文来标识。从用户脚本中调用 job.commit
时,将自动保存状态元素。脚本将从参数中获取作业书签的作业名称和控制选项。
作业书签中的状态元素是特定于源、转换或接收器的数据。例如,假设您要从上游任务或进程不断写入数据的 Amazon S3 位置读取增量数据。在这种情况下,脚本必须确定目前已处理的内容。Amazon S3 源的任务书签实现将保存信息,这样,当任务再次运行时,它可以使用保存的信息仅筛选新对象并重新计算任务的下次运行状态。时间戳用于筛选新文件。
除了状态元素外,作业书签还有运行编号、尝试次数 和版本号。运行编号跟踪作业的运行,尝试次数记录作业运行的尝试次数。作业运行编号是随着每次成功运行单调增加的数字。尝试次数跟踪每次运行的尝试,仅在失败尝试后有运行时才增加。版本号单调增加并跟踪作业书签的更新。
在 Amazon Glue 服务数据库中,所有转换的书签状态都作为键值对存储在一起:
{ "job_name" : ..., "run_id": ..., "run_number": .., "attempt_number": ... "states": { "transformation_ctx1" : { bookmark_state1 }, "transformation_ctx2" : { bookmark_state2 } } }
transformation_ctx
作为键以搜索脚本中特定源的书签状态。为了使书签正常工作,您应始终让源和相关的 transformation_ctx
保持一致。更改源属性或重命名 transformation_ctx
可能会使之前的书签无效,并且基于时间戳的筛选条件可能无法产生正确的结果。
最佳实践
以下是通过 Amazon Glue 生成的脚本使用任务书签的最佳实践。
使脚本的开头始终包含
job.init()
,脚本的末尾始终包含job.commit()
。这两个函数用于初始化书签服务并更新服务的状态变更。如果没有调用书签,书签将无法正常工作。请勿在启用书签的情况下更改数据源属性。例如,假定 datasource0 指向 Simple Storage Service (Amazon S3) 输入路径 A,并且该任务从启用书签情况下运行数轮的源进行读取。如果您将 datasource0 的输入路径更改为 Simple Storage Service (Amazon S3) 路径 B 而未更改
transformation_ctx
,则 Amazon Glue 任务将使用存储的旧书签状态。这将导致丢失或跳过输入路径 B 中的文件,因为 Amazon Glue 将假定这些文件在以前的运行中已经处理过。将目录表和书签搭配使用以实现更好的分区管理。书签既适用于来自数据目录的数据源,也适用于来自选项的数据源。但是,使用来自选项方法很难删除/添加新的分区。将目录表和爬网程序搭配使用可以提供更好的自动化,以跟踪新添加的分区并让您通过下推谓词灵活选择特定的分区。
使用适合大型数据集的 Amazon Glue Simple Storage Service (Amazon S3 文件列表器
。书签将列出每个输入分区下的所有文件并进行筛选,因此,如果单个分区下的文件过多,书签可以运行到驱动程序 OOM 中。使用 Amazon Glue Simple Storage Service (Amazon S3) 文件列表器,以避免一次在内存中列出所有文件。
以下是为 Amazon S3 数据源生成的脚本的示例。使用作业书签所需的脚本部分以粗体和斜体显示。有关这些元素的更多信息,请参阅 GlueContext 阶级 API 和 DynamicFrameWriter 阶级 API。
# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2") job.commit()
以下是为 JDBC 源生成的脚本的示例。源表是一个将 empno
列作为主键的员工表。尽管默认情况下,如果未指定书签键,则任务会使用顺序主键作为书签键,但由于 empno
不一定是连续的,值中可能会有间隙,它不能作为默认书签键。因此,脚本会将 empno
显式指定为书签键。代码的此部分以粗体和斜体显示。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "hr", table_name = "emp", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"}) ## @type: ApplyMapping ## @args: [mapping = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2") job.commit()
您可以通过以下方式指定 jobBookmarkKeys
和 jobBookmarkKeysSortOrder
:
-
create_dynamic_frame.from_catalog
— 使用additional_options
。 -
create_dynamic_frame.from_options
— 使用connection_options
。
有关与任务书签相关的连接选项的更多信息,请参阅JDBC connectionType 值。