设置警报、部署和计划 - Amazon Glue
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

设置警报、部署和计划

本主题介绍如何为 Amazon Glue 数据质量设置警报、部署和计划。

在 Amazon EventBridge 集成中设置提醒和通知

Amazon Glue Data Quality 支持发布 EventBridge 事件,这些事件是在数据质量规则集评估运行完成后发出的。这样,当数据质量规则失效时,您可以轻松设置警报。

以下是您在 Data Catalog 中评估数据质量规则集时的示例事件。有了这些信息,您就可以查看亚马逊提供的数据 EventBridge。您可以发出其他 API 调用以获取更多详细信息。例如,使用结果 ID 调用 get_data_quality_result API 以获取特定执行的详细信息。

{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_DATA_CATALOG", "runId":"dqrun-12334567890", "databaseName": "db-123", "tableName": "table-123", "catalogId": "123456789012" }, "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00, "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }

以下是您在评估 G Amazon lue ETL 或 Glue Studio 笔记本中的数据质量规则集时发布的示例事件 Amazon 。

{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_JOB", "jobId": "jr-12334567890", "jobName": "dq-eval-job-1234", "evaluationContext": "", } "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00 "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }

要使数据质量评估同时在数据目录和 ETL 作业中运行,则必须保持选中状态的 Amazon CloudWatch“将指标发布到” 选项(默认情况下处于选中状态)才能 EventBridge 发布。

设置 EventBridge 通知

中的数据质量属性 Amazon CloudFormation

要接收发出的事件并定义目标,您必须配置 Amazon EventBridge 规则。要创建规则,请执行以下操作:

  1. 打开 Amazon EventBridge 控制台。

  2. 在导航栏的总线部分下选择规则

  3. 选择 Create Rule

  4. 定义规则详细信息中:

    1. 对于名称,请输入 myDQRule

    2. 输入描述(可选)。

    3. 对于事件总线,请选择您的事件总线。如果您没有,请将其保留为默认值。

    4. 对于“规则类型”,选择具有事件模式的规则,然后选择下一步

  5. 构建事件模式中:

    1. 对于事件源,请选择Amazon 事件或 EventBridge 合作伙伴事件

    2. 跳过示例事件部分。

    3. 对于创建方法,选择使用模式表单

    4. 对于事件模式:

      1. 为事件源选择 Amazon 服务

      2. 为 Amazon 服务选择 Glue 数据质量

      3. 为“事件类型”选择可用的数据质量评估结果

      4. 为“特定状态”选择失败。然后,您将看到类似于以下内容的事件模式:

        { "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "state": ["FAILED"] } }
      5. 有关更多连接选项,请参阅 事件模式的其他配置选项

  6. 选择目标上:

    1. 对于目标类型,选择 Amazon 服务

    2. 使用选择目标下拉列表选择要连接的所需 Amazon 服务(SNS、Lambda、SQS 等),然后选择下一步。

  7. 配置标签上,单击添加新标签以添加可选标签,然后选择下一步

  8. 您会看到所有选择的摘要页面。选择底部的创建规则

事件模式的其他配置选项

除了根据成功或失败筛选事件外,您可能还需要根据不同的参数进一步筛选事件。

为此,请转到“事件模式”部分,然后选择编辑模式以指定其他参数。请注意,事件模式中的字段区分大小写。以下是配置事件模式的示例。

要从评估特定规则集的特定表中捕获事件,请使用以下类型的模式:

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_DATA_CATALOG"], "databaseName": "db-123", "tableName": "table-123", }, "rulesetNames": ["ruleset1", "ruleset2"] "state": ["FAILED"] } }

要在 ETL 体验中捕获来自特定作业的事件,请使用以下类型的模式:

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_JOB"], "jobName": ["dq_evaluation_job1", "dq_evaluation_job2"] }, "state": ["FAILED"] } }

要捕获分数低于特定阈值(例如 70%)的事件,请执行以下操作:

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "score": [{ "numeric": ["<=", 0.7] }] } }

将通知格式化为电子邮件

有时,您需要向业务团队发送格式完善的电子邮件通知。您可以使用 Amazon EventBridge 和 Amazon Lambda 来实现这一目标。

格式化为电子邮件的数据质量通知

以下示例代码可用于格式化数据质量通知以生成电子邮件。

import boto3 import json from datetime import datetime sns_client = boto3.client('sns') glue_client = boto3.client('glue') sns_topic_arn = 'arn:aws:sns:<region-code>:<account-id>:<sns-topic-name>' def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info sns_client.publish( TopicArn=sns_topic_arn, Message=message_text, Subject=subject_text ) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }

在 CloudWatch 集成中设置警报和通知

我们推荐的方法是使用 Amazon 设置数据质量提醒 EventBridge,因为亚马逊 EventBridge 需要一次性设置才能提醒买家。但是, CloudWatch 由于熟悉亚马逊,有些买家更喜欢亚马逊。对于此类客户,我们提供与 Amazon 的集成 CloudWatch。

每 Amazon 次 Glue 数据质量评估都会在每次数据质量运行时发出一对名为glue.data.quality.rules.passed(表示通过的规则数量)和glue.data.quality.rules.failed(表示失败的规则数)的指标。您可以使用此发出的指标创建警报,以便在给定的数据质量运行低于阈值时提醒用户。要开始设置可通过 Amazon SNS 通知发送电子邮件的警报,请按照以下步骤操作:

