开始使用 Aurora 与 Amazon Redshift 的零 ETL 集成 - Amazon Aurora
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

开始使用 Aurora 与 Amazon Redshift 的零 ETL 集成

在创建与 Amazon Redshift 的零 ETL 集成之前,请使用所需的参数和权限配置您的 Aurora 数据库集群和 Amazon Redshift 数据仓库。在安装过程中,您将完成以下步骤:

完成这些步骤后,请继续执行创建 Aurora 与 Amazon Redshift 的零 ETL 集成

您可以使用 Amazon SDK 为您自动完成设置过程。有关更多信息,请参阅 使用 Amazon SDK 设置集成(仅限 Aurora MySQL)

步骤 1:创建自定义数据库集群参数组

Aurora 与 Amazon Redshift 的零 ETL 集成需要为控制复制的数据库集群参数提供特定值。具体而言,Aurora MySQL 需要增强型二进制日志aurora_enhanced_binlog),而 Aurora PostgreSQL 需要增强型逻辑复制aurora.enhanced_logical_replication)。

要配置二进制日志记录或逻辑复制,必须先创建自定义数据库集群参数组,然后将其与源数据库集群关联。

根据您的源数据库引擎,使用以下设置创建自定义数据库集群参数组。有关创建参数组的说明,请参阅使用数据库集群参数组

Aurora MySQL(aurora-mysql8.0 系列)

  • aurora_enhanced_binlog=1

  • binlog_backup=0

  • binlog_format=ROW

  • binlog_replication_globaldb=0

  • binlog_row_image=full

  • binlog_row_metadata=full

此外,请确保 binlog_transaction_compression 参数设置为 ON,也binlog_row_value_options 参数设置为 PARTIAL_JSON

有关 Aurora MySQL 增强型二进制日志的更多信息,请参阅设置增强型二进制日志

Aurora PostgreSQL(aurora-postgresql15 系列):

注意

对于 Aurora PostgreSQL 数据库集群,您必须在美国东部(俄亥俄州)(us-east-2)Amazon Web Services 区域的 Amazon RDS 数据库预览环境中创建自定义参数组。

  • rds.logical_replication=1

  • aurora.enhanced_logical_replication=1

  • aurora.logical_replication_backup=0

  • aurora.logical_replication_globaldb=0

启用增强型逻辑复制(aurora.enhanced_logical_replication)会自动将 REPLICA IDENTITY 参数设置为 FULL,这意味着所有列值都写入提前写入日志(WAL)。这将增加源数据库集群的 IOPS。

步骤 2:选择或创建源数据库集群

创建自定义数据库集群参数组后,选择或创建一个 Aurora MySQL 或 Aurora PostgreSQL 数据库集群。该集群将成为向 Amazon Redshift 复制数据的源。

集群必须运行 Aurora MySQL 版本 3.05(与 MySQL 8.0.32 兼容)或更高版本,或者 Aurora PostgreSQL(兼容 PostgreSQL 15.4 和零 ETL 支持)。有关创建数据库集群的说明,请参阅。

注意

您必须在美国东部(俄亥俄州)(us-east-2)Amazon Web Services 区域的 Amazon RDS 数据库预览环境中创建 Aurora PostgreSQL 数据库集群。

其它配置下,将默认的数据库集群参数组更改为您在上一步中创建的自定义参数组。

注意

对于 Aurora MySQL,您在已创建集群之后 将参数组与数据库集群关联,则必须重启集群中的主数据库实例以应用更改,然后才能创建零 ETL 集成。有关说明,请参阅 重启 Amazon Aurora 数据库集群或 Amazon Aurora 数据库实例

在 Aurora PostgreSQL 与 Amazon Redshift 的零 ETL 集成预览版期间,您必须在创建集群时将集群与自定义数据库集群参数组相关联。在源数据库集群已经创建后,您不能执行此操作,否则您需要删除并重新创建集群。

步骤 3:创建目标 Amazon Redshift 数据仓库

创建源数据库集群后,必须在 Amazon Redshift 中创建和配置目标数据仓库。数据仓库必须满足以下要求:

  • 预览版中创建(仅适用于 Aurora PostgreSQL 源)。对于 Aurora MySQL 源,您必须创建生产集群和工作组。

    • 要在预览版中创建预调配集群,请从预调配集群控制面板上的横幅中选择创建预览版集群。有关更多信息,请参阅创建预览版集群

      创建集群时,将预览版跟踪设置为 preview_2023

    • 要在预览版中创建 Redshift Serverless 工作组,请从无服务器控制面板的横幅中选择创建预览版工作组。有关更多信息,请参阅创建预览版工作组

  • 使用 RA3 节点类型(ra3.xlplusra3.4xlargera3.16xlarge)或 Redshift Serverless。

  • 已加密(如果使用预置集群)。有关更多信息,请参阅 Amazon Redshift 数据库加密

