开始使用 Amazon RDS 与 Amazon Redshift 的零 ETL 集成 - Amazon Relational Database Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

开始使用 Amazon RDS 与 Amazon Redshift 的零 ETL 集成

在创建与 Amazon Redshift 的零 ETL 集成之前,请使用所需的参数和权限配置您的 RDS 数据库和 Amazon Redshift 数据仓库。在安装过程中,您将完成以下步骤:

完成这些步骤后,请继续执行创建 Amazon RDS 与 Amazon Redshift 的零 ETL 集成

提示

您可以在创建集成时让 RDS 为您完成这些设置步骤,而不必手动执行这些步骤。要立即开始创建集成,请参阅创建 Amazon RDS 与 Amazon Redshift 的零 ETL 集成

步骤 1:创建自定义数据库参数组

Amazon RDS 与 Amazon Redshift 的零 ETL 集成需要为控制二进制日志记录(binlog)的数据库参数提供特定值。要配置二进制日志记录,必须先创建自定义数据库参数组,然后将其与源数据库关联。

使用以下设置创建自定义数据库参数组。有关创建参数组的说明,请参阅Amazon RDS 数据库实例的数据库参数组

  • binlog_format=ROW

  • binlog_row_image=full

此外,请确保 binlog_row_value_options 参数设置为 PARTIAL_JSON

步骤 2:选择或创建源数据库

创建自定义数据库参数组后,选择或创建一个 RDS for MySQL 数据库。该数据库将成为向 Amazon Redshift 复制数据的源。有关创建单可用区或多可用区数据库实例的说明,请参阅创建 Amazon RDS 数据库实例

数据库必须运行受支持的数据库引擎版本。有关受支持的版本的列表,请参阅 支持与 Amazon Redshift 进行 Amazon RDS 零 ETL 集成的区域和数据库引擎

有关创建单可用区或多可用区数据库实例的说明,请参阅创建 Amazon RDS 数据库实例有关创建多可用区数据库集群的说明,请参阅创建 Amazon RDS 的多可用区数据库集群

创建数据库时,在其他配置下,将默认的数据库参数组更改为您在上一步中创建的自定义参数组。

注意

如果您在已创建数据库之后 将参数组与数据库关联,则必须重启数据库以应用更改,然后才能创建零 ETL 集成。有关说明,请参阅重启中的数据库实例重启 Amazon RDS 的多可用区数据库集群和读取器数据库实例

此外,请确保对数据库启用了自动备份。有关更多信息,请参阅 启用自动备份

步骤 3:创建目标 Amazon Redshift 数据仓库

创建源数据库后,必须在 Amazon Redshift 中创建和配置目标数据仓库。数据仓库必须满足以下要求:

  • 使用 RA3 节点类型以及至少两个节点或 Redshift Serverless。

  • 已加密(如果使用预置集群)。有关更多信息,请参阅 Amazon Redshift 数据库加密

有关创建数据仓库的说明,请参阅预调配集群的创建集群或 Redshift Serverless 的创建带命名空间的工作组

在数据仓库上启用区分大小写

要使集成成功,必须为数据仓库启用区分大小写参数(enable_case_sensitive_identifier)。默认情况下,所有预调配集群和 Redshift Serverless 工作组均禁用区分大小写。

要启用区分大小写,请根据您的数据仓库类型执行以下步骤:

  • 预调配集群 – 要在预调配集群上启用区分大小写,请创建一个启用 enable_case_sensitive_identifier 参数的自定义参数组。然后,将该参数组与集群关联。有关说明,请参阅使用控制台管理参数组使用 Amazon CLI 配置参数值

    注意

    将自定义参数组与集群关联后,请记得重启集群。

  • 无服务器工作组 - 要在 Redshift Serverless 工作组上启用区分大小写,必须使用 Amazon CLI。Amazon Redshift 控制台目前不支持修改 Redshift Serverless 参数值。发送以下 update-workgroup 请求:

    aws redshift-serverless update-workgroup \ --workgroup-name target-workgroup \ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true

    修改参数值后,无需重启工作组。

为数据仓库配置授权

创建数据仓库后,必须将源 RDS 数据库配置为授权的集成源。有关说明,请参阅为您的 Amazon Redshift 数据仓库配置授权

使用 Amazon SDK 设置集成

您可以运行以下 Python 脚本来自动设置所需的资源,而不必手动设置每个资源。此代码示例使用Amazon SDK for Python (Boto3) 创建源 RDS for MySQL 数据库实例和目标 Amazon Redshift 数据仓库,其中每个都具有所需的参数值。然后,它会等待数据库变为可用后,再在它们之间创建零 ETL 集成。您可以根据需要设置的资源注释掉不同的函数。

要安装所需依赖项,请运行以下命令:

pip install boto3 pip install time

在脚本中,可以选择修改源组、目标组和参数组的名称。最后一个函数在设置资源后创建一个名为 my-integration 的集成。

import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_db_name = 'my-source-db' # A name for the source database source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_db(*args): """Creates a source RDS for MySQL DB instance""" response = rds.create_db_parameter_group( DBParameterGroupName=source_param_group_name, DBParameterGroupFamily='mysql8.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created source parameter group: ' + response['DBParameterGroup']['DBParameterGroupName']) response = rds.modify_db_parameter_group( DBParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBParameterGroupName']) response = rds.create_db_instance( DBInstanceIdentifier=source_db_name, DBParameterGroupName=source_param_group_name, Engine='mysql', EngineVersion='8.0.32', DBName='mydb', DBInstanceClass='db.m5.large', AllocatedStorage=15, MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source database: ' + response['DBInstance']['DBInstanceIdentifier']) source_arn = (response['DBInstance']['DBInstanceArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy granting access to source database and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_db_availability(*args): """Waits for both databases to be available""" print('Waiting for source and target to be available...') response = rds.describe_db_instances( DBInstanceIdentifier=source_db_name ) source_status = response['DBInstances'][0]['DBInstanceStatus'] source_arn = response['DBInstances'][0]['DBInstanceArn'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the databases are available if source_status != 'available' or target_status != 'available': time.sleep(60) response = wait_for_db_availability( source_db_name, target_cluster_name) else: print('Databases available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target databases""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_db(source_db_name, source_param_group_name) wait_for_db_availability(source_db_name, target_cluster_name) if __name__ == "__main__": main()

后续步骤

借助源 RDS 数据库和 Amazon Redshift 目标数据仓库,您可以创建零 ETL 集成并复制数据。有关说明,请参阅 创建 Amazon RDS 与 Amazon Redshift 的零 ETL 集成