本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用作业书签
Amazon Glue for Spark 使用作业书签来跟踪已处理的数据。有关作业书签功能及其支持的内容的摘要,请参阅 使用作业书签跟踪已处理的数据。使用书签对 Amazon Glue 作业进行编程时,您可以获得可视化作业中无法获得的灵活性。
-
从 JDBC 读取时,您可以指定要用作 Amazon Glue 脚本中书签键的列。
-
您可以选择将哪个
transformation_ctx
方法应用于每个方法调用。
始终在脚本开头调用 job.init
并在脚本结尾调用 job.commit
,并恰当配置参数。这两个函数初始化书签服务并更新服务的状态更改。如果没有调用书签,书签将无法正常工作。
指定书签键
对于 JDBC 工作流程,书签通过将关键字段的值与已添加书签的值进行比较来跟踪您的作业已读取的行。这不是必需的,也不适用于 Amazon S3 工作流程。在没有可视化编辑器的情况下编写 Amazon Glue 脚本时,您可以指定使用书签跟踪哪一列。您也可以指定多列。指定用户定义的书签键时,允许在值序列中出现间隔。
警告
如果使用用户定义的书签键,则它们每个都必须严格单调递增或递减。为复合键选择其他字段时,“次要版本”或“修订编号”等概念的字段不符合此标准,因为它们的值会在整个数据集中重复使用。
您可以通过以下方式指定 jobBookmarkKeys
和 jobBookmarkKeysSortOrder
:
-
create_dynamic_frame.from_catalog
— 使用additional_options
。 -
create_dynamic_frame.from_options
— 使用connection_options
。
转换上下文
许多 Amazon Glue PySpark 动态帧方法都包括一个名为 transformation_ctx
的可选参数,这是 ETL 运算符实例的唯一标识符。该 transformation_ctx
参数用于在作业书签中标识给定运算符的状态信息。具体来说,Amazon Glue 使用 transformation_ctx
来为书签状态键建立索引。
警告
transformation_ctx
作为键以搜索脚本中特定源的书签状态。为了使书签正常工作,您应始终让源和相关的 transformation_ctx
保持一致。更改源属性或重命名 transformation_ctx
可能会使之前的书签无效,并且基于时间戳的筛选条件可能无法产生正确的结果。
为了使作业书签正常使用,请启用作业书签参数并设置 transformation_ctx
参数。如果您未传入 transformation_ctx
参数,则不会为方法中使用的动态帧或表启用作业书签。例如,如果您有一个用于读取和连接两个 Amazon S3 源的 ETL 任务,您可能会选择仅将 transformation_ctx
参数传递给要启用书签的方法。如果您重置作业的作业书签,它会重置与作业相关联的所有转换,而不考虑所使用的 transformation_ctx
。
有关 DynamicFrameReader
类的更多信息,请参阅DynamicFrameReader 类。有关 PySpark 扩展的更多信息,请参阅 Amazon Glue PySpark 扩展参考。
示例
以下是为 Amazon S3 数据源生成的脚本的示例。使用作业书签所需的脚本部分以斜体显示。有关这些元素的更多信息,请参阅 GlueContext 类 API 和 DynamicFrameWriter 类 API。
# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0" ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2" ) job.commit()
以下是为 JDBC 源生成的脚本的示例。源表是一个将 empno
列作为主键的员工表。尽管默认情况下,如果未指定书签键,则任务会使用顺序主键作为书签键,但由于 empno
不一定是连续的,值中可能会有间隙,它不能作为默认书签键。因此,脚本会将 empno
显式指定为书签键。代码的此部分以斜体显示。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"} ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2" ) job.commit()