教程:编写 Amazon Glue for Spark 脚本 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

教程:编写 Amazon Glue for Spark 脚本

本教程向您介绍编写 Amazon Glue 脚本的过程。脚本可以与作业一起按计划运行,也可以与交互式会话交互运行。有关作业的更多信息,请参阅使用 Amazon Glue Studio 构建可视化 ETL 作业。有关交互式会话的更多信息,请参阅 Amazon Glue 交互式会话概览

Amazon Glue Studio 可视化编辑器提供图形化的零代码接口,用于构建 Amazon Glue 作业。AmazonGlue 脚本支持可视化作业。它们可让您访问用于使用 Apache Spark 程序的扩展工具集。您可以访问本机 Spark API,以及从 Amazon Glue 脚本访问为提取、转换、加载 (ETL) 工作流提供便利的 Amazon Glue 库。

在本教程中,您将提取、转换和加载停车罚单数据集。完成此工作的脚本在形式和功能上与 Amazon 大数据博客上的使用 Amazon Glue Studio 让 ETL 更轻松中生成的脚本完全相同,该博客介绍了 Amazon Glue Studio 可视化编辑器。通过在作业中运行此脚本,您可以将其与可视化作业进行比较,并了解 Amazon Glue ETL 脚本的工作原理。这可以让您为使用可视化作业中尚不可用的其他功能做好准备。

在本教程中,您使用 Python 语言和库。Scala 中提供了类似的功能。通读本教程后,您应该能够生成和检查示例 Scala 脚本,以了解如何进行 Scala Amazon Glue ETL 脚本编写过程。

先决条件

本教程包含以下先决条件:

  • 与 Amazon Glue Studio 博客文章相同的先决条件,该博客文章指示您运行 Amazon CloudFormation 模板。

    此模板使用 Amazon Glue 数据目录来管理 s3://aws-bigdata-blog/artifacts/gluestudio/ 中可用的停车罚单数据集。它创建了以下将引用的资源:

  • Amazon Glue Studio角色 - 要运行 Amazon Glue 任务的 IAM 角色

  • Amazon Glue StudioAmazon S3 存储桶 - 用于存储与博客相关的文件的 Amazon S3 存储桶的名称

  • Amazon Glue StudioTicketsYYZDB – Amazon Glue 数据目录数据库

  • Amazon Glue StudioTableTickets – 要用作源的数据目录表

  • Amazon Glue StudioTableTrials – 要用作源的数据目录表

  • Amazon Glue StudioParkingTicketCount – 要用作目标的数据目录表

  • 在 Amazon Glue Studio 博客文章中生成的脚本。如果博客文章发生更改,脚本也可以在以下文本中使用。

生成示例脚本

您可以使用 Amazon Glue Studio 可视化编辑器作为强大的代码生成工具,为您要编写的脚本创建脚手架。您将使用此工具创建示例脚本。

如果您想跳过这些步骤,教程提供了此脚本。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node S3 bucket S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog( database="yyz-tickets", table_name="tickets", transformation_ctx="S3bucket_node1" ) # Script generated for node ApplyMapping ApplyMapping_node2 = ApplyMapping.apply( frame=S3bucket_node1, mappings=[ ("tag_number_masked", "string", "tag_number_masked", "string"), ("date_of_infraction", "string", "date_of_infraction", "string"), ("ticket_date", "string", "ticket_date", "string"), ("ticket_number", "decimal", "ticket_number", "float"), ("officer", "decimal", "officer_name", "decimal"), ("infraction_code", "decimal", "infraction_code", "decimal"), ("infraction_description", "string", "infraction_description", "string"), ("set_fine_amount", "decimal", "set_fine_amount", "float"), ("time_of_infraction", "decimal", "time_of_infraction", "decimal"), ], transformation_ctx="ApplyMapping_node2", ) # Script generated for node S3 bucket S3bucket_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="glueparquet", connection_options={"path": "s3://DOC-EXAMPLE-BUCKET", "partitionKeys": []}, format_options={"compression": "gzip"}, transformation_ctx="S3bucket_node3", ) job.commit()
生成示例脚本
  1. 完成 Amazon Glue Studio 教程。要完成此教程,请参阅 Creating a job in Amazon Glue Studio from an example job

  2. 导航到作业页面上的 Script(脚本)选项卡,如以下屏幕截图所示:

    
                            Amazon Glue 作业的 Script(脚本)选项卡。
  3. 复制 Script(脚本)选项卡的完整内容。通过在 Job details(作业详细信息)中设置脚本语言,您可以在生成 Python 或 Scala 代码之间来回切换。

