代码示例: 数据准备使用 ResolveChoice、Lambda和 ApplyMapping - AWS Glue
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

代码示例: 数据准备使用 ResolveChoice、Lambda和 ApplyMapping

本例中使用的数据集包括从两个 Data.CMS.gov 研究中心: 前100大诊断相关人群住院前瞻性支付系统提供者小结-2011财年),和 2011财年住院患者收费数据. 下载该数据后,我们修改了它,以在文件末尾引入了几个错误的记录。这个修改过的文件位于 s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv 上的公有 Amazon S3 存储桶中。

您可以在 data_cleaning_and_lambda.py 文件 AWS Glue 示例 GitHub 存储库。

调试Python或 PySpark 脚本用于创建开发端点并在那里运行您的代码。将生成的数据写入单独的 Apache Parquet 文件以供日后分析。有关更多信息,请参阅查看开发终端节点属性

第1步: 爬取中的数据 Amazon S3 水桶

  1. 登录 AWS 管理控制台并通过以下网址打开 AWS Glue 控制台:https://console.amazonaws.cn/glue/

  2. 按照 在 AWS Glue 控制台上使用爬网程序 中描述的过程进行操作,创建新的爬网程序,它可以网络爬取 s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv 文件,而且可以将生成的元数据放入 AWS Glue 数据目录中一个名为 payments 的数据库。

  3. 运行新爬网程序,然后检查 payments 数据库。在读取该文件的开头以确定其格式和分隔符之后,您应该发现爬网程序已经在数据库中创建了一个名为 medicare 的元数据表。

    medicare 表的架构如下所示:

    Column name Data type ================================================== drg definition string provider id bigint provider name string provider street address string provider city string provider state string provider zip code bigint hospital referral region description string total discharges bigint average covered charges string average total payments string average medicare payments string

第2步: 将样板脚本添加到开发端点笔记本

将以下样板文件脚本粘贴到开发终端节点笔记本中以导入所需的 AWS Glue 库,然后设置单个 GlueContext

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())

第3步: 比较不同的框架解析

接下来,您可以查看由 Apache Spark DataFrame 识别的架构是否与您的 AWS Glue 爬网程序记录的架构相同。运行此代码:

medicare = spark.read.format( "com.databricks.spark.csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()

下面是来自 printSchema 调用的输出:

root |-- DRG Definition: string (nullable = true) |-- Provider Id: string (nullable = true) |-- Provider Name: string (nullable = true) |-- Provider Street Address: string (nullable = true) |-- Provider City: string (nullable = true) |-- Provider State: string (nullable = true) |-- Provider Zip Code: integer (nullable = true) |-- Hospital Referral Region Description: string (nullable = true) |-- Total Discharges : integer (nullable = true) |-- Average Covered Charges : string (nullable = true) |-- Average Total Payments : string (nullable = true) |-- Average Medicare Payments: string (nullable = true)

接下来,查看 AWS Glue DynamicFrame 生成的架构:

medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "payments", table_name = "medicare") medicare_dynamicframe.printSchema()

printSchema 中的输出如下所示:

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

DynamicFrame 生成一个架构,在其中 provider id 可以是 longstring 类型。DataFrame 架构将 Provider Id 列为 string 类型,数据目录 将 provider id 列为 bigint 类型。

哪一个是正确的? 文件末尾有两条记录 (共计 16 万条记录),该列中有 string 值。这些是为说明问题而引入的错误记录。

为解决此类问题,AWS Glue DynamicFrame 引入了 choice 类型的概念。在这种情况下,DynamicFrame 显示 longstring 值都出现在该列中。AWS Glue 爬网程序错过了 string 值,因为它仅被视为数据的一个 2 MB 前缀。Apache闪耀 DataFrame 考虑整个数据集,但强制将最常规的类型分配给列,即 string。事实上,Spark经常会选择最普遍的案例,因为在这种案例中,它并不熟悉的是复杂的类型或变异。

要查询 provider id 列,请先解析选择类型。您可以在 DynamicFrame 中使用 resolveChoice 转换方法,通过 cast:long 选项将这些 string 值转换为 long 值:

medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')]) medicare_res.printSchema()

printSchema 输出现在是:

root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

其中,该值是无法强制转换的 string,AWS Glue 插入了一个 null

另一个选项是将选择类型转换为一个 struct,以保持两种类型的值。

接下来,查看异常的行:

medicare_res.toDF().where("'provider id' is NULL").show()

您看到以下内容:

+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ | drg definition|provider id| provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+ |948 - SIGNS & SYM...| null| INC| 1050 DIVISION ST| MAUSTON| WI| 53948| WI - Madison| 12| $11961.41| $4619.00| $3775.33| |948 - SIGNS & SYM...| null| INC- ST JOSEPH| 5000 W CHAMBERS ST| MILWAUKEE| WI| 53210| WI - Milwaukee| 14| $10514.28| $5562.50| $4522.78| +--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+

