Using APIs to measure and manage data quality - Amazon Glue

Using APIs to measure and manage data quality

This topic describes how to use APIs to measure and manage data quality.

Prerequisites

  • Make sure your boto3 version is up to date so that it includes the latest Amazon Glue Data Quality API.

  • Make sure your Amazon CLI version is up to date so that it includes the latest Amazon Glue Data Quality commands.

If you’re using an Amazon Glue job to run these APIs, you can use the following option to update the boto3 library to the latest version:

--additional-python-modules boto3==<version>
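As a rough illustration, the option can be expressed as a key/value pair in a job's arguments when the job is defined through boto3. The sketch below only builds that dictionary. The helper name build_job_arguments is hypothetical and is not part of boto3, and the version you pin is up to you.

```python
def build_job_arguments(boto3_version, extra_arguments=None):
    """Return a job-arguments dict that pins boto3 to the given version.

    Hypothetical helper for illustration only; it is not part of boto3.
    Any existing '--additional-python-modules' entry is overwritten.
    """
    arguments = dict(extra_arguments or {})
    # Special job parameters are keyed with a leading double hyphen.
    arguments["--additional-python-modules"] = f"boto3=={boto3_version}"
    return arguments
```

The resulting dictionary could then be passed as DefaultArguments to create_job or as Arguments to start_job_run.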

Working with Amazon Glue Data Quality recommendations

To start an Amazon Glue Data Quality recommendation run:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def start_data_quality_rule_recommendation_run(self, database_name, table_name, role_arn):
            """
            Starts a recommendation run that is used to generate rules when you don't know
            what rules to write. Amazon Glue Data Quality analyzes the data and comes up with
            recommendations for a potential ruleset. You can then triage the ruleset and
            modify the generated ruleset to your liking.

            :param database_name: The name of the Amazon Glue database which contains the dataset.
            :param table_name: The name of the Amazon Glue table against which we want a recommendation.
            :param role_arn: The Amazon Resource Name (ARN) of an Amazon Identity and Access
                             Management (IAM) role that grants permission to let Amazon Glue
                             access the resources it needs.
            :return: The ID of the recommendation run.
            """
            try:
                response = self.glue_client.start_data_quality_rule_recommendation_run(
                    DataSource={
                        'GlueTable': {
                            'DatabaseName': database_name,
                            'TableName': table_name
                        }
                    },
                    Role=role_arn
                )
            except ClientError as err:
                logger.error(
                    "Couldn't start data quality recommendation run for table %s. Here's why: %s: %s",
                    table_name, err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response['RunId']

For a recommendation run, you can use the pushDownPredicate or catalogPartitionPredicate option to improve performance and run recommendations only on specific partitions of your catalog sources.

    client.start_data_quality_rule_recommendation_run(
        DataSource={
            'GlueTable': {
                'DatabaseName': database_name,
                'TableName': table_name,
                'AdditionalOptions': {
                    'pushDownPredicate': "year=2022"
                }
            }
        },
        Role=role_arn,
        NumberOfWorkers=2,
        CreatedRulesetName='<rule_set_name>'
    )

To get results of an Amazon Glue Data Quality recommendation run:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def get_data_quality_rule_recommendation_run(self, run_id):
            """
            Gets the specified recommendation run that was used to generate rules.

            :param run_id: The ID of the data quality recommendation run.
            :return: The response that describes the recommendation run.
            """
            try:
                response = self.glue_client.get_data_quality_rule_recommendation_run(RunId=run_id)
            except ClientError as err:
                logger.error(
                    "Couldn't get data quality recommendation run %s. Here's why: %s: %s",
                    run_id, err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

From the response object above, you can extract the ruleset that the run recommended, for use in later steps:

    print(response['RecommendedRuleset'])

Example output:

    Rules = [
        RowCount between 2000 and 8000,
        IsComplete "col1",
        IsComplete "col2",
        StandardDeviation "col3" between 58138330.8 and 64258155.09,
        ColumnValues "col4" between 1000042965 and 1214474826,
        IsComplete "col5"
    ]
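Because a recommendation run executes asynchronously, the recommended ruleset is typically only available once the run has finished. The following is a minimal polling sketch, not an official pattern. The helper name wait_for_recommendation_run, the polling interval, and the set of terminal status values are assumptions, so check them against the API reference for your SDK version.

```python
import time

# Assumption: these are the terminal values of the run's 'Status' field.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"}


def wait_for_recommendation_run(glue_client, run_id, poll_seconds=30, max_polls=60):
    """Poll a recommendation run until it reaches a terminal status.

    Hypothetical helper: glue_client is expected to be a Boto3 Amazon Glue client.
    Returns the last response, or raises TimeoutError if polling gives up.
    """
    for _ in range(max_polls):
        response = glue_client.get_data_quality_rule_recommendation_run(RunId=run_id)
        if response.get("Status") in TERMINAL_STATUSES:
            return response
        # Wait before asking again so the API is not called in a tight loop.
        time.sleep(poll_seconds)
    raise TimeoutError(f"Run {run_id} did not finish after {max_polls} polls")
```

If the returned status indicates success, the RecommendedRuleset field of the response can be read as shown above.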

To list your recommendation runs, optionally filtered by data source:

    response = client.list_data_quality_rule_recommendation_runs(
        Filter={
            'DataSource': {
                'GlueTable': {
                    'DatabaseName': '<database_name>',
                    'TableName': '<table_name>'
                }
            }
        }
    )

To cancel an existing Amazon Glue Data Quality recommendation run:

    response = client.cancel_data_quality_rule_recommendation_run(
        RunId='dqrun-d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
    )

Working with Amazon Glue Data Quality rulesets

To create an Amazon Glue Data Quality ruleset:

    response = client.create_data_quality_ruleset(
        Name='<ruleset_name>',
        Ruleset='Rules = [IsComplete "col1", IsPrimaryKey "col2", RowCount between 2000 and 8000]',
        TargetTable={
            'TableName': '<table_name>',
            'DatabaseName': '<database_name>'
        }
    )
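The Ruleset argument is a single string, which can be awkward to maintain by hand as the number of rules grows. One possible approach, sketched here with a hypothetical helper named build_ruleset, is to keep the rules in a Python list and join them into the "Rules = [...]" form used in the example above.

```python
def build_ruleset(rules):
    """Join individual rule strings into a 'Rules = [...]' ruleset string.

    Hypothetical helper for illustration; it does not validate the rules.
    """
    # Drop blank entries and surrounding whitespace before joining.
    cleaned = [rule.strip() for rule in rules if rule.strip()]
    if not cleaned:
        raise ValueError("A ruleset needs at least one rule")
    return "Rules = [" + ", ".join(cleaned) + "]"
```

The returned string can be passed as the Ruleset argument of create_data_quality_ruleset or update_data_quality_ruleset.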

To get a data quality ruleset:

    response = client.get_data_quality_ruleset(
        Name='<ruleset_name>'
    )
    print(response)

You can then extract the ruleset from the response:

    print(response['Ruleset'])

To list all your data quality rulesets:

    response = client.list_data_quality_rulesets()

You can use the Filter parameter to list only the rulesets attached to a specific database or table:

    response = client.list_data_quality_rulesets(
        Filter={
            'TargetTable': {
                'TableName': '<table_name>',
                'DatabaseName': '<database_name>'
            }
        }
    )
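A single list call may return only part of the rulesets when there are many of them. The sketch below follows the NextToken value across pages until none is returned. The helper name list_all_rulesets is hypothetical, and the code assumes the response carries the summaries under a Rulesets key.

```python
def list_all_rulesets(glue_client, database_name=None, table_name=None):
    """Collect every ruleset summary by following NextToken across pages.

    Hypothetical helper: glue_client is expected to be a Boto3 Amazon Glue client.
    The table filter is applied only when both names are given.
    """
    kwargs = {}
    if database_name and table_name:
        kwargs["Filter"] = {
            "TargetTable": {"TableName": table_name, "DatabaseName": database_name}
        }
    rulesets = []
    while True:
        response = glue_client.list_data_quality_rulesets(**kwargs)
        rulesets.extend(response.get("Rulesets", []))
        next_token = response.get("NextToken")
        if not next_token:
            return rulesets
        # Ask for the next page on the following iteration.
        kwargs["NextToken"] = next_token
```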

To update a data quality ruleset:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def update_data_quality_ruleset(self, ruleset_name, ruleset_string):
            """
            Updates an Amazon Glue Data Quality ruleset.

            :param ruleset_name: The name of the Amazon Glue Data Quality ruleset to update.
            :param ruleset_string: The DQDL ruleset string to update the ruleset with.
            """
            try:
                response = self.glue_client.update_data_quality_ruleset(
                    Name=ruleset_name,
                    Ruleset=ruleset_string
                )
            except ClientError as err:
                logger.error(
                    "Couldn't update the Amazon Glue Data Quality ruleset. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

To delete a data quality ruleset:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def delete_data_quality_ruleset(self, ruleset_name):
            """
            Deletes an Amazon Glue Data Quality ruleset.

            :param ruleset_name: The name of the Amazon Glue Data Quality ruleset to delete.
            """
            try:
                response = self.glue_client.delete_data_quality_ruleset(
                    Name=ruleset_name
                )
            except ClientError as err:
                logger.error(
                    "Couldn't delete the Amazon Glue Data Quality ruleset. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

Working with Amazon Glue Data Quality runs

To start an Amazon Glue Data Quality run:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def start_data_quality_ruleset_evaluation_run(self, database_name, table_name, role_arn, ruleset_list):
            """
            Starts an Amazon Glue Data Quality evaluation run.

            :param database_name: The name of the Amazon Glue database which contains the dataset.
            :param table_name: The name of the Amazon Glue table against which we want to evaluate.
            :param role_arn: The Amazon Resource Name (ARN) of an Amazon Identity and Access
                             Management (IAM) role that grants permission to let Amazon Glue
                             access the resources it needs.
            :param ruleset_list: The list of Amazon Glue Data Quality ruleset names to evaluate.
            :return: The ID of the evaluation run.
            """
            try:
                response = self.glue_client.start_data_quality_ruleset_evaluation_run(
                    DataSource={
                        'GlueTable': {
                            'DatabaseName': database_name,
                            'TableName': table_name
                        }
                    },
                    Role=role_arn,
                    RulesetNames=ruleset_list
                )
            except ClientError as err:
                logger.error(
                    "Couldn't start the Amazon Glue Data Quality run. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response['RunId']

Remember that you can pass a pushDownPredicate or catalogPartitionPredicate parameter so that your data quality run targets only a specific set of partitions within your catalog table. For example:

    response = client.start_data_quality_ruleset_evaluation_run(
        DataSource={
            'GlueTable': {
                'DatabaseName': '<database_name>',
                'TableName': '<table_name>',
                'AdditionalOptions': {
                    'pushDownPredicate': 'year=2023'
                }
            }
        },
        Role='<role_name>',
        NumberOfWorkers=5,
        Timeout=123,
        AdditionalRunOptions={
            'CloudWatchMetricsEnabled': False
        },
        RulesetNames=[
            '<ruleset_name>',
        ]
    )

You can also configure how composite rules in your ruleset are evaluated, at either the ROW or the COLUMN level. For more information, see "How composite rules work" in the documentation.

The following example sets the composite rule evaluation method in the request:

    response = client.start_data_quality_ruleset_evaluation_run(
        DataSource={
            'GlueTable': {
                'DatabaseName': '<database_name>',
                'TableName': '<table_name>',
                'AdditionalOptions': {
                    'pushDownPredicate': 'year=2023'
                }
            }
        },
        Role='<role_name>',
        NumberOfWorkers=5,
        Timeout=123,
        AdditionalRunOptions={
            # The value is a string: 'ROW' or 'COLUMN'.
            'CompositeRuleEvaluationMethod': 'ROW'
        },
        RulesetNames=[
            '<ruleset_name>',
        ]
    )

To get information about an Amazon Glue Data Quality run:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def get_data_quality_ruleset_evaluation_run(self, run_id):
            """
            Gets details about an Amazon Glue Data Quality run.

            :param run_id: The Amazon Glue Data Quality run ID to look up.
            """
            try:
                response = self.glue_client.get_data_quality_ruleset_evaluation_run(
                    RunId=run_id
                )
            except ClientError as err:
                logger.error(
                    "Couldn't look up the Amazon Glue Data Quality run ID. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

To get the results from an Amazon Glue Data Quality run:

For a given Amazon Glue Data Quality run, you can extract the results of the run's evaluation using the following method:

    response = client.get_data_quality_ruleset_evaluation_run(
        RunId='d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
    )
    resultID = response['ResultIds'][0]

    response = client.get_data_quality_result(
        ResultId=resultID
    )
    print(response['RuleResults'])

To list all your Amazon Glue Data Quality runs:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def list_data_quality_ruleset_evaluation_runs(self, database_name, table_name):
            """
            Lists all the Amazon Glue Data Quality runs against a given table.

            :param database_name: The name of the database that contains the table.
            :param table_name: The name of the table against which the data quality runs were created.
            """
            try:
                response = self.glue_client.list_data_quality_ruleset_evaluation_runs(
                    Filter={
                        'DataSource': {
                            'GlueTable': {
                                'DatabaseName': database_name,
                                'TableName': table_name
                            }
                        }
                    }
                )
            except ClientError as err:
                logger.error(
                    "Couldn't list the Amazon Glue Data Quality runs. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

You can modify the Filter parameter to show only runs that started within a specific time window or that ran against specific tables.
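As a sketch of a time-bounded filter, the helper below builds a Filter dictionary that adds optional start-time bounds to the table filter. The helper name build_run_filter is hypothetical, and the StartedAfter and StartedBefore keys (with datetime values) are assumptions to verify against the API reference for your SDK version.

```python
from datetime import datetime, timedelta, timezone


def build_run_filter(database_name, table_name, started_after=None, started_before=None):
    """Build a Filter dict for list_data_quality_ruleset_evaluation_runs.

    Hypothetical helper for illustration only.
    """
    run_filter = {
        "DataSource": {
            "GlueTable": {"DatabaseName": database_name, "TableName": table_name}
        }
    }
    # Assumption: the time bounds are passed as datetime objects.
    if started_after is not None:
        run_filter["StartedAfter"] = started_after
    if started_before is not None:
        run_filter["StartedBefore"] = started_before
    return run_filter


# Example: restrict the listing to runs started in the last seven days.
week_ago = datetime.now(timezone.utc) - timedelta(days=7)
example_filter = build_run_filter("<database_name>", "<table_name>", started_after=week_ago)
```

The dictionary can then be passed as the Filter argument of list_data_quality_ruleset_evaluation_runs.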

To stop an ongoing Amazon Glue Data Quality run:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def cancel_data_quality_ruleset_evaluation_run(self, run_id):
            """
            Cancels a given Amazon Glue Data Quality run.

            :param run_id: The run ID of the Amazon Glue Data Quality run to cancel.
            """
            try:
                # The cancel operation identifies the run by its run ID, not by a result ID.
                response = self.glue_client.cancel_data_quality_ruleset_evaluation_run(
                    RunId=run_id
                )
            except ClientError as err:
                logger.error(
                    "Couldn't cancel the Amazon Glue Data Quality run. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

Working with Amazon Glue Data Quality results

To get your Amazon Glue Data Quality run results:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def get_data_quality_result(self, result_id):
            """
            Gets an Amazon Glue Data Quality result.

            :param result_id: The result ID of an Amazon Glue Data Quality run.
            """
            try:
                response = self.glue_client.get_data_quality_result(
                    ResultId=result_id
                )
            except ClientError as err:
                logger.error(
                    "Couldn't get the Amazon Glue Data Quality result. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

To view the statistics gathered for a given data quality result:

    import logging

    import boto3
    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def get_profile_for_data_quality_result(self, result_id):
            """
            Outputs the statistic profile for an Amazon Glue Data Quality result.

            :param result_id: The result ID of an Amazon Glue Data Quality run.
            """
            try:
                response = self.glue_client.get_data_quality_result(
                    ResultId=result_id
                )

                # The profile contains all statistics gathered for the result.
                profile_id = response['ProfileId']
                profile = self.glue_client.list_data_quality_statistics(
                    ProfileId=profile_id
                )
                return profile
            except ClientError as err:
                logger.error(
                    "Couldn't retrieve Data Quality profile. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise

To view the timeseries for a statistic gathered across multiple data quality runs:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def get_statistics_for_data_quality_result(self, profile_id):
            """
            Outputs an array of datapoints for each statistic in the input profile.

            :param profile_id: The profile ID of an Amazon Glue Data Quality run.
            """
            try:
                profile = self.glue_client.list_data_quality_statistics(
                    ProfileId=profile_id
                )
                # Look up the full timeseries for each statistic in the profile.
                statistics = [
                    self.glue_client.list_data_quality_statistics(
                        StatisticId=s['StatisticId']
                    )
                    for s in profile['Statistics']
                ]
                return statistics
            except ClientError as err:
                logger.error(
                    "Couldn't retrieve Data Quality statistics. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise

To view the anomaly detection model for a specific statistic:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def get_model_training_result_for_statistic(self, statistic_id, profile_id):
            """
            Outputs the details (bounds) of anomaly detection training for the given
            statistic at the given profile.

            :param statistic_id: The model's statistic (the timeseries it is tracking).
            :param profile_id: The profile associated with the model (a point in the timeseries).
            """
            try:
                model = self.glue_client.get_data_quality_model_result(
                    ProfileId=profile_id,
                    StatisticId=statistic_id
                )
                return model
            except ClientError as err:
                logger.error(
                    "Couldn't retrieve Data Quality model results. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise

To exclude a datapoint from its statistic model's anomaly detection baseline:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def apply_exclusions_to_statistic(self, statistic_id, profile_ids):
            """
            Annotates some points along a given statistic timeseries.
            This example excludes the provided values; INCLUDE can also be used to undo this action.

            :param statistic_id: The statistic timeseries to annotate.
            :param profile_ids: The profiles we want to exclude (points in the timeseries).
            :return: The annotations that could not be applied.
            """
            try:
                response = self.glue_client.batch_put_data_quality_statistic_annotation(
                    InclusionAnnotations=[
                        {
                            'ProfileId': prof_id,
                            'StatisticId': statistic_id,
                            'InclusionAnnotation': 'EXCLUDE'
                        }
                        for prof_id in profile_ids
                    ]
                )
                return response['FailedInclusionAnnotations']
            except ClientError as err:
                logger.error(
                    "Couldn't store Data Quality annotations. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise

To view the status of anomaly detection model training for a specific statistic:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def get_model_training_status_for_statistic(self, statistic_id, profile_id):
            """
            Outputs the status of anomaly detection training for the given statistic
            at the given profile.

            :param statistic_id: The model's statistic (the timeseries it is tracking).
            :param profile_id: The profile associated with the model (a point in the timeseries).
            """
            try:
                model = self.glue_client.get_data_quality_model(
                    ProfileId=profile_id,
                    StatisticId=statistic_id
                )
                return model
            except ClientError as err:
                logger.error(
                    "Couldn't retrieve Data Quality model status. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise

To exclude all results from a specific data quality run from anomaly detection baselines:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)


    class GlueWrapper:
        """Encapsulates Amazon Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Amazon Glue client.
            """
            self.glue_client = glue_client

        def apply_exclusions_to_profile(self, profile_id):
            """
            Excludes the datapoints produced by a run across statistic timeseries.
            This example excludes the provided values; INCLUDE can also be used to undo this action.

            :param profile_id: The profile we want to exclude (a point in each timeseries).
            """
            try:
                response = self.glue_client.put_data_quality_profile_annotation(
                    ProfileId=profile_id,
                    InclusionAnnotation="EXCLUDE"
                )
                return response
            except ClientError as err:
                logger.error(
                    "Couldn't store Data Quality annotations. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise

To get the results from a given data quality run and display the results:

With an Amazon Glue Data Quality run ID, you can extract the result ID and then get the actual results:

    response = client.get_data_quality_ruleset_evaluation_run(
        RunId='dqrun-abca77ee126abe1378c1da1ae0750d7dxxxx'
    )
    resultID = response['ResultIds'][0]

    response = client.get_data_quality_result(
        ResultId=resultID
    )
    print(response['RuleResults'])
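Printing the raw RuleResults list can be hard to scan when a ruleset contains many rules. One way to display the results more compactly, sketched here with a hypothetical helper named summarize_rule_results, is to count the outcomes and collect the rules that did not pass. The code assumes each entry carries Name and Result keys, optionally with Description and EvaluationMessage, and that a passing rule has the result PASS.

```python
from collections import Counter


def summarize_rule_results(rule_results):
    """Count rule outcomes and collect the rules that did not pass.

    Hypothetical helper: rule_results is the 'RuleResults' list from a
    get_data_quality_result response. Missing keys are tolerated.
    """
    counts = Counter(rule.get("Result", "UNKNOWN") for rule in rule_results)
    not_passed = [
        (rule.get("Name"), rule.get("Description"), rule.get("EvaluationMessage"))
        for rule in rule_results
        if rule.get("Result") != "PASS"
    ]
    return dict(counts), not_passed
```

For example, calling summarize_rule_results(response['RuleResults']) returns a dictionary of counts per outcome and a list of (name, description, message) tuples for the rules that need attention.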