第 1 步。创建作业并粘贴您的脚本

在此步骤中,您将在 Amazon Web Services Management Console 中创建一个 Amazon Glue 作业。这将设置允许 Amazon Glue 运行脚本的配置。它还为您创建了一个存储和编辑脚本的位置。

创建作业
  1. 在 Amazon Web Services Management Console 中,导航到 Amazon Glue 登录页面。

  2. 在侧面的导航窗格中,选择 Jobs(作业)。

  3. 选择 Create job(创建作业)中的 Spark script editor(Spark 脚本编辑器),然后选择 Create(创建)。

  4. 可选 - 将脚本的全文粘贴到 Script(脚本)窗格中。或者,您可以按照教程进行操作。

第 2 步。导入 Amazon Glue 库

您需要将脚本设置为与脚本外部定义的代码和配置进行交互。此工作在 Amazon Glue Studio 的后台完成。

在此步骤中,您执行以下操作。

  • 导入并初始化 GlueContext 对象。从脚本编写的角度来看,这是最重要的导入。这公开了定义源数据集和目标数据集的标准方法,是所有 ETL 脚本的起点。要了解有关 GlueContext 类的更多信息,请参阅 GlueContext 类

  • 初始化 SparkContextSparkSession。这些可让您配置 Amazon Glue 作业内可用的 Spark 引擎。您无需在介绍性 Amazon Glue 脚本内直接使用它们。

  • 调用 getResolvedOptions 以准备在脚本中使用的作业参数。有关解析作业参数的更多信息,请参阅 使用 getResolvedOptions 访问参数

  • 初始化 JobJob 对象设置配置并跟踪各种可选 Amazon Glue 功能的状态。脚本可在没有 Job 对象的情况下运行,但最佳实践是对其进行初始化,以便以后集成这些功能时不会混淆。

    其中一项功能是您可以选择在本教程中进行配置的作业书签。您可以在下一节 可选 - 启用作业书签 中了解作业书签。

在此过程中,您将编写以下代码。此代码是生成的示例脚本的一部分。

from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args)
导入 Amazon Glue 库
  • 复制此部分的代码并将其粘贴到 Script(脚本)编辑器中。

    注意

    您可能会认为复制代码是一种糟糕的工程实践。在本教程中,我们建议这样做,以便您在所有 Amazon Glue ETL 脚本中一致地命名核心变量。

第 3 步。从源提取数据

在任何 ETL 过程中,您首先需要定义要更改的源数据集。在 Amazon Glue Studio 可视化编辑器中,您可通过创建 Source(源)节点提供此信息。

在此步骤中,您为 create_dynamic_frame.from_catalog 方法提供 databasetable_name,以从 Amazon Glue 数据目录中配置的源提取数据。

在上一步中,您初始化了 GlueContext 对象。可使用此对象查找用于配置源的方法,例如 create_dynamic_frame.from_catalog

在此过程中,您将使用 create_dynamic_frame.from_catalog 编写以下代码。此代码是生成的示例脚本的一部分。

S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog( database="yyz-tickets", table_name="tickets", transformation_ctx="S3bucket_node1" )
从源提取数据
  1. 查看文档以在 GlueContext 中查找方法,用于从 Amazon Glue Data Catalog 中定义的源提取数据。这些方法记录在 GlueContext 类 中。选择 create_dynamic_frame.from_catalog 方法。在 glueContext 上调用此方法。

  2. 查看 create_dynamic_frame.from_catalog 的文档。此方法需要 databasetable_name 参数。为 create_dynamic_frame.from_catalog 提供必要的参数。

    Amazon Glue Data Catalog 存储有关源数据位置和格式的信息,并在先决条件部分中设置。您不必直接向脚本提供该信息。

  3. 可选 - 为方法提供 transformation_ctx 参数,以支持作业书签。您可以在下一节 可选 - 启用作业书签 中了解作业书签。

