
Editing Scripts in AWS Glue

A script contains the code that extracts data from sources, transforms it, and loads it into targets. AWS Glue runs a script when it starts a job.

AWS Glue ETL scripts can be coded in Python or Scala. Python scripts use a language that is an extension of the PySpark Python dialect for extract, transform, and load (ETL) jobs. The script contains extended constructs to deal with ETL transformations. When you automatically generate the source code logic for your job, a script is created. You can edit this script, or you can provide your own script to process your ETL work.
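For illustration, the following minimal sketch (not part of any generated script) shows what "an extension of the PySpark dialect" means in practice: Glue's DynamicFrame converts to and from a standard Spark DataFrame, so ordinary PySpark code can be mixed into a Glue script. It reuses the sample-data database and taxi_trips table from the generated example later in this section; the filter condition is illustrative only.

import sys
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)

# Read a Data Catalog table into a DynamicFrame, Glue's extension
# of the Spark DataFrame (names reuse the generated example below).
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="sample-data", table_name="taxi_trips")

# Convert to a plain Spark DataFrame, apply an ordinary PySpark
# transformation (illustrative only), and convert back.
df = dyf.toDF()
df = df.filter(df["trip_distance"] > 0)
dyf = DynamicFrame.fromDF(df, glueContext, "filtered_trips")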

For information about defining and editing scripts using the AWS Glue console, see Working with Scripts on the AWS Glue Console.

Defining Scripts

Given a source and target, AWS Glue can generate a script to transform the data. This proposed script is an initial version that fills in your sources and targets and suggests transformations in PySpark. You can verify and modify the script to fit your business needs. Use the script editor in AWS Glue to add arguments that specify the source and target, and any other arguments that are required to run the script. Scripts are run by jobs, and jobs are started by triggers, which can be based on a schedule or an event. For more information about triggers, see Starting Jobs and Crawlers Using Triggers.
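As a rough illustration of the trigger relationship, the following sketch uses the AWS SDK for Python (Boto3) to create a scheduled trigger for a job. The trigger name, job name, and schedule are hypothetical placeholders, not values from this section.

import boto3

glue = boto3.client("glue")

# Create a trigger that starts the job every day at 12:00 UTC.
# Both names are hypothetical placeholders.
glue.create_trigger(
    Name="daily-taxi-etl-trigger",
    Type="SCHEDULED",
    Schedule="cron(0 12 * * ? *)",
    Actions=[{"JobName": "my-etl-job"}],
    StartOnCreation=True,
)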

In the AWS Glue console, scripts are represented as code. You can also view scripts as diagrams that use annotations (##) embedded in the script. These annotations describe the parameters, transform types, arguments, inputs, and other characteristics of the script that are used to generate a diagram in the AWS Glue console.

The diagram of the script shows the following:

  • Source inputs to the script

  • Transforms

  • Target outputs written by the script

Scripts can contain the following annotations:

Annotation   Usage
@params      Parameters from the ETL job that the script requires.
@type        Type of node in the diagram, such as the transform type, data source, or data sink.
@args        Arguments passed to the node, except reference to input data.
@return      Variable that is returned from the script.
@inputs      Data inputs to the node.
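For example, the data source node of the generated script shown later in this section is preceded by the following annotations (excerpted from that script), which the console reads to draw the node and its connections:

## @type: DataSource
## @args: [database = "sample-data", table_name = "taxi_trips", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample-data", table_name = "taxi_trips", transformation_ctx = "datasource0")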

For information about the code constructs within a script, see Program AWS Glue ETL Scripts in Python.

The following is an example of a script that AWS Glue generates. The script is for a job that copies a simple dataset from one Amazon Simple Storage Service (Amazon S3) location to another and changes the format from CSV to JSON. After some initialization code, the script contains commands that specify the data source, mapping, and target (data sink). Note the annotations as well.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "sample-data", table_name = "taxi_trips", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample-data", table_name = "taxi_trips", transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "tpep_pickup_datetime", "string"), ("tpep_dropoff_datetime", "string", "tpep_dropoff_datetime", "string"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "tpep_pickup_datetime", "string"), ("tpep_dropoff_datetime", "string", "tpep_dropoff_datetime", "string"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1")

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://example-data-destination/taxi-data"}, format = "json", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://example-data-destination/taxi-data"}, format = "json", transformation_ctx = "datasink2")

job.commit()
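After you finish editing a script, the job that owns it can also be started programmatically rather than by a trigger. The following is a minimal sketch using the AWS SDK for Python (Boto3); the job name is a hypothetical placeholder for the job that runs the script above.

import boto3

glue = boto3.client("glue")

# Start a run of the job that owns the script above; AWS Glue supplies
# --JOB_NAME to the script automatically. The job name is a
# hypothetical placeholder.
response = glue.start_job_run(JobName="copy-taxi-data")
print(response["JobRunId"])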