要开始设置可通过 Amazon SNS 通知发送电子邮件的警报,请按照以下步骤操作:

  1. 打开 Amazon CloudWatch 控制台。

  2. 指标下选择所有指标。您将在自定义命名空间下看到一个名为 Glue Data Quality 的额外命名空间。

    注意

    开始运行 Glu Amazon e 数据质量时,请确保已启用 “将指标发布到亚马逊 CloudWatch” 复选框。否则,该特定运行的指标将不会发布到 Amazon CloudWatch。

    Glue Data Quality 命名空间下,您可以看到每个表、每个规则集发出的指标。对于本主题,如果该值超过 1,我们将使用 glue.data.quality.rules.failed 规则和警报(表示,如果我们看到许多失败的规则评估大于 1,我们希望收到通知)。

  3. 要创建警报,请在警报下选择所有警报

  4. 选择创建警报

  5. 选择选择指标

  6. 选择与您创建的表格相对应的 glue.data.quality.rules.failed 指标,然后选择选择指标

  7. 指定指标和条件选项卡下的指标部分下:

    1. 对于 Statistic(统计数据),选择 Sum(总计)

    2. 对于周期,选择 1 分钟

  8. 条件部分下:

    1. 对于阈值类型,选择静态

    2. 对于每当 glue.data.quality.rules.failed 为...,选择大于/等于

    3. 对于不止于...,输入 1 作为阈值。

    这些选择意味着,如果 glue.data.quality.rules.failed 指标发出的值大于或等于 1,我们将触发警报。但是,如果没有数据,我们会将其视为可接受。

  9. 选择下一步

  10. 配置操作中:

    1. 对于警报状态触发器,选择报警中

    2. 对于向以下 SNS 主题发送通知部分,选择创建新主题以通过新的 SNS 主题发送通知

    3. 对于将接收通知的电子邮件端点,请输入您的电子邮件地址。然后单击创建主题

    4. 选择下一步

  11. 对于警报名称,输入 myFirstDQAlarm,然后选择下一步

  12. 您会看到所有选择的摘要页面。选择底部的创建警报

现在,您可以从 Amazon 警报控制面板中看到正在创建的 CloudWatch 警报。

查询数据质量结果以构建控制面板

您可能需要构建一个控制面板来显示您的数据质量结果。有两种方式可执行此操作:

EventBridge 使用以下代码设置亚马逊,将数据写入 Amazon S3:

import boto3 import json from datetime import datetime s3_client = boto3.client('s3') glue_client = boto3.client('glue') s3_bucket = 's3-bucket-name' def write_logs(log_metadata): try: filename = datetime.now().strftime("%m%d%Y%H%M%S") + ".json" key_opts = { 'year': datetime.now().year, 'month': "{:02d}".format(datetime.now().month), 'day': "{:02d}".format(datetime.now().day), 'filename': filename } s3key = "gluedataqualitylogs/year={year}/month={month}/day={day}/{filename}".format(**key_opts) s3_client.put_object(Bucket=s3_bucket, Key=s3key, Body=json.dumps(log_metadata)) except Exception as e: print(f'Error writing logs to S3: {e}') def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info write_logs(log_metadata) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }

写入 Amazon S3 后,您可以使用 Amazon Glue 抓取工具注册到 Athena 并查询表。

在数据质量评估期间配置 Amazon S3 位置:

在 Amazon Glue 数据目录或 Amazon Glue ETL 中运行数据质量任务时,您可以提供一个 Amazon S3 位置,用于将数据质量结果写入亚马逊 S3。您可以使用以下语法通过引用目标来创建表,以读取数据质量结果。

请注意:必须分别运行 CREATE EXTERNAL TABLEMSCK REPAIR TABLE 查询。

CREATE EXTERNAL TABLE <my_table_name>( catalogid string, databasename string, tablename string, dqrunid string, evaluationstartedon timestamp, evaluationcompletedon timestamp, rule string, outcome string, failurereason string, evaluatedmetrics string) PARTITIONED BY ( `year` string, `month` string, `day` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://glue-s3-dq-bucket-us-east-2-results/' TBLPROPERTIES ( 'classification'='json', 'compressionType'='none', 'typeOfData'='file');
MSCK REPAIR TABLE <my_table_name>;

创建上表后,即可使用 Amazon Athena 运行分析查询。

使用部署数据质量规则 Amazon CloudFormation

您可以使用 Amazon CloudFormation 来创建数据质量规则。有关更多信息,Amazon CloudFormation 请参阅 Amazon Glue

计划数据质量规则

您可以使用以下方法计划数据质量规则:

  • 从数据目录中安排数据质量规则:任何代码用户都无法使用此选项轻松安排数据质量扫描。 Amazon Glue Data Quality 将在亚马逊上创建时间表 EventBridge。要计划数据质量规则,请执行以下操作:

    • 导航到规则集并单击运行

    • 运行频率中,选择所需的计划并提供任务名称。此任务名称是您在中的日程安排的名称 EventBridge。

  • 使用 Amazon EventBridge 和 Amazon Step Functions 来协调数据质量规则的评估和建议。