注意

提取数据的常用方法

create_dynamic_frame_from_catalog 用于连接 Amazon Glue Data Catalog 中的表。

如果需要直接为作业提供描述源结构和位置的配置,请参阅 create_dynamic_frame_from_options 方法。与使用 create_dynamic_frame.from_catalog 时相比,您需要提供更详细的参数来描述数据。

请参阅有关 format_optionsconnection_parameters 的补充文档以确定所需参数。有关如何提供源数据格式相关脚本信息的说明,请参阅 Amazon Glue for Spark 中的输入和输出的数据格式选项。有关如何提供源数据位置相关脚本信息的说明,请参阅 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项

如果从流式传输源读取信息,您可以通过 create_data_frame_from_catalogcreate_data_frame_from_options 方法为作业提供源信息。请注意,这些方法会返回 Apache Spark DataFrames

我们生成的代码将调用 create_dynamic_frame.from_catalog,同时参考文档指向 create_dynamic_frame_from_catalog。这些方法最终调用相同的代码,并且包含在内以编写更纯净的代码。您可以通过查看 Python 包装器的源代码来验证这一点,该源代码位于 aws-glue-libs

第 4 步。使用 Amazon Glue 转换数据

在 ETL 过程中提取源数据后,您需要描述想要如何更改数据。您可通过在 Amazon Glue Studio 可视化编辑器中创建 Transform(转换)节点来提供此信息。

在此步骤中,为 ApplyMapping 方法提供当前和所需字段名称和类型的映射以转换 DynamicFrame

您可执行以下转换。

  • 删除四个 locationprovince 键。

  • officer 的名称更改为 officer_name

  • ticket_numberset_fine_amount 的类型更改为 float

create_dynamic_frame.from_catalog 为您提供一个 DynamicFrame 对象。DynamicFrame 表示 Amazon Glue 中的数据集。AmazonGlue 转换是更改 DynamicFrames 的操作。

注意

什么是 DynamicFrame

DynamicFrame 是一个抽象概念,可让您将数据集与数据中条目的名称和类型的描述联系起来。在 Apache Spark 中,存在一个类似的抽象概念,称为 DataFrame。有关 DataFrames 的说明,请参阅《Spark SQL 指南》。

借助 DynamicFrames,您可以动态描述数据集架构。考虑一个具有价格列的数据集,其中一些条目将价格存储为字符串,而另一些条目将价格存储为双精度。AmazonGlue 动态计算架构 — 它为每一行创建自描述记录。

不一致的字段(如价格)在框架的架构中用类型(ChoiceType)显式表示。您可以解决不一致的字段,方法是使用 DropFields 删除或使用 ResolveChoice 解析这些字段。这些是 DynamicFrame 上可用的转换。然后,您可以使用 writeDynamicFrame 将数据写回数据湖。

您可以从 DynamicFrame 类的方法中调用许多相同的转换,从而得到更易读的脚本。有关 DynamicFrame 的更多信息,请参阅 DynamicFrame 类

在此过程中,您将使用 ApplyMapping 编写以下代码。此代码是生成的示例脚本的一部分。

