开始使用 Aurora 与 Amazon Redshift 的零 ETL 集成 - Amazon Aurora
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

开始使用 Aurora 与 Amazon Redshift 的零 ETL 集成

在创建与 Amazon Redshift 的零 ETL 集成之前,请使用所需的参数和权限配置您的 Aurora 数据库集群和 Amazon Redshift 数据仓库。在安装过程中,您将完成以下步骤:

完成这些步骤后,请继续执行创建 Aurora 与 Amazon Redshift 的零 ETL 集成

您可以使用 Amazon SDK 为您自动完成设置过程。有关更多信息,请参阅 使用 Amazon SDK 设置集成

提示

您可以在创建集成时让 RDS 为您完成这些设置步骤,而不必手动执行这些步骤。要立即开始创建集成,请参阅创建 Aurora 与 Amazon Redshift 的零 ETL 集成

步骤 1:创建自定义数据库集群参数组

Aurora 与 Amazon Redshift 的零 ETL 集成需要为控制复制的数据库集群参数提供特定值。具体而言,Aurora MySQL 需要增强型二进制日志aurora_enhanced_binlog),而 Aurora PostgreSQL 需要增强型逻辑复制aurora.enhanced_logical_replication)。

要配置二进制日志记录或逻辑复制,必须先创建自定义数据库集群参数组,然后将其与源数据库集群关联。

Aurora MySQL(aurora-mysql8.0 系列)

  • aurora_enhanced_binlog=1

  • binlog_backup=0

  • binlog_format=ROW

  • binlog_replication_globaldb=0

  • binlog_row_image=full

  • binlog_row_metadata=full

此外,请确保 binlog_transaction_compression 参数设置为 ON,也binlog_row_value_options 参数设置为 PARTIAL_JSON

有关 Aurora MySQL 增强型二进制日志的更多信息,请参阅为 Aurora MySQL 设置增强型二进制日志

Aurora PostgreSQL(aurora-postgresql16 系列):

  • rds.logical_replication=1

  • aurora.enhanced_logical_replication=1

  • aurora.logical_replication_backup=0

  • aurora.logical_replication_globaldb=0

启用增强型逻辑复制(aurora.enhanced_logical_replication)始终会将所有列值都写入预写日志(WAL),即使 REPLICA IDENTITY FULL 未启用也是如此。这可能会增加源数据库集群的 IOPS。

重要

如果您启用或禁用 aurora.enhanced_logical_replication 数据库集群参数,则主数据库实例将使所有逻辑复制槽失效。这将停止从源到目标的复制,并且您必须在主数据库实例上重新创建复制槽。为防止中断,请在复制过程中使参数状态保持一致。

步骤 2:选择或创建源数据库集群

创建自定义数据库集群参数组后,选择或创建一个 Aurora 数据库集群。该集群将成为向 Amazon Redshift 复制数据的源。您可以指定使用预调配数据库实例或 Aurora Serverless v2 数据库实例作为源的数据库集群。有关创建数据库集群的说明,请参阅 创建 Amazon Aurora 数据库集群创建一个使用 Aurora Serverless v2 的数据库集群

数据库必须运行受支持的数据库引擎版本。有关受支持的版本的列表,请参阅 支持与 Amazon Redshift 进行零 ETL 集成的区域和 Amazon 数据库引擎

创建数据库时,在其他配置下,将默认的数据库集群参数组更改为您在上一步中创建的自定义参数组。

注意

如果您在已创建集群之后将参数组与数据库集群关联,则必须重启集群中的主数据库实例以应用更改,然后才能创建零 ETL 集成。有关说明,请参阅重启 Amazon Aurora 数据库集群或 Amazon Aurora 数据库实例

步骤 3:创建目标 Amazon Redshift 数据仓库

创建源数据库集群后,必须在 Amazon Redshift 中创建和配置目标数据仓库。数据仓库必须满足以下要求:

  • 使用 RA3 节点类型以及至少两个节点或 Redshift Serverless。

  • 已加密(如果使用预置集群)。有关更多信息,请参阅 Amazon Redshift 数据库加密

有关创建数据仓库的说明,请参阅预调配集群的创建集群或 Redshift Serverless 的创建带命名空间的工作组

在数据仓库上启用区分大小写

要使集成成功,必须为数据仓库启用区分大小写参数(enable_case_sensitive_identifier)。默认情况下,所有预调配集群和 Redshift Serverless 工作组均禁用区分大小写。

要启用区分大小写,请根据您的数据仓库类型执行以下步骤:

  • 预调配集群 – 要在预调配集群上启用区分大小写,请创建一个启用 enable_case_sensitive_identifier 参数的自定义参数组。然后,将该参数组与集群关联。有关说明,请参阅使用控制台管理参数组使用 Amazon CLI 配置参数值

    注意

    将自定义参数组与集群关联后,请记得重启集群。

  • 无服务器工作组 - 要在 Redshift Serverless 工作组上启用区分大小写,必须使用 Amazon CLI。Amazon Redshift 控制台目前不支持修改 Redshift Serverless 参数值。发送以下 update-workgroup 请求:

    aws redshift-serverless update-workgroup \ --workgroup-name target-workgroup \ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true

    修改参数值后,无需重启工作组。

