开始使用 Amazon RDS 与 Amazon Redshift 的零 ETL 集成
在创建与 Amazon Redshift 的零 ETL 集成之前,请使用所需的参数和权限配置您的 RDS 数据库和 Amazon Redshift 数据仓库。在安装过程中,您将完成以下步骤:
完成这些步骤后,请继续执行创建 Amazon RDS 与 Amazon Redshift 的零 ETL 集成。
提示
您可以在创建集成时让 RDS 为您完成这些设置步骤,而不必手动执行这些步骤。要立即开始创建集成,请参阅创建 Amazon RDS 与 Amazon Redshift 的零 ETL 集成。
步骤 1:创建自定义数据库参数组
Amazon RDS 与 Amazon Redshift 的零 ETL 集成需要为控制二进制日志记录(binlog)的数据库参数提供特定值。要配置二进制日志记录,必须先创建自定义数据库参数组,然后将其与源数据库关联。
使用以下设置创建自定义数据库参数组。有关创建参数组的说明,请参阅Amazon RDS 数据库实例的数据库参数组。
-
binlog_format=ROW
-
binlog_row_image=full
此外,请确保 binlog_row_value_options
参数未设置为 PARTIAL_JSON
。
步骤 2:选择或创建源数据库
创建自定义数据库参数组后,选择或创建一个 RDS for MySQL 数据库。该数据库将成为向 Amazon Redshift 复制数据的源。有关创建单可用区或多可用区数据库实例的说明,请参阅创建 Amazon RDS 数据库实例。
数据库必须运行受支持的数据库引擎版本。有关受支持的版本的列表,请参阅 支持与 Amazon Redshift 进行 Amazon RDS 零 ETL 集成的区域和数据库引擎。
有关创建单可用区或多可用区数据库实例的说明,请参阅创建 Amazon RDS 数据库实例。有关创建多可用区数据库集群的说明,请参阅创建 Amazon RDS 的多可用区数据库集群。
创建数据库时,在其他配置下,将默认的数据库参数组更改为您在上一步中创建的自定义参数组。
注意
如果您在已创建数据库之后 将参数组与数据库关联,则必须重启数据库以应用更改,然后才能创建零 ETL 集成。有关说明,请参阅重启中的数据库实例或重启 Amazon RDS 的多可用区数据库集群和读取器数据库实例。
此外,请确保对数据库启用了自动备份。有关更多信息,请参阅 启用自动备份。
步骤 3:创建目标 Amazon Redshift 数据仓库
创建源数据库后,必须在 Amazon Redshift 中创建和配置目标数据仓库。数据仓库必须满足以下要求:
-
使用 RA3 节点类型以及至少两个节点或 Redshift Serverless。
-
已加密(如果使用预置集群)。有关更多信息,请参阅 Amazon Redshift 数据库加密。
有关创建数据仓库的说明,请参阅预调配集群的创建集群或 Redshift Serverless 的创建带命名空间的工作组。
在数据仓库上启用区分大小写
要使集成成功,必须为数据仓库启用区分大小写参数(enable_case_sensitive_identifier
)。默认情况下,所有预调配集群和 Redshift Serverless 工作组均禁用区分大小写。
要启用区分大小写,请根据您的数据仓库类型执行以下步骤:
-
预调配集群 – 要在预调配集群上启用区分大小写,请创建一个启用
enable_case_sensitive_identifier
参数的自定义参数组。然后,将该参数组与集群关联。有关说明,请参阅使用控制台管理参数组或使用 Amazon CLI 配置参数值。注意
将自定义参数组与集群关联后,请记得重启集群。
-
无服务器工作组 - 要在 Redshift Serverless 工作组上启用区分大小写,必须使用 Amazon CLI。Amazon Redshift 控制台目前不支持修改 Redshift Serverless 参数值。发送以下 update-workgroup 请求:
aws redshift-serverless update-workgroup \ --workgroup-name
target-workgroup
\ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true修改参数值后,无需重启工作组。
为数据仓库配置授权
创建数据仓库后,必须将源 RDS 数据库配置为授权的集成源。有关说明,请参阅为您的 Amazon Redshift 数据仓库配置授权。
使用 Amazon SDK 设置集成
您可以运行以下 Python 脚本来自动设置所需的资源,而不必手动设置每个资源。此代码示例使用Amazon SDK for Python (Boto3)
要安装所需依赖项,请运行以下命令:
pip install boto3 pip install time
在脚本中,可以选择修改源组、目标组和参数组的名称。最后一个函数在设置资源后创建一个名为 my-integration
的集成。
import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_db_name = 'my-source-db' # A name for the source database source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_db(*args): """Creates a source RDS for MySQL DB instance""" response = rds.create_db_parameter_group( DBParameterGroupName=source_param_group_name, DBParameterGroupFamily='mysql8.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created source parameter group: ' + response['DBParameterGroup']['DBParameterGroupName']) response = rds.modify_db_parameter_group( DBParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBParameterGroupName']) response = rds.create_db_instance( DBInstanceIdentifier=source_db_name, DBParameterGroupName=source_param_group_name, Engine='mysql', EngineVersion='8.0.32', DBName='mydb', DBInstanceClass='db.m5.large', AllocatedStorage=15, MasterUsername=
'username'
, MasterUserPassword='Password01**
' ) print('Creating source database: ' + response['DBInstance']['DBInstanceIdentifier']) source_arn = (response['DBInstance']['DBInstanceArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy granting access to source database and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_db_availability(*args): """Waits for both databases to be available""" print('Waiting for source and target to be available...') response = rds.describe_db_instances( DBInstanceIdentifier=source_db_name ) source_status = response['DBInstances'][0]['DBInstanceStatus'] source_arn = response['DBInstances'][0]['DBInstanceArn'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the databases are available if source_status != 'available' or target_status != 'available': time.sleep(60) response = wait_for_db_availability( source_db_name, target_cluster_name) else: print('Databases available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target databases""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration
' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_db(source_db_name, source_param_group_name) wait_for_db_availability(source_db_name, target_cluster_name) if __name__ == "__main__": main()
后续步骤
借助源 RDS 数据库和 Amazon Redshift 目标数据仓库,您可以创建零 ETL 集成并复制数据。有关说明,请参阅 创建 Amazon RDS 与 Amazon Redshift 的零 ETL 集成。