开始使用 Amazon RDS 与 Amazon Redshift 的零 ETL 集成 - Amazon Relational Database Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

开始使用 Amazon RDS 与 Amazon Redshift 的零 ETL 集成

在创建与 Amazon Redshift 的零 ETL 集成之前,请使用所需的参数和权限配置您的 RDS 数据库和 Amazon Redshift 数据仓库。在安装过程中,您将完成以下步骤:

完成这些步骤后,请继续执行创建 Amazon RDS 与 Amazon Redshift 的零 ETL 集成

提示

您可以在创建集成时让 RDS 为您完成这些设置步骤,而不必手动执行这些步骤。要立即开始创建集成,请参阅创建 Amazon RDS 与 Amazon Redshift 的零 ETL 集成

步骤 1:创建自定义数据库参数组

Amazon RDS 与 Amazon Redshift 的零 ETL 集成需要为控制二进制日志记录(binlog)的数据库参数提供特定值。要配置二进制日志记录,必须先创建自定义数据库参数组,然后将其与源数据库关联。配置以下参数值。有关创建参数组的说明,请参阅Amazon RDS 数据库实例的数据库参数组。我们建议您在同一个请求中配置所有参数值,以避免出现依赖项问题。

  • binlog_format=ROW

  • binlog_row_image=full

此外,请确保 binlog_row_value_options 参数设置为 PARTIAL_JSON。如果源数据库为多可用区数据库集群,请确保 binlog_transaction_compression 参数设置为 ON

步骤 2:选择或创建源数据库

创建自定义数据库参数组后,选择或创建一个 RDS for MySQL 数据库。该数据库将成为向 Amazon Redshift 复制数据的源。有关创建单可用区或多可用区数据库实例的说明,请参阅 创建 Amazon RDS 数据库实例有关创建多可用区数据库集群的说明,请参阅创建 Amazon RDS 的多可用区数据库集群

数据库必须运行受支持的数据库引擎版本。有关受支持的版本的列表,请参阅 支持与 Amazon Redshift 进行 Amazon RDS 零 ETL 集成的区域和数据库引擎

创建数据库时,在其他配置下,将默认的数据库参数组更改为您在上一步中创建的自定义参数组。

注意

如果您在已创建数据库之后将参数组与数据库关联,则必须重启数据库以应用更改,然后才能创建零 ETL 集成。有关说明,请参阅重启中的数据库实例重启 Amazon RDS 的多可用区数据库集群和读取器数据库实例

此外,请确保对数据库启用了自动备份。有关更多信息,请参阅 启用自动备份

步骤 3:创建目标 Amazon Redshift 数据仓库

创建源数据库后,必须在 Amazon Redshift 中创建和配置目标数据仓库。数据仓库必须满足以下要求:

  • 使用 RA3 节点类型以及至少两个节点或 Redshift Serverless。

  • 已加密(如果使用预置集群)。有关更多信息,请参阅 Amazon Redshift 数据库加密

有关创建数据仓库的说明,请参阅预调配集群的创建集群或 Redshift Serverless 的创建带命名空间的工作组

在数据仓库上启用区分大小写

要使集成成功,必须为数据仓库启用区分大小写参数(enable_case_sensitive_identifier)。默认情况下,所有预调配集群和 Redshift Serverless 工作组均禁用区分大小写。

要启用区分大小写,请根据您的数据仓库类型执行以下步骤:

  • 预调配集群 – 要在预调配集群上启用区分大小写,请创建一个启用 enable_case_sensitive_identifier 参数的自定义参数组。然后,将该参数组与集群关联。有关说明,请参阅使用控制台管理参数组使用 Amazon CLI 配置参数值

    注意

    将自定义参数组与集群关联后,请记得重启集群。

  • 无服务器工作组 - 要在 Redshift Serverless 工作组上启用区分大小写,必须使用 Amazon CLI。Amazon Redshift 控制台目前不支持修改 Redshift Serverless 参数值。发送以下 update-workgroup 请求:

    aws redshift-serverless update-workgroup \ --workgroup-name target-workgroup \ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true

    修改参数值后,无需重启工作组。

为数据仓库配置授权

创建数据仓库后,必须将源 RDS 数据库配置为授权的集成源。有关说明,请参阅为您的 Amazon Redshift 数据仓库配置授权

使用 Amazon SDK 设置集成

您可以运行以下 Python 脚本来自动设置所需的资源,而不必手动设置每个资源。此代码示例使用适用于 Python (Boto3) 的 Amazon SDK 创建源 RDS for MySQL 数据库实例和目标 Amazon Redshift 数据仓库,其中每个都具有所需的参数值。然后,它会等待数据库变为可用后,再在它们之间创建零 ETL 集成。您可以根据需要设置的资源注释掉不同的函数。

要安装所需依赖项,请运行以下命令:

pip install boto3 pip install time

在脚本中,可以选择修改源组、目标组和参数组的名称。最后一个函数在设置资源后创建一个名为 my-integration 的集成。

import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_db_name = 'my-source-db' # A name for the source database source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_db(*args): """Creates a source RDS for MySQL DB instance""" response = rds.create_db_parameter_group( DBParameterGroupName=source_param_group_name, DBParameterGroupFamily='mysql8.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created source parameter group: ' + response['DBParameterGroup']['DBParameterGroupName']) response = rds.modify_db_parameter_group( DBParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBParameterGroupName']) response = rds.create_db_instance( DBInstanceIdentifier=source_db_name, DBParameterGroupName=source_param_group_name, Engine='mysql', EngineVersion='8.0.32', DBName='mydb', DBInstanceClass='db.m5.large', AllocatedStorage=15, MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source database: ' + response['DBInstance']['DBInstanceIdentifier']) source_arn = (response['DBInstance']['DBInstanceArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy granting access to source database and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_db_availability(*args): """Waits for both databases to be available""" print('Waiting for source and target to be available...') response = rds.describe_db_instances( DBInstanceIdentifier=source_db_name ) source_status = response['DBInstances'][0]['DBInstanceStatus'] source_arn = response['DBInstances'][0]['DBInstanceArn'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the databases are available if source_status != 'available' or target_status != 'available': time.sleep(60) response = wait_for_db_availability( source_db_name, target_cluster_name) else: print('Databases available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target databases""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_db(source_db_name, source_param_group_name) wait_for_db_availability(source_db_name, target_cluster_name) if __name__ == "__main__": main()

后续步骤

借助源 RDS 数据库和 Amazon Redshift 目标数据仓库,您可以创建零 ETL 集成并复制数据。有关说明,请参阅创建 Amazon RDS 与 Amazon Redshift 的零 ETL 集成