为数据仓库配置授权

创建数据仓库后,必须将源 Aurora 数据库集群配置为授权的集成源。有关说明,请参阅为您的 Amazon Redshift 数据仓库配置授权

使用 Amazon SDK 设置集成

您可以运行以下 Python 脚本来自动设置所需的资源,而不必手动设置每个资源。此代码示例使用适用于 Python (Boto3) 的 Amazon SDK 创建源 Amazon Aurora 数据库集群和目标 Amazon Redshift 数据仓库,其中每个都具有所需的参数值。然后,它会等待数据库变为可用后,再在它们之间创建零 ETL 集成。您可以根据需要设置的资源注释掉不同的函数。

要安装所需依赖项,请运行以下命令:

pip install boto3 pip install time

在脚本中,可以选择修改源组、目标组和参数组的名称。最后一个函数在设置资源后创建一个名为 my-integration 的集成。

Aurora MySQL
import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_cluster_name = 'my-source-cluster' # A name for the source cluster source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_cluster(*args): """Creates a source Aurora MySQL DB cluster""" response = rds.create_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, DBParameterGroupFamily='aurora-mysql8.0', Description='For Aurora MySQL binary logging' ) print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName']) response = rds.modify_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'aurora_enhanced_binlog', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_backup', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_replication_globaldb', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_metadata', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBClusterParameterGroupName']) response = rds.create_db_cluster( DBClusterIdentifier=source_cluster_name, DBClusterParameterGroupName=source_param_group_name, Engine='aurora-mysql', EngineVersion='8.0.mysql_aurora.3.05.2', DatabaseName='myauroradb', MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier']) source_arn = (response['DBCluster']['DBClusterArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) response = rds.create_db_instance( DBInstanceClass='db.r6g.2xlarge', DBClusterIdentifier=source_cluster_name, DBInstanceIdentifier=source_cluster_name + '-instance', Engine='aurora-mysql' ) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='For Aurora MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy specifying cluster ARN and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_cluster_availability(*args): """Waits for both clusters to be available""" print('Waiting for clusters to be available...') response = rds.describe_db_clusters( DBClusterIdentifier=source_cluster_name ) source_status = response['DBClusters'][0]['Status'] source_arn = response['DBClusters'][0]['DBClusterArn'] response = rds.describe_db_instances( DBInstanceIdentifier=source_cluster_name + '-instance' ) source_instance_status = response['DBInstances'][0]['DBInstanceStatus'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the clusters are available. if source_status != 'available' or target_status != 'available' or source_instance_status != 'available': time.sleep(60) response = wait_for_cluster_availability( source_cluster_name, target_cluster_name) else: print('Clusters available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target clusters""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_cluster(source_cluster_name, source_param_group_name) wait_for_cluster_availability(source_cluster_name, target_cluster_name) if __name__ == "__main__": main()
Aurora PostgreSQL
import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_cluster_name = 'my-source-cluster' # A name for the source cluster source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_cluster(*args): """Creates a source Aurora PostgreSQL DB cluster""" response = rds.create_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, DBParameterGroupFamily='aurora-postgresql16', Description='For Aurora PostgreSQL logical replication' ) print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName']) response = rds.modify_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'rds.logical_replication', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'aurora.enhanced_logical_replication', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'aurora.logical_replication_backup', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'aurora.logical_replication_globaldb', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBClusterParameterGroupName']) response = rds.create_db_cluster( DBClusterIdentifier=source_cluster_name, DBClusterParameterGroupName=source_param_group_name, Engine='aurora-postgresql', EngineVersion='16.4.aurora-postgresql', DatabaseName='mypostgresdb', MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier']) source_arn = (response['DBCluster']['DBClusterArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) response = rds.create_db_instance( DBInstanceClass='db.r6g.2xlarge', DBClusterIdentifier=source_cluster_name, DBInstanceIdentifier=source_cluster_name + '-instance', Engine='aurora-postgresql' ) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='For Aurora PostgreSQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy specifying cluster ARN and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_cluster_availability(*args): """Waits for both clusters to be available""" print('Waiting for clusters to be available...') response = rds.describe_db_clusters( DBClusterIdentifier=source_cluster_name ) source_status = response['DBClusters'][0]['Status'] source_arn = response['DBClusters'][0]['DBClusterArn'] response = rds.describe_db_instances( DBInstanceIdentifier=source_cluster_name + '-instance' ) source_instance_status = response['DBInstances'][0]['DBInstanceStatus'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the clusters are available. if source_status != 'available' or target_status != 'available' or source_instance_status != 'available': time.sleep(60) response = wait_for_cluster_availability( source_cluster_name, target_cluster_name) else: print('Clusters available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target clusters""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_cluster(source_cluster_name, source_param_group_name) wait_for_cluster_availability(source_cluster_name, target_cluster_name) if __name__ == "__main__": main()

后续步骤

借助源 Aurora 数据库集群和 Amazon Redshift 目标数据仓库,您可以创建零 ETL 集成并复制数据。有关说明,请参阅创建 Aurora 与 Amazon Redshift 的零 ETL 集成