有关创建数据仓库的说明,请参阅预调配集群的创建集群或 Redshift Serverless 的创建带命名空间的工作组

在数据仓库上启用区分大小写

要使集成成功,必须为数据仓库启用区分大小写参数(enable_case_sensitive_identifier)。默认情况下,所有预调配集群和 Redshift Serverless 工作组均禁用区分大小写。

要启用区分大小写,请根据您的数据仓库类型执行以下步骤:

  • 预调配集群 – 要在预调配集群上启用区分大小写,请创建一个启用 enable_case_sensitive_identifier 参数的自定义参数组。然后,将该参数组与集群关联。有关说明,请参阅使用控制台管理参数组使用 Amazon CLI 配置参数值

    注意

    将自定义参数组与集群关联后,请记得重启集群。

  • 无服务器工作组 - 要在 Redshift Serverless 工作组上启用区分大小写,必须使用 Amazon CLI。Amazon Redshift 控制台目前不支持修改 Redshift Serverless 参数值。发送以下 update-workgroup 请求:

    aws redshift-serverless update-workgroup \ --workgroup-name target-workgroup \ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true

    修改参数值后,无需重启工作组。

为数据仓库配置授权

创建数据仓库后,必须将源 Aurora 数据库集群配置为授权的集成源。有关说明,请参阅为您的 Amazon Redshift 数据仓库配置授权

使用 Amazon SDK 设置集成(仅限 Aurora MySQL)

您可以运行以下 Python 脚本来自动设置所需的资源,而不必手动设置每个资源。代码示例使用 Amazon SDK for Python (Boto3) 创建源 Aurora MySQL 数据库集群和目标 Amazon Redshift 数据仓库,其中每个都具有所需的参数值。然后,它会等待集群可用后,再在它们之间创建零 ETL 集成。您可以根据需要设置的资源注释掉不同的函数。

要安装所需依赖项,请运行以下命令:

pip install boto3 pip install time

在脚本中,可以选择修改源组、目标组和参数组的名称。最后一个函数在设置资源后创建一个名为 my-integration 的集成。

import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_cluster_name = 'my-source-cluster' # A name for the source cluster source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_cluster(*args): """Creates a source Aurora MySQL DB cluster""" response = rds.create_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, DBParameterGroupFamily='aurora-mysql8.0', Description='For Aurora MySQL zero-ETL integrations' ) print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName']) response = rds.modify_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'aurora_enhanced_binlog', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_backup', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_replication_globaldb', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_metadata', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBClusterParameterGroupName']) response = rds.create_db_cluster( DBClusterIdentifier=source_cluster_name, DBClusterParameterGroupName=source_param_group_name, Engine='aurora-mysql', EngineVersion='8.0.mysql_aurora.3.05.2', DatabaseName='myauroradb', MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier']) source_arn = (response['DBCluster']['DBClusterArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) response = rds.create_db_instance( DBInstanceClass='db.r6g.2xlarge', DBClusterIdentifier=source_cluster_name, DBInstanceIdentifier=source_cluster_name + '-instance', Engine='aurora-mysql' ) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='For Aurora MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy specifying cluster ARN and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_cluster_availability(*args): """Waits for both clusters to be available""" print('Waiting for clusters to be available...') response = rds.describe_db_clusters( DBClusterIdentifier=source_cluster_name, ) source_status = response['DBClusters'][0]['Status'] source_arn = response['DBClusters'][0]['DBClusterArn'] response = rds.describe_db_instances( DBInstanceIdentifier=source_cluster_name + '-instance', ) source_instance_status = response['DBInstances'][0]['DBInstanceStatus'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name, ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the clusters are available. if source_status != 'available' or target_status != 'available' or source_instance_status != 'available': time.sleep(60) response = wait_for_cluster_availability( source_cluster_name, target_cluster_name) else: print('Clusters available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target clusters""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_cluster(source_cluster_name, source_param_group_name) wait_for_cluster_availability(source_cluster_name, target_cluster_name) if __name__ == "__main__": main()

后续步骤

借助源 Aurora 数据库集群和 Amazon Redshift 目标数据仓库,您可以创建零 ETL 集成并复制数据。有关说明,请参阅 创建 Aurora 与 Amazon Redshift 的零 ETL 集成