ApplyMapping_node2 = ApplyMapping.apply( frame=S3bucket_node1, mappings=[ ("tag_number_masked", "string", "tag_number_masked", "string"), ("date_of_infraction", "string", "date_of_infraction", "string"), ("ticket_date", "string", "ticket_date", "string"), ("ticket_number", "decimal", "ticket_number", "float"), ("officer", "decimal", "officer_name", "decimal"), ("infraction_code", "decimal", "infraction_code", "decimal"), ("infraction_description", "string", "infraction_description", "string"), ("set_fine_amount", "decimal", "set_fine_amount", "float"), ("time_of_infraction", "decimal", "time_of_infraction", "decimal"), ], transformation_ctx="ApplyMapping_node2", )
使用 Amazon Glue 转换数据
  1. 查看文档以确定用于更改和删除字段的转换。有关详细信息,请参阅GlueTransform 基类。选择 ApplyMapping 转换。有关 ApplyMapping 的更多信息,请参阅 ApplyMapping 类。对 ApplyMapping 转换对象调用 apply

    注意

    什么是 ApplyMapping

    ApplyMapping 采用 DynamicFrame 并对其进行转换。它需要一个表示字段转换的元组列表,即“映射”。前两个元组元素(字段名称和类型)用于标识框架中的字段。后两个参数也是字段名称和类型。

    ApplyMapping 将源字段转换为新 DynamicFrame 中的目标名称和类型,然后将其返回。未提供的字段在返回值中删除。

    您可以使用 DynamicFrame 上的 apply_mapping 方法调用相同的转换,而不是调用 apply,以创建更流畅、可读的代码。有关更多信息,请参阅 apply_mapping

  2. 查看 ApplyMapping 的文档以确定所需的参数。请参阅 ApplyMapping 类。您会发现此方法需要 framemappings 参数。为 ApplyMapping 提供必要的参数。

  3. 可选 - 为方法提供 transformation_ctx 以支持作业书签。您可以在下一节 可选 - 启用作业书签 中了解作业书签。

注意

Apache Spark 功能

我们提供转换,以简化作业中的 ETL 工作流。您还可以访问作业中 Spark 程序提供的、为更通用的目的而构建的库。为使用这些库,您将在 DynamicFrameDataFrame 之间转换。

您可以使用 toDF 创建 DataFrame。您可以使用 DataFrame 上可用的方法来转换数据集。有关这些方法的更多信息,请参阅 DataFrame。然后,可以用 fromDF 向后转换,以使用 Amazon Glue 操作将帧加载到目标。

第 5 步。将数据加载到目标中

转换数据后,通常将转换后的数据存储在与源不同的位置。您可以通过在 Amazon Glue Studio 可视化编辑器中创建 target(目标)节点来执行此操作。

在此步骤中,您将为 write_dynamic_frame.from_options 方法提供 connection_typeconnection_optionsformatformat_options 以将数据加载到 Amazon S3 中的目标桶中。

在步骤 1 中,您初始化了 GlueContext 对象。在 Amazon Glue 中,您将在此找到用于配置目标的方法,就像源一样。

在此过程中,您将使用 write_dynamic_frame.from_options 编写以下代码。此代码是生成的示例脚本的一部分。

S3bucket_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="glueparquet", connection_options={"path": "s3://DOC-EXAMPLE-BUCKET", "partitionKeys": []}, format_options={"compression": "gzip"}, transformation_ctx="S3bucket_node3", )
将数据加载到目标中
  1. 查看文档,找到将数据加载到目标 Amazon S3 桶中的方法。这些方法记录在 GlueContext 类 中。选择 write_dynamic_frame_from_options 方法。在 glueContext 上调用此方法。

    注意

    加载数据的常用方法

    write_dynamic_frame.from_options 是用于加载数据的最常用方法。它支持 Amazon Glue 中可用的所有目标。

    如果要写入到 Amazon Glue 连接中定义的 JDBC 目标,请使用 write_dynamic_frame_from_jdbc_conf 方法。AmazonGlue 连接会存储有关如何连接到数据来源的信息。这样就无需在 connection_options 中提供该信息。但是,您仍需要使用 connection_options 来提供 dbtable

    write_dynamic_frame.from_catalog 并非加载数据的常用方法。此方法在不更新基础数据集的情况下更新 Amazon Glue Data Catalog,并与更改基础数据集的其他过程结合使用。有关更多信息,请参阅 在数据目录中通过 Amazon Glue ETL 任务创建表、更新架构和添加新分区

  2. 查看 write_dynamic_frame_from_options 的文档。此方法需要 frameconnection_typeformatconnection_optionsformat_options。在 glueContext 上调用此方法。

    1. 请参阅有关 format_optionsformat 的补充文档以确定您需要的参数。有关数据格式说明,请参阅 Amazon Glue for Spark 中的输入和输出的数据格式选项

    2. 请参阅有关 connection_typeconnection_options 的补充文档以确定您需要的参数。有关连接说明,请参阅 Amazon Glue for Spark 中适用于 ETL 的连接类型和选项

    3. write_dynamic_frame.from_options 提供必要的参数。此方法与 create_dynamic_frame.from_options 的配置类似。

  3. 可选 - 为 transformation_ctx 提供 write_dynamic_frame.from_options 以支持作业书签。您可以在下一节 可选 - 启用作业书签 中了解作业书签。

