

# Data quality for ETL jobs in Amazon Glue Studio notebooks
<a name="data-quality-gs-studio-notebooks"></a>

In this tutorial, you learn how to use Amazon Glue Data Quality with extract, transform, and load (ETL) jobs in Amazon Glue Studio notebooks.

With notebooks in Amazon Glue Studio, you can edit job scripts and view the output without having to run a full job. You can also add Markdown and save the notebook as an `.ipynb` file and as a job script. Note that you can start a notebook without installing software locally or managing servers. When you're satisfied with your code, you can use Amazon Glue Studio to easily convert the notebook to an Amazon Glue job.

The dataset used in this example consists of Medicare provider payment data downloaded from two *Data.CMS.gov* datasets: "Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011" and "Inpatient Charge Data FY 2011".

After downloading the data, we modified the dataset to introduce a couple of erroneous records at the end of the file. The modified file is located in a public Amazon S3 bucket at `s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv`.

## Prerequisites
<a name="w2aac49c54c14"></a>
+  An Amazon Glue role with Amazon S3 permissions to write to your destination Amazon S3 bucket
+  A new notebook (see [Getting started with notebooks in Amazon Glue Studio](https://docs.amazonaws.cn/glue/latest/ug/notebook-getting-started.html))

## Create an ETL job in Amazon Glue Studio
<a name="data-quality-notebooks-example"></a>

**To create an ETL job**

1.  Change the session version to Amazon Glue 3.0.

    To do this, remove all of the boilerplate code cells, then paste the following magic into a cell and run it. Note that this boilerplate code is provided automatically in the first cell when a new notebook is created.

   ```
   %glue_version 3.0
   ```

1.  Copy and paste the following code into the cell.

   ```
   import sys
   from awsglue.transforms import *
   from awsglue.utils import getResolvedOptions
   from pyspark.context import SparkContext
   from awsglue.context import GlueContext
   from awsglue.job import Job
   
   sc = SparkContext.getOrCreate()
   glueContext = GlueContext(sc)
   spark = glueContext.spark_session
   job = Job(glueContext)
   ```

1.  In the next cell, import the `EvaluateDataQuality` class that evaluates your Amazon Glue Data Quality rules.

   ```
   from awsgluedq.transforms import EvaluateDataQuality
   ```

1.  In the next cell, read in the source data from the `.csv` file stored in the public Amazon S3 bucket.

   ```
   medicare = spark.read.format(
   "csv").option(
   "header", "true").option(
   "inferSchema", "true").load(
   's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
   medicare.printSchema()
   ```

1.  Convert the data to an Amazon Glue DynamicFrame.

   ```
   from awsglue.dynamicframe import DynamicFrame
   medicare_dyf = DynamicFrame.fromDF(medicare,glueContext,"medicare_dyf")
   ```

1.  Create the ruleset using Data Quality Definition Language (DQDL).

   ```
   EvaluateDataQuality_ruleset = """
       Rules = [
           ColumnExists "Provider Id",
           IsComplete "Provider Id",
           ColumnValues " Total Discharges " > 15
       ]
   """
   ```
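
    DQDL supports additional rule types beyond the three used above. As an illustrative sketch (the rule choices and thresholds here are assumptions, not part of the tutorial), a broader ruleset for this dataset might look like the following.

   ```python
   # Hypothetical DQDL ruleset sketch -- the thresholds are illustrative
   # assumptions. RowCount, Completeness, and ColumnLength are additional
   # DQDL rule types.
   extended_ruleset = """
       Rules = [
           RowCount > 0,
           Completeness "Provider Name" > 0.95,
           ColumnLength "Provider State" = 2
       ]
   """
   ```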

1.  Validate the dataset against the ruleset.

   ```
   EvaluateDataQualityMultiframe = EvaluateDataQuality().process_rows(
       frame=medicare_dyf,
       ruleset=EvaluateDataQuality_ruleset,
       publishing_options={
           "dataQualityEvaluationContext": "EvaluateDataQualityMultiframe",
           "enableDataQualityCloudWatchMetrics": False,
           "enableDataQualityResultsPublishing": False,
       },
       additional_options={"performanceTuning.caching": "CACHE_NOTHING"},
   )
   ```

1.  Review the results.

   ```
   ruleOutcomes = SelectFromCollection.apply(
       dfc=EvaluateDataQualityMultiframe,
       key="ruleOutcomes",
       transformation_ctx="ruleOutcomes",
   )
   
   ruleOutcomes.toDF().show(truncate=False)
   ```

    Output:

   ```
   +--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+
   |Rule                                  |Outcome|FailureReason                                        |EvaluatedMetrics                           |
   +--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+
   |ColumnExists "Provider Id"            |Passed |null                                                 |{}                                         |
   |IsComplete "Provider Id"              |Passed |null                                                 |{Column.Provider Id.Completeness -> 1.0}   |
   |ColumnValues " Total Discharges " > 15|Failed |Value: 11.0 does not meet the constraint requirement!|{Column. Total Discharges .Minimum -> 11.0}|
   +--------------------------------------+-------+-----------------------------------------------------+-------------------------------------------+
   ```
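
    In a scheduled job, you will usually want to act on these outcomes programmatically rather than read the table by eye. The following plain-Python sketch of that gating logic uses a hand-built `outcomes` list that mirrors the rows above; in the notebook you would collect the real values from `ruleOutcomes.toDF()` instead.

   ```python
   # Plain-Python sketch: flag the job when any Data Quality rule did not pass.
   # The dictionaries below mirror the ruleOutcomes rows printed above.
   outcomes = [
       {"Rule": 'ColumnExists "Provider Id"', "Outcome": "Passed"},
       {"Rule": 'IsComplete "Provider Id"', "Outcome": "Passed"},
       {"Rule": 'ColumnValues " Total Discharges " > 15', "Outcome": "Failed"},
   ]

   failed_rules = [o["Rule"] for o in outcomes if o["Outcome"] == "Failed"]
   if failed_rules:
       print(f"{len(failed_rules)} rule(s) failed: {failed_rules}")
   ```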

1.  Filter the rows that passed, and review the rows that failed in the Data Quality row-level results.

   ```
   rowLevelOutcomes = SelectFromCollection.apply(
       dfc=EvaluateDataQualityMultiframe,
       key="rowLevelOutcomes",
       transformation_ctx="rowLevelOutcomes",
   )
   
   rowLevelOutcomes_df = rowLevelOutcomes.toDF() # Convert Glue DynamicFrame to SparkSQL DataFrame
   rowLevelOutcomes_df_passed = rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Passed") # Filter only the Passed records.
   rowLevelOutcomes_df.filter(rowLevelOutcomes_df.DataQualityEvaluationResult == "Failed").show(5, truncate=False) # Review the Failed records
   ```

    Output:

   ```
   +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+
   |DRG Definition                          |Provider Id|Provider Name                        |Provider Street Address   |Provider City|Provider State|Provider Zip Code|Hospital Referral Region Description| Total Discharges | Average Covered Charges | Average Total Payments |Average Medicare Payments|DataQualityRulesPass      |DataQualityRulesFail                    |DataQualityRulesSkip        |DataQualityEvaluationResult|
   +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+
   |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10005      |MARSHALL MEDICAL CENTER SOUTH        |2505 U S HIGHWAY 431 NORTH|BOAZ         |AL            |35957            |AL - Birmingham                     |14                |$15131.85                |$5787.57                |$4976.71                 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed                     |
   |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10046      |RIVERVIEW REGIONAL MEDICAL CENTER    |600 SOUTH THIRD STREET    |GADSDEN      |AL            |35901            |AL - Birmingham                     |14                |$67327.92                |$5461.57                |$4493.57                 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed                     |
   |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|10083      |SOUTH BALDWIN REGIONAL MEDICAL CENTER|1613 NORTH MCKENZIE STREET|FOLEY        |AL            |36535            |AL - Mobile                         |15                |$25411.33                |$5282.93                |$4383.73                 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed                     |
   |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|30002      |BANNER GOOD SAMARITAN MEDICAL CENTER |1111 EAST MCDOWELL ROAD   |PHOENIX      |AZ            |85006            |AZ - Phoenix                        |11                |$34803.81                |$7768.90                |$6951.45                 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed                     |
   |039 - EXTRACRANIAL PROCEDURES W/O CC/MCC|30010      |CARONDELET ST  MARYS HOSPITAL        |1601 WEST ST MARY'S ROAD  |TUCSON       |AZ            |85745            |AZ - Tucson                         |12                |$35968.50                |$6506.50                |$5379.83                 |[IsComplete "Provider Id"]|[ColumnValues " Total Discharges " > 15]|[ColumnExists "Provider Id"]|Failed                     |
   +----------------------------------------+-----------+-------------------------------------+--------------------------+-------------+--------------+-----------------+------------------------------------+------------------+-------------------------+------------------------+-------------------------+--------------------------+----------------------------------------+----------------------------+---------------------------+
   only showing top 5 rows
   ```

    Note that Amazon Glue Data Quality added four new columns (DataQualityRulesPass, DataQualityRulesFail, DataQualityRulesSkip, and DataQualityEvaluationResult). These indicate, respectively, the rules a record passed, the rules it failed, the rules skipped for row-level evaluation, and the overall row-level result.
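
    A quick way to sanity-check the new columns is to tally the overall row-level result. The following plain-Python sketch illustrates the tally on made-up records; on the actual data you could use `rowLevelOutcomes_df.groupBy("DataQualityEvaluationResult").count().show()` instead.

   ```python
   from collections import Counter

   # Illustrative records (made up) mimicking the appended row-level result column.
   rows = [
       {"Provider Id": 10005, "DataQualityEvaluationResult": "Failed"},
       {"Provider Id": 10011, "DataQualityEvaluationResult": "Passed"},
       {"Provider Id": 10046, "DataQualityEvaluationResult": "Failed"},
   ]

   summary = Counter(r["DataQualityEvaluationResult"] for r in rows)
   print(summary)  # Counter({'Failed': 2, 'Passed': 1})
   ```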

1.  Write the output to an Amazon S3 bucket to analyze the data and visualize the results.

   ```
   # Write the passed records to the destination. Convert the filtered
   # Spark DataFrame back to a Glue DynamicFrame before writing.
   rowLevelOutcomes_passed_dyf = DynamicFrame.fromDF(
       rowLevelOutcomes_df_passed, glueContext, "rowLevelOutcomes_passed_dyf")

   glueContext.write_dynamic_frame.from_options(
       frame=rowLevelOutcomes_passed_dyf,
       connection_type="s3",
       connection_options={"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
       format="parquet")
   ```