现在,删除两个格式错误的记录,如下所示:

medicare_dataframe = medicare_res.toDF() medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")

第4步: 映射数据和使用ApacheSparkLambda功能

AWS Glue 尚未直接支持 Lambda 函数,也称为用户定义函数。但是,您始终可以将 DynamicFrame 和 Apache Spark DataFrame 相互转换,以便除 DynamicFrames 的特殊功能外,还能利用 Spark 功能。

接下来,将付款信息转化为数字,以便 Amazon Redshift 或 Amazon Athena 这样的分析引擎可以更快地进行数字处理:

from pyspark.sql.functions import udf from pyspark.sql.types import StringType chop_f = udf(lambda x: x[1:], StringType()) medicare_dataframe = medicare_dataframe.withColumn( "ACC", chop_f( medicare_dataframe["average covered charges"])).withColumn( "ATP", chop_f( medicare_dataframe["average total payments"])).withColumn( "AMP", chop_f( medicare_dataframe["average medicare payments"])) medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()

show 调用中的输出如下所示:

+--------+-------+-------+ | ACC| ATP| AMP| +--------+-------+-------+ |32963.07|5777.24|4763.73| |15131.85|5787.57|4976.71| |37560.37|5434.95|4453.79| |13998.28|5417.56|4129.16| |31633.27|5658.33|4851.44| |16920.79|6653.80|5374.14| |11977.13|5834.74|4761.41| |35841.09|8031.12|5858.50| |28523.39|6113.38|5228.40| |75233.38|5541.05|4386.94| |67327.92|5461.57|4493.57| |39607.28|5356.28|4408.20| |22862.23|5374.65|4186.02| |31110.85|5366.23|4376.23| |25411.33|5282.93|4383.73| | 9234.51|5676.55|4509.11| |15895.85|5930.11|3972.85| |19721.16|6192.54|5179.38| |10710.88|4968.00|3898.88| |51343.75|5996.00|4962.45| +--------+-------+-------+ only showing top 20 rows

这些仍然是数据中的字符串。我们可以使用强大的 apply_mapping 转换方法来删除、重命名、转换和嵌套数据,以便其他数据编程语言和系统可以轻松地访问它:

from awsglue.dynamicframe import DynamicFrame medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested") medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'), ('provider id', 'long', 'provider.id', 'long'), ('provider name', 'string', 'provider.name', 'string'), ('provider city', 'string', 'provider.city', 'string'), ('provider state', 'string', 'provider.state', 'string'), ('provider zip code', 'long', 'provider.zip', 'long'), ('hospital referral region description', 'string','rr', 'string'), ('ACC', 'string', 'charges.covered', 'double'), ('ATP', 'string', 'charges.total_pay', 'double'), ('AMP', 'string', 'charges.medicare_pay', 'double')]) medicare_nest_dyf.printSchema()

printSchema 输出如下所示:

root |-- drg: string |-- provider: struct | |-- id: long | |-- name: string | |-- city: string | |-- state: string | |-- zip: long |-- rr: string |-- charges: struct | |-- covered: double | |-- total_pay: double | |-- medicare_pay: double

将数据重新变成 Spark DataFrame 后,您可以显示它现在的外观:

medicare_nest_dyf.toDF().show()

您可以在一个 (扩展) 代码行中执行所有这些操作:

+--------------------+--------------------+---------------+--------------------+ | drg| provider| rr| charges| +--------------------+--------------------+---------------+--------------------+ |039 - EXTRACRANIA...|[10001,SOUTHEAST ...| AL - Dothan|[32963.07,5777.24...| |039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...| |039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...| |039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...| |039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...| |039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...| |039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...| |039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...| |039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...| |039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...| |039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...| |039 - EXTRACRANIA...|[10055,FLOWERS HO...| AL - Dothan|[39607.28,5356.28...| |039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...| |039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...| |039 - EXTRACRANIA...|[10083,SOUTH BALD...| AL - Mobile|[25411.33,5282.93...| |039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...| |039 - EXTRACRANIA...|[10090,PROVIDENCE...| AL - Mobile|[15895.85,5930.11...| |039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...| |039 - EXTRACRANIA...|[10100,THOMAS HOS...| AL - Mobile|[10710.88,4968.0,...| |039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...| +--------------------+--------------------+---------------+--------------------+ only showing top 20 rows

第5步: 将数据写入ApacheParquet

有了 AWS Glue,可以很容易地以诸如 Apache Parquet 这样的格式编写数据,以便关系数据库可以有效地使用它:

glueContext.write_dynamic_frame.from_options( frame = medicare_nest_dyf, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")