第 6 步。提交 Job 对象

您在步骤 1 中初始化了 Job 对象。您需要在脚本结束时手动结束其生命周期。某些可选功能需要此操作才能正常运行。此工作在 Amazon Glue Studio 的后台完成。

在此步骤中,调用 commit 对象上的 Job 方法。

在此过程中,您将编写以下代码。此代码是生成的示例脚本的一部分。

job.commit()
提交 Job 对象
  1. 如果尚未完成,请执行前面部分中概述的可选步骤以包含 transformation_ctx

  2. 调用 commit

可选 - 启用作业书签

在之前的每个步骤中,都会指示您设置 transformation_ctx 参数。这与称为作业书签的功能有关。

借助作业书签,您可以使用重复运行的作业节省时间和金钱,而不是使用可以轻松跟踪以前工作的数据集。作业书签跟踪 Amazon Glue 转换在之前运行的数据集上的进度。通过跟踪之前运行的结束位置,Amazon Glue 可以将其工作限制为之前未处理的行。有关作业书签的更多信息,请参阅 使用作业书签跟踪已处理的数据

要启用作业书签,请先将 transformation_ctx 添加到提供的函数中,如前面的示例所述。作业书签状态在运行期间保持不变。transformation_ctx 参数是用于访问该状态的键。这些语句本身没有任何作用。您还需要激活作业配置中的功能。

在此过程中,您可以使用 Amazon Web Services Management Console 启用作业书签。

启用作业书签
  1. 导航到相应作业的 Job details(作业详细信息)部分。

  2. Job bookmark(作业书签)设置为 Enable(启用)。

第 7 步。将代码作为作业运行

在此步骤中,运行作业以验证是否已成功完成本教程。这可以通过单击按钮完成,就像在 Amazon Glue Studio 可视化编辑器中一样。

将代码作为作业运行
  1. 选择标题栏上的 Untitled job(未命名作业)以编辑和设置作业名称。

  2. 导航到 Job details(作业详细信息)选项卡。为作业分配 IAM Role(IAM 角色)。您可以使用通过 Amazon Glue Studio 教程先决条件中的 Amazon CloudFormation 模板创建的角色。如果已完成该教程,则角色应作为 AWS Glue StudioRole 提供。

  3. 选择 Save(保存)以保存您的脚本。

  4. 选择 Run(运行)以运行您的作业。

  5. 导航到 Runs(运行)选项卡以验证作业完成。

  6. 导航到 DOC-EXAMPLE-BUCKETwrite_dynamic_frame.from_options 的目标)。确认输出符合您的预期。

有关配置和管理作业的更多信息,请查阅 提供您自己的自定义脚本

更多信息

Apache Spark 库和方法在 Amazon Glue 脚本中提供。您可以查看 Spark 文档以了解如何使用这些随附的库。有关更多信息,请参阅 Spark 源存储库的示例部分

Amazon Glue 2.0+ 默认包含多个常用的 Python 库。还有一些机制可以将您自己的依赖项加载到 Scala 或 Python 环境的 Amazon Glue 作业中。有关 Python 依赖项的信息,请参阅 将 Python 库与 Amazon Glue 一起使用

有关如何在 Python 中使用 Amazon Glue 功能的更多示例,请参阅 Amazon Glue Python 代码示例。Scala 和 Python 作业具有同等的功能,因此我们的 Python 示例应该会让您了解如何在 Scala 中进行类似的工作。