Using APIs to measure and manage data quality - Amazon Glue

This topic describes how to use APIs to measure and manage data quality.

Prerequisites

  • Make sure your boto3 version is up to date so that it includes the latest Amazon Glue Data Quality API.

  • Make sure your Amazon CLI version is up to date so that it includes the latest Amazon Glue Data Quality commands.

If you’re using an Amazon Glue job to run these APIs, you can use the following option to update the boto3 library to the latest version:

--additional-python-modules boto3==<version>
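One way to confirm that the installed boto3 is recent enough is to check whether the Glue client exposes the Data Quality operations used in this topic. The following is a minimal sketch, not an official check: the helper name `missing_data_quality_operations` and the `REQUIRED_OPERATIONS` tuple are illustrative, and the operation names are simply the ones that appear in the examples on this page.

```python
# Operation names taken from the examples in this topic (not an exhaustive list).
REQUIRED_OPERATIONS = (
    "start_data_quality_rule_recommendation_run",
    "create_data_quality_ruleset",
    "start_data_quality_ruleset_evaluation_run",
    "get_data_quality_result",
)


def missing_data_quality_operations(glue_client, required=REQUIRED_OPERATIONS):
    """Return the names in `required` that the client does not expose as callables.

    `glue_client` is expected to be a Boto3 Glue client, but any object works,
    which keeps this hypothetical helper easy to test.
    """
    return [
        name for name in required
        if not callable(getattr(glue_client, name, None))
    ]
```

With a real client you would call `missing_data_quality_operations(boto3.client('glue'))`. An empty list suggests the installed boto3 already includes these operations; a non-empty list suggests that boto3 needs to be upgraded.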

Working with Amazon Glue Data Quality recommendations

To start an Amazon Glue Data Quality recommendation run:

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates Amazon Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Amazon Glue client.
        """
        self.glue_client = glue_client

    def start_data_quality_rule_recommendation_run(self, database_name, table_name, role_arn):
        """
        Starts a recommendation run that is used to generate rules when you don't
        know what rules to write. Amazon Glue Data Quality analyzes the data and
        comes up with recommendations for a potential ruleset. You can then triage
        the ruleset and modify the generated ruleset to your liking.

        :param database_name: The name of the Amazon Glue database which contains the dataset.
        :param table_name: The name of the Amazon Glue table against which we want a recommendation.
        :param role_arn: The Amazon Resource Name (ARN) of an Amazon Identity and Access
                         Management (IAM) role that grants permission to let Amazon Glue
                         access the resources it needs.
        :return: The ID of the recommendation run.
        """
        try:
            response = self.glue_client.start_data_quality_rule_recommendation_run(
                DataSource={
                    'GlueTable': {
                        'DatabaseName': database_name,
                        'TableName': table_name
                    }
                },
                Role=role_arn
            )
        except ClientError as err:
            logger.error(
                "Couldn't start data quality recommendation run for table %s. Here's why: %s: %s",
                table_name, err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response['RunId']

For a recommendation run, you can use pushDownPredicate or catalogPartitionPredicate to improve performance and run recommendations only on specific partitions of your catalog sources.

client.start_data_quality_rule_recommendation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': database_name,
            'TableName': table_name,
            'AdditionalOptions': {
                'pushDownPredicate': "year=2022"
            }
        }
    },
    Role=role_arn,
    NumberOfWorkers=2,
    CreatedRulesetName='<rule_set_name>'
)
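Because the same DataSource structure is passed to both recommendation runs and evaluation runs, it can help to build it in one place. The following is a minimal sketch: `build_glue_table_source` is a hypothetical helper name, and it only adds the AdditionalOptions entry when a predicate is actually supplied.

```python
def build_glue_table_source(database_name, table_name,
                            push_down_predicate=None,
                            catalog_partition_predicate=None):
    """Build a DataSource dictionary for an Amazon Glue table.

    Hypothetical helper: the predicate keys mirror the option names mentioned
    in this topic (pushDownPredicate and catalogPartitionPredicate).
    """
    glue_table = {"DatabaseName": database_name, "TableName": table_name}

    options = {}
    if push_down_predicate:
        options["pushDownPredicate"] = push_down_predicate
    if catalog_partition_predicate:
        options["catalogPartitionPredicate"] = catalog_partition_predicate
    if options:
        # Only attach AdditionalOptions when at least one predicate is set.
        glue_table["AdditionalOptions"] = options

    return {"GlueTable": glue_table}
```

The returned dictionary can then be passed as `DataSource=build_glue_table_source('<database_name>', '<table_name>', push_down_predicate='year=2022')`.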

To get results of an Amazon Glue Data Quality recommendation run:

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates Amazon Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Amazon Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_rule_recommendation_run(self, run_id):
        """
        Gets the specified recommendation run that was used to generate rules.

        :param run_id: The ID of the data quality recommendation run.
        :return: The response describing the recommendation run.
        """
        try:
            response = self.glue_client.get_data_quality_rule_recommendation_run(RunId=run_id)
        except ClientError as err:
            logger.error(
                "Couldn't get data quality recommendation run %s. Here's why: %s: %s",
                run_id, err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response
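A recommendation run is asynchronous, so the recommended ruleset is only available once the run has finished. The sketch below polls the run until it reaches a terminal state. It assumes that the response carries a `Status` field and that `SUCCEEDED`, `FAILED`, `STOPPED`, and `TIMEOUT` are the terminal values; the function name, the polling interval, and the `sleep` parameter (injectable for testing) are illustrative choices.

```python
import time

# Statuses assumed to mean the run will not change any further.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"}


def wait_for_recommendation_run(glue_client, run_id, poll_seconds=30,
                                max_polls=60, sleep=time.sleep):
    """Poll a recommendation run until it reaches a terminal status.

    Returns the last response, or raises TimeoutError after `max_polls` checks.
    """
    for attempt in range(max_polls):
        response = glue_client.get_data_quality_rule_recommendation_run(RunId=run_id)
        if response.get("Status") in TERMINAL_STATUSES:
            return response
        if attempt < max_polls - 1:
            # Wait before the next check; skip the wait after the final check.
            sleep(poll_seconds)
    raise TimeoutError(f"Run {run_id} did not finish after {max_polls} checks")
```

The caller still needs to inspect `response['Status']`, because a terminal status is not necessarily a successful one.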

From the response object, you can extract the ruleset that the run recommended and use it in later steps:

print(response['RecommendedRuleset'])

Example output:

Rules = [
    RowCount between 2000 and 8000,
    IsComplete "col1",
    IsComplete "col2",
    StandardDeviation "col3" between 58138330.8 and 64258155.09,
    ColumnValues "col4" between 1000042965 and 1214474826,
    IsComplete "col5"
]
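As one possible "further step", the recommended ruleset can be stored as a named ruleset attached to the same table. The following is a rough sketch that combines two calls shown in this topic; `save_recommended_ruleset` is a hypothetical helper, and it is only needed if the run was started without CreatedRulesetName.

```python
def save_recommended_ruleset(glue_client, run_id, ruleset_name,
                             database_name, table_name):
    """Store the ruleset recommended by a run under `ruleset_name`.

    Hypothetical helper: raises ValueError if the run has no recommendation
    yet (for example, because it is still running or it failed).
    """
    run = glue_client.get_data_quality_rule_recommendation_run(RunId=run_id)
    recommended = run.get("RecommendedRuleset")
    if not recommended:
        raise ValueError(
            f"Run {run_id} has no recommended ruleset (status: {run.get('Status')})"
        )

    glue_client.create_data_quality_ruleset(
        Name=ruleset_name,
        Ruleset=recommended,
        TargetTable={"TableName": table_name, "DatabaseName": database_name},
    )
    return recommended
```

Returning the DQDL string lets the caller review or edit the rules before evaluating them.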

To list your recommendation runs, optionally filtered by data source:

response = client.list_data_quality_rule_recommendation_runs(
    Filter={
        'DataSource': {
            'GlueTable': {
                'DatabaseName': '<database_name>',
                'TableName': '<table_name>'
            }
        }
    }
)
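A single list call may return only part of the runs. The sketch below follows continuation tokens until the listing is exhausted, assuming the response contains a `Runs` list and, when more pages exist, a `NextToken` value that is passed back on the next call. The generator name `iter_recommendation_runs` is illustrative.

```python
def iter_recommendation_runs(glue_client, database_name, table_name):
    """Yield every recommendation run for a table, following NextToken pages."""
    kwargs = {
        "Filter": {
            "DataSource": {
                "GlueTable": {
                    "DatabaseName": database_name,
                    "TableName": table_name,
                }
            }
        }
    }
    while True:
        page = glue_client.list_data_quality_rule_recommendation_runs(**kwargs)
        yield from page.get("Runs", [])

        token = page.get("NextToken")
        if not token:
            break
        # Ask for the next page on the following iteration.
        kwargs["NextToken"] = token
```

For example, `[run['RunId'] for run in iter_recommendation_runs(client, '<database_name>', '<table_name>')]` collects the IDs of all matching runs.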

To cancel existing Amazon Glue Data Quality recommendation tasks:

response = client.cancel_data_quality_rule_recommendation_run(
    RunId='dqrun-d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)

Working with Amazon Glue Data Quality rulesets

To create an Amazon Glue Data Quality ruleset:

response = client.create_data_quality_ruleset(
    Name='<ruleset_name>',
    Ruleset='Rules = [IsComplete "col1", IsPrimaryKey "col2", RowCount between 2000 and 8000]',
    TargetTable={
        'TableName': '<table_name>',
        'DatabaseName': '<database_name>'
    }
)
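The Ruleset parameter is a single DQDL string, which becomes hard to read as the number of rules grows. A small helper can assemble it from a list of individual rules. This is a minimal sketch: `build_ruleset` is a hypothetical name, it performs no DQDL validation, and rejecting an empty list is a design choice of this example.

```python
def build_ruleset(rules):
    """Join individual DQDL rules into a 'Rules = [...]' string.

    Blank entries are dropped. The rules themselves are not validated.
    """
    cleaned = [rule.strip() for rule in rules if rule and rule.strip()]
    if not cleaned:
        raise ValueError("At least one rule is required")
    return "Rules = [\n    " + ",\n    ".join(cleaned) + "\n]"
```

The result can be passed directly, for example `Ruleset=build_ruleset(['IsComplete "col1"', 'IsPrimaryKey "col2"'])`.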

To get a data quality ruleset:

response = client.get_data_quality_ruleset(
    Name='<ruleset_name>'
)
print(response)

You can then extract the ruleset from the response:

print(response['Ruleset'])

To list all the data quality rulesets:

response = client.list_data_quality_rulesets()

You can use the filter condition within the API to filter all rulesets attached to a specific database or table:

response = client.list_data_quality_rulesets(
    Filter={
        'TargetTable': {
            'TableName': '<table_name>',
            'DatabaseName': '<database_name>'
        }
    }
)

To update a data quality ruleset:

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates Amazon Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Amazon Glue client.
        """
        self.glue_client = glue_client

    def update_data_quality_ruleset(self, ruleset_name, ruleset_string):
        """
        Updates an Amazon Glue Data Quality ruleset.

        :param ruleset_name: The name of the Amazon Glue Data Quality ruleset to update.
        :param ruleset_string: The DQDL ruleset string to update the ruleset with.
        """
        try:
            response = self.glue_client.update_data_quality_ruleset(
                Name=ruleset_name,
                Ruleset=ruleset_string
            )
        except ClientError as err:
            logger.error(
                "Couldn't update the Amazon Glue Data Quality ruleset. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

To delete a data quality ruleset:

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates Amazon Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Amazon Glue client.
        """
        self.glue_client = glue_client

    def delete_data_quality_ruleset(self, ruleset_name):
        """
        Deletes an Amazon Glue Data Quality ruleset.

        :param ruleset_name: The name of the Amazon Glue Data Quality ruleset to delete.
        """
        try:
            response = self.glue_client.delete_data_quality_ruleset(
                Name=ruleset_name
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete the Amazon Glue Data Quality ruleset. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

Working with Amazon Glue Data Quality runs

To start an Amazon Glue Data Quality run:

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates Amazon Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Amazon Glue client.
        """
        self.glue_client = glue_client

    def start_data_quality_ruleset_evaluation_run(self, database_name, table_name, role_arn, ruleset_list):
        """
        Starts an Amazon Glue Data Quality evaluation run.

        :param database_name: The name of the Amazon Glue database which contains the dataset.
        :param table_name: The name of the Amazon Glue table against which we want to evaluate.
        :param role_arn: The Amazon Resource Name (ARN) of an Amazon Identity and Access
                         Management (IAM) role that grants permission to let Amazon Glue
                         access the resources it needs.
        :param ruleset_list: The list of Amazon Glue Data Quality ruleset names to evaluate.
        :return: The ID of the evaluation run.
        """
        try:
            response = self.glue_client.start_data_quality_ruleset_evaluation_run(
                DataSource={
                    'GlueTable': {
                        'DatabaseName': database_name,
                        'TableName': table_name
                    }
                },
                Role=role_arn,
                RulesetNames=ruleset_list
            )
        except ClientError as err:
            logger.error(
                "Couldn't start the Amazon Glue Data Quality run. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response['RunId']

Remember that you can pass a pushDownPredicate or catalogPartitionPredicate parameter to ensure your data quality run targets only a specific set of partitions within your catalog table. For example:

response = client.start_data_quality_ruleset_evaluation_run(
    DataSource={
        'GlueTable': {
            'DatabaseName': '<database_name>',
            'TableName': '<table_name>',
            'AdditionalOptions': {
                'pushDownPredicate': 'year=2023'
            }
        }
    },
    Role='<role_name>',
    NumberOfWorkers=5,
    Timeout=123,
    AdditionalRunOptions={
        'CloudWatchMetricsEnabled': False
    },
    RulesetNames=[
        '<ruleset_name>',
    ]
)

To get information about an Amazon Glue Data Quality run:

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates Amazon Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Amazon Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_ruleset_evaluation_run(self, run_id):
        """
        Gets details about an Amazon Glue Data Quality run.

        :param run_id: The Amazon Glue Data Quality run ID to look up.
        """
        try:
            response = self.glue_client.get_data_quality_ruleset_evaluation_run(
                RunId=run_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't look up the Amazon Glue Data Quality run ID. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

To get the results from an Amazon Glue Data Quality run:

For a given Amazon Glue Data Quality run, you can extract the results of the run's evaluation using the following method:

response = client.get_data_quality_ruleset_evaluation_run(
    RunId='d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
)

resultID = response['ResultIds'][0]

response = client.get_data_quality_result(
    ResultId=resultID
)

print(response['RuleResults'])
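The example above reads only the first entry of ResultIds. Since ResultIds is a list, a run may produce more than one result, for instance when several rulesets are evaluated. The following sketch fetches every result of a run; `get_all_run_results` is a hypothetical helper name.

```python
def get_all_run_results(glue_client, run_id):
    """Return the data quality result for every result ID of an evaluation run."""
    run = glue_client.get_data_quality_ruleset_evaluation_run(RunId=run_id)
    return [
        glue_client.get_data_quality_result(ResultId=result_id)
        for result_id in run.get("ResultIds", [])
    ]
```

An empty list is returned when the run has not produced any results yet.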

To list all your Amazon Glue Data Quality runs:

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates Amazon Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Amazon Glue client.
        """
        self.glue_client = glue_client

    def list_data_quality_ruleset_evaluation_runs(self, database_name, table_name):
        """
        Lists all the Amazon Glue Data Quality runs against a given table.

        :param database_name: The name of the database that contains the table.
        :param table_name: The name of the table against which the data quality runs were created.
        """
        try:
            response = self.glue_client.list_data_quality_ruleset_evaluation_runs(
                Filter={
                    'DataSource': {
                        'GlueTable': {
                            'DatabaseName': database_name,
                            'TableName': table_name
                        }
                    }
                }
            )
        except ClientError as err:
            logger.error(
                "Couldn't list the Amazon Glue Data Quality runs. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

You can modify the filter clause to show only runs that started within a specific time range or that ran against specific tables.
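A time-bounded filter could be assembled as follows. This sketch assumes that the filter accepts `StartedAfter` and `StartedBefore` timestamps alongside `DataSource`; check these field names against the Boto3 reference for your version before relying on them. `build_run_filter` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def build_run_filter(database_name, table_name,
                     started_after=None, started_before=None):
    """Build a Filter dictionary for listing evaluation runs of one table.

    The StartedAfter and StartedBefore keys are assumed field names and are
    only included when a value is supplied.
    """
    run_filter = {
        "DataSource": {
            "GlueTable": {
                "DatabaseName": database_name,
                "TableName": table_name,
            }
        }
    }
    if started_after is not None:
        run_filter["StartedAfter"] = started_after
    if started_before is not None:
        run_filter["StartedBefore"] = started_before
    return run_filter


# Example: restrict the listing to runs started during the last seven days.
now = datetime.now(timezone.utc)
last_week_filter = build_run_filter(
    "<database_name>", "<table_name>", started_after=now - timedelta(days=7)
)
```

The result is passed as `Filter=last_week_filter` to `list_data_quality_ruleset_evaluation_runs`.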

To stop an ongoing Amazon Glue Data Quality run:

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates Amazon Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Amazon Glue client.
        """
        self.glue_client = glue_client

    def cancel_data_quality_ruleset_evaluation_run(self, run_id):
        """
        Cancels a given Amazon Glue Data Quality run.

        :param run_id: The run ID of the Amazon Glue Data Quality run to cancel.
        """
        try:
            # The cancel operation identifies the run by its run ID, not by a result ID.
            response = self.glue_client.cancel_data_quality_ruleset_evaluation_run(
                RunId=run_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't cancel the Amazon Glue Data Quality run. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

Working with Amazon Glue Data Quality results

To get your Amazon Glue Data Quality run results:

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class GlueWrapper:
    """Encapsulates Amazon Glue actions."""

    def __init__(self, glue_client):
        """
        :param glue_client: A Boto3 Amazon Glue client.
        """
        self.glue_client = glue_client

    def get_data_quality_result(self, result_id):
        """
        Gets an Amazon Glue Data Quality result.

        :param result_id: The result ID of an Amazon Glue Data Quality run.
        """
        try:
            response = self.glue_client.get_data_quality_result(
                ResultId=result_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't get the Amazon Glue Data Quality result. Here's why: %s: %s",
                err.response['Error']['Code'], err.response['Error']['Message'])
            raise
        else:
            return response

Given an Amazon Glue Data Quality run ID, you can extract the result ID and then get the actual results:

response = client.get_data_quality_ruleset_evaluation_run(
    RunId='dqrun-abca77ee126abe1378c1da1ae0750xxxxxxxx'
)

resultID = response['ResultIds'][0]

response = client.get_data_quality_result(
    ResultId=resultID
)

print(response['RuleResults'])
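Printing RuleResults gives the raw list, and a short summary is often easier to act on. The sketch below counts outcomes and collects the rules that did not pass. It assumes each entry has a `Result` field (with values such as `PASS` or `FAIL`) plus `Name` and `Description` fields; the function name and the sample data are illustrative, not real output.

```python
from collections import Counter


def summarize_rule_results(rule_results):
    """Count rule outcomes and collect the rules that did not pass.

    Returns a dict with 'counts' (outcome -> number of rules) and
    'not_passed' (the entries whose Result is anything other than PASS).
    """
    counts = Counter(rule.get("Result", "UNKNOWN") for rule in rule_results)
    not_passed = [rule for rule in rule_results if rule.get("Result") != "PASS"]
    return {"counts": dict(counts), "not_passed": not_passed}


# Hypothetical sample data shaped like response['RuleResults'].
sample_results = [
    {"Name": "Rule_1", "Description": 'IsComplete "col1"', "Result": "PASS"},
    {"Name": "Rule_2", "Description": "RowCount between 2000 and 8000", "Result": "FAIL"},
]
summary = summarize_rule_results(sample_results)
for rule in summary["not_passed"]:
    print(f"{rule['Name']}: {rule['Description']} -> {rule['Result']}")
```

In practice, `sample_results` would be replaced with `response['RuleResults']` from the call above.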