Amazon Glue Studio 笔记本中 ETL 作业的数据质量 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon Glue Studio 笔记本中 ETL 作业的数据质量

在本教程中,您将学习如何使用 Amazon Glue Data Quality 来处理 Amazon Glue Studio 笔记本中的提取、转换、加载(ETL)作业。

您可以在 Amazon Glue Studio 中使用笔记本编辑作业脚本并查看输出,而不必运行完整作业。您还可以添加 Markdown 并将笔记本另存为 .ipynb 文件和作业脚本。请注意:您可以在无需本地安装软件或管理服务器的情况下开启笔记本。当您对自己的代码感到满意时,可以使用 Amazon Glue Studio 轻松地将笔记本转换为 Amazon Glue 作业。

此示例使用的数据集包括从两个 Data.CMS.gov 数据集下载的医疗保健提供商付款数据:"Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011" 和 "Inpatient Charge Data FY 2011"。

下载该数据后,我们修改了数据集,以在文件末尾引入了几个错误的记录。这个修改过的文件位于 s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv 上的公有 Amazon S3 存储桶。

先决条件

在 Amazon Glue Studio 中创建 ETL 作业

创建 ETL 作业
  1. 将会话版本更改为 Amazon Glue 3.0。

    为此,请使用以下魔术命令删除所有样板代码单元格,然后运行该单元格。请注意,创建新笔记本时,此样板代码会自动在第一个单元格中提供。

    %glue_version 3.0
  2. 将以下代码复制并粘贴到单元格中。

    import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext)
  3. 在下一个单元格中,导入评估 Amazon Glue Data Quality 的 EvaluateDataQuality 类。

    from awsgluedq.transforms import EvaluateDataQuality
  4. 在下一个单元格中,使用存储在公共 Amazon S3 存储桶中的 .csv 文件读入源数据。

    medicare = spark.read.format( "csv").option( "header", "true").option( "inferSchema", "true").load( 's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv') medicare.printSchema()
  5. 将数据转换为 Amazon Glue DynamicFrame。

    from awsglue.dynamicframe import DynamicFrame medicare_dyf = DynamicFrame.fromDF(medicare,glueContext,"medicare_dyf")
  6. 使用数据质量定义语言(DQDL)创建规则集。

    EvaluateDataQuality_ruleset = """ Rules = [ ColumnExists "Provider Id", IsComplete "Provider Id", ColumnValues " Total Discharges " > 15 ] ] """
  7. 根据规则集验证数据集。

    EvaluateDataQualityMultiframe = EvaluateDataQuality().process_rows( frame=medicare_dyf, ruleset=EvaluateDataQuality_ruleset, publishing_options={ "dataQualityEvaluationContext": "EvaluateDataQualityMultiframe", "enableDataQualityCloudWatchMetrics": False, "enableDataQualityResultsPublishing": False, }, additional_options={"performanceTuning.caching": "CACHE_NOTHING"}, )
  8. 查看结果。

    ruleOutcomes = SelectFromCollection.apply( dfc=EvaluateDataQualityMultiframe, key="ruleOutcomes", transformation_ctx="ruleOutcomes", ) ruleOutcomes.toDF().show(truncate=False)

    输出:

    --------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+ |Rule |Outcome|FailureReason |EvaluatedMetrics | +--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+ |ColumnExists "Provider Id" |Passed |null |{} | |IsComplete "Provider Id" |Passed |null |{Column.Provider Id.Completeness -> 1.0} | |ColumnValues " Total Discharges " > 15|Failed |Value: 11.0 does not meet the constraint requirement!|{Column. Total Discharges .Minimum -> 11.0}| +--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+
  9. 筛选通过的行并查看数据质量行级结果中的失败行。

    owLevelOutcomes = SelectFromCollection.apply( dfc=EvaluateDataQualityMultiframe, key="rowLevelOutcomes", transformation_ctx="rowLevelOutcomes", ) rowLevelOutcomes_df = rowLevelOutcomes.toDF() # Convert Glue DynamicFrame to SparkSQL DataFrame rowLevelOutcomes_df_passed = rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Passed") # Filter only the Passed records. rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Failed").show(5, truncate=False) # Review the Failed records

    输出:

    +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ |DRG Definition |Provider Id|Provider Name |Provider Street Address |Provider City|Provider State|Provider Zip Code|Hospital Referral Region Description| Total Discharges | Average Covered Charges | Average Total Payments |Average Medicare Payments|DataQualityRulesPass |DataQualityRulesFail |DataQualityRulesSkip |DataQualityEvaluationResult| +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10005 |MARSHALL MEDICAL CENTER SOUTH |2505 U S HIGHWAY 431 NORTH|BOAZ |AL |35957 |AL - Birmingham |14 |$15131.85 |$5787.57 |$4976.71 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10046 |RIVERVIEW REGIONAL MEDICAL CENTER |600 SOUTH THIRD STREET |GADSDEN |AL |35901 |AL - Birmingham |14 |$67327.92 |$5461.57 |$4493.57 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10083 |SOUTH BALDWIN REGIONAL MEDICAL CENTER|1613 NORTH MCKENZIE STREET|FOLEY |AL |36535 |AL - Mobile |15 |$25411.33 |$5282.93 |$4383.73 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|30002 |BANNER GOOD SAMARITAN MEDICAL CENTER |1111 EAST MCDOWELL ROAD |PHOENIX |AZ |85006 |AZ - Phoenix |11 |$34803.81 |$7768.90 |$6951.45 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|30010 |CARONDELET ST MARYS HOSPITAL |1601 WEST ST MARY'S ROAD |TUCSON |AZ |85745 |AZ - Tucson |12 |$35968.50 |$6506.50 |$5379.83 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed | +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+ only showing top 5 rows

    请注意,Amazon Glue Data Quality 增加了四个新列(DataQualityRulesPass、DataQualityRulesFail、DataQualityRulesSkip 和 DataQualityEvaluationResult)。这表示已通过的记录、失败的记录、跳过的行级评估规则以及总体行级结果。

  10. 将输出写入 Amazon S3 存储桶,以分析数据并可视化结果。

    #Write the Passed records to the destination. glueContext.write_dynamic_frame.from_options( frame = rowLevelOutcomes_df_passed, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"}, format = "parquet")