Getting started with Aurora zero-ETL integrations with Amazon Redshift
Before you create a zero-ETL integration with Amazon Redshift, configure your Aurora DB cluster and your Amazon Redshift data warehouse with the required parameters and permissions. During setup, you'll complete the following steps:
- Step 1: Create a custom DB cluster parameter group
- Step 2: Select or create a source DB cluster
- Step 3: Create a target Amazon Redshift data warehouse
After you complete these steps, continue to Creating Aurora zero-ETL integrations with Amazon Redshift.
You can use the Amazon SDKs to automate the setup process. For more information, see Set up an integration using the Amazon SDKs.
Step 1: Create a custom DB cluster parameter group
Aurora zero-ETL integrations with Amazon Redshift require specific values for the DB cluster parameters that control replication. Specifically, Aurora MySQL requires enhanced binlog (aurora_enhanced_binlog), and Aurora PostgreSQL requires enhanced logical replication (aurora.enhanced_logical_replication).
To configure binary logging or logical replication, you must first create a custom DB cluster parameter group and then associate it with your source DB cluster.
Aurora MySQL (aurora-mysql8.0 family):
- aurora_enhanced_binlog=1
- binlog_backup=0
- binlog_format=ROW
- binlog_replication_globaldb=0
- binlog_row_image=full
- binlog_row_metadata=full
In addition, make sure that the binlog_transaction_compression parameter is not set to ON, and that the binlog_row_value_options parameter is not set to PARTIAL_JSON.
For more information about Aurora MySQL enhanced binlog, see Setting up enhanced binlog for Aurora MySQL.
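If you're not sure how these parameters are currently set, you can read them back from the custom parameter group before associating it with the cluster. The following is a minimal sketch using the Amazon SDK for Python (Boto3); the parameter group name is a placeholder:

import boto3

rds = boto3.client('rds')

# Page through the parameters in the custom DB cluster parameter group
# and flag values that are incompatible with zero-ETL integrations.
paginator = rds.get_paginator('describe_db_cluster_parameters')
for page in paginator.paginate(DBClusterParameterGroupName='my-source-param-group'):
    for param in page['Parameters']:
        name, value = param['ParameterName'], param.get('ParameterValue')
        if name == 'binlog_transaction_compression' and value == 'ON':
            print('binlog_transaction_compression must not be ON')
        if name == 'binlog_row_value_options' and value == 'PARTIAL_JSON':
            print('binlog_row_value_options must not be PARTIAL_JSON')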
Aurora PostgreSQL (aurora-postgresql16 family):
- rds.logical_replication=1
- aurora.enhanced_logical_replication=1
- aurora.logical_replication_backup=0
- aurora.logical_replication_globaldb=0
When you enable enhanced logical replication (aurora.enhanced_logical_replication), all column values are always written to the write-ahead log (WAL), even if REPLICA IDENTITY FULL isn't enabled. This can increase IOPS on the source DB cluster.
If you enable or disable the aurora.enhanced_logical_replication DB cluster parameter, the primary DB instance invalidates all logical replication slots. This stops replication from the source to the target, and you must re-create the replication slots on the primary DB instance. To prevent interruptions, keep the parameter state consistent throughout replication.
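These values are applied through the custom DB cluster parameter group, in the same way the full script at the end of this section does it. A minimal Boto3 sketch, with a placeholder group name:

import boto3

rds = boto3.client('rds')

# Create a custom parameter group for Aurora PostgreSQL 16.
rds.create_db_cluster_parameter_group(
    DBClusterParameterGroupName='my-source-param-group',
    DBParameterGroupFamily='aurora-postgresql16',
    Description='For Aurora PostgreSQL logical replication'
)

# Set the parameter values required by zero-ETL integrations.
rds.modify_db_cluster_parameter_group(
    DBClusterParameterGroupName='my-source-param-group',
    Parameters=[
        {'ParameterName': 'rds.logical_replication', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot'},
        {'ParameterName': 'aurora.enhanced_logical_replication', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot'},
        {'ParameterName': 'aurora.logical_replication_backup', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot'},
        {'ParameterName': 'aurora.logical_replication_globaldb', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot'}
    ]
)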
Step 2: Select or create a source DB cluster
After you create the custom DB cluster parameter group, select or create an Aurora DB cluster. This cluster will be the source of data replication to Amazon Redshift. You can specify a DB cluster that uses provisioned DB instances or Aurora Serverless v2 DB instances as the source. For instructions to create a DB cluster, see Creating an Amazon Aurora DB cluster or Creating a DB cluster that uses Aurora Serverless v2.
The database must be running a supported DB engine version. For a list of supported versions, see Supported Regions and Amazon database engines for zero-ETL integrations with Amazon Redshift.
When you create the database, under Additional configuration, change the default DB cluster parameter group to the custom parameter group that you created in the previous step.
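If you're using an existing DB cluster instead, you can switch it to the custom parameter group with a single API call. A minimal sketch with placeholder identifiers; note that static parameters such as these take effect only after the cluster's instances are rebooted:

import boto3

rds = boto3.client('rds')

# Associate the custom DB cluster parameter group with an existing cluster.
rds.modify_db_cluster(
    DBClusterIdentifier='my-source-cluster',
    DBClusterParameterGroupName='my-source-param-group',
    ApplyImmediately=True
)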
Step 3: Create a target Amazon Redshift data warehouse
After you create your source DB cluster, you must create and configure a target data warehouse in Amazon Redshift. The data warehouse must meet the following requirements:
- It uses an RA3 node type (such as ra3.4xlarge) or Redshift Serverless.
- It's encrypted (if it's a provisioned cluster).
For instructions to create a data warehouse, see Creating a cluster for provisioned clusters, or Creating a workgroup with a namespace for Redshift Serverless.
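The script at the end of this section creates a provisioned cluster. If you'd rather use Redshift Serverless as the target, a sketch along the following lines should work; the namespace name, workgroup name, and base capacity are placeholders:

import boto3

redshift_serverless = boto3.client('redshift-serverless')

# A namespace holds the database objects; a workgroup provides the compute.
redshift_serverless.create_namespace(namespaceName='my-target-namespace')
redshift_serverless.create_workgroup(
    workgroupName='my-target-workgroup',
    namespaceName='my-target-namespace',
    baseCapacity=8,  # Redshift Processing Units; size for your workload
    configParameters=[
        # Case sensitivity is required for the integration (see below).
        {'parameterKey': 'enable_case_sensitive_identifier', 'parameterValue': 'true'}
    ]
)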
Enable case sensitivity for the data warehouse
For the integration to be successful, the case sensitivity parameter (enable_case_sensitive_identifier) must be enabled for the data warehouse. By default, case sensitivity is disabled on all provisioned clusters and Redshift Serverless workgroups.
To enable case sensitivity, perform the following steps depending on your data warehouse type:
- Provisioned cluster – To enable case sensitivity on a provisioned cluster, create a custom parameter group with the enable_case_sensitive_identifier parameter enabled. Then, associate the parameter group with the cluster. For instructions, see Managing parameter groups using the console or Configuring parameter values using the Amazon CLI.
- Serverless workgroup – To enable case sensitivity on a Redshift Serverless workgroup, you must use the Amazon CLI. The Amazon Redshift console doesn't currently support modifying Redshift Serverless parameter values. Send the following update-workgroup request:
aws redshift-serverless update-workgroup \
    --workgroup-name target-workgroup \
    --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true
You don't need to reboot the workgroup after you modify the parameter value.
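To confirm that the parameter took effect, you can read it back with Boto3. A minimal sketch covering both warehouse types, with placeholder names:

import boto3

# Provisioned cluster: inspect the associated parameter group.
redshift = boto3.client('redshift')
response = redshift.describe_cluster_parameters(ParameterGroupName='my-target-param-group')
for param in response['Parameters']:
    if param['ParameterName'] == 'enable_case_sensitive_identifier':
        print('Provisioned:', param['ParameterValue'])

# Redshift Serverless: inspect the workgroup's configuration parameters.
redshift_serverless = boto3.client('redshift-serverless')
workgroup = redshift_serverless.get_workgroup(workgroupName='target-workgroup')['workgroup']
for param in workgroup['configParameters']:
    if param['parameterKey'] == 'enable_case_sensitive_identifier':
        print('Serverless:', param['parameterValue'])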
Configure authorization for the data warehouse
After you create the data warehouse, you must configure the source Aurora DB cluster as an authorized integration source. For instructions, see Configure authorization for your Amazon Redshift data warehouse.
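The full script in the next section performs this authorization by attaching a resource policy to the data warehouse's namespace ARN. A condensed sketch of the same call, with placeholder ARNs and account ID:

import boto3
import json

redshift = boto3.client('redshift')

# Allow the Amazon Redshift service to authorize inbound integrations from
# the source cluster, and allow this account to create them.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "redshift.amazonaws.com"},
            "Action": "redshift:AuthorizeInboundIntegration",
            "Condition": {"StringEquals": {
                "aws:SourceArn": "arn:aws:rds:us-east-1:123456789012:cluster:my-source-cluster"}}
        },
        {
            "Effect": "Allow",
            "Principal": {"AWS": "arn:aws:iam::123456789012:root"},
            "Action": "redshift:CreateInboundIntegration"
        }
    ]
}
redshift.put_resource_policy(
    ResourceArn='arn:aws:redshift:us-east-1:123456789012:namespace:example-namespace-id',
    Policy=json.dumps(policy)
)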
Set up an integration using the Amazon SDKs
Rather than setting up each resource manually, you can run the following Python script to automate the setup. The code example uses the Amazon SDK for Python (Boto3) to create a source Amazon Aurora DB cluster and a target Amazon Redshift data warehouse, each with the required parameter values. It then waits for the databases to be available before creating a zero-ETL integration between them. Depending on which resources you need to set up, you can comment out individual functions.
To install the required dependency, run the following command (the time module is part of the Python standard library and doesn't need to be installed):
pip install boto3
In the script, optionally modify the names of the source, target, and parameter groups. The final function creates an integration named my-integration after the resources are set up.
Aurora MySQL
import boto3
import time
# Build the client using the default credential configuration.
# You can use the CLI and run 'aws configure' to set access key, secret
# key, and default Region.
rds = boto3.client('rds')
redshift = boto3.client('redshift')
sts = boto3.client('sts')
source_cluster_name = 'my-source-cluster' # A name for the source cluster
source_param_group_name = 'my-source-param-group' # A name for the source parameter group
target_cluster_name = 'my-target-cluster' # A name for the target cluster
target_param_group_name = 'my-target-param-group' # A name for the target parameter group
def create_source_cluster(*args):
"""Creates a source Aurora MySQL DB cluster"""
response = rds.create_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
DBParameterGroupFamily='aurora-mysql8.0',
Description='For Aurora MySQL binary logging'
)
print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName'])
response = rds.modify_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
Parameters=[
{
'ParameterName': 'aurora_enhanced_binlog',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_backup',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_format',
'ParameterValue': 'ROW',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_replication_globaldb',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_row_image',
'ParameterValue': 'full',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_row_metadata',
'ParameterValue': 'full',
'ApplyMethod': 'pending-reboot'
}
]
)
print('Modified source parameter group: ' + response['DBClusterParameterGroupName'])
response = rds.create_db_cluster(
DBClusterIdentifier=source_cluster_name,
DBClusterParameterGroupName=source_param_group_name,
Engine='aurora-mysql',
EngineVersion='8.0.mysql_aurora.3.05.2',
DatabaseName='myauroradb',
        MasterUsername='username',
        MasterUserPassword='Password01**'
)
print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier'])
source_arn = (response['DBCluster']['DBClusterArn'])
create_target_cluster(target_cluster_name, source_arn, target_param_group_name)
response = rds.create_db_instance(
DBInstanceClass='db.r6g.2xlarge',
DBClusterIdentifier=source_cluster_name,
DBInstanceIdentifier=source_cluster_name + '-instance',
Engine='aurora-mysql'
)
return(response)
def create_target_cluster(target_cluster_name, source_arn, target_param_group_name):
"""Creates a target Redshift cluster"""
response = redshift.create_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
ParameterGroupFamily='redshift-1.0',
Description='For Aurora MySQL zero-ETL integrations'
)
print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName'])
response = redshift.modify_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
Parameters=[
{
'ParameterName': 'enable_case_sensitive_identifier',
'ParameterValue': 'true'
}
]
)
print('Modified target parameter group: ' + response['ParameterGroupName'])
response = redshift.create_cluster(
ClusterIdentifier=target_cluster_name,
NodeType='ra3.4xlarge',
NumberOfNodes=2,
Encrypted=True,
        MasterUsername='username',
        MasterUserPassword='Password01**',
ClusterParameterGroupName=target_param_group_name
)
print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier'])
# Retrieve the target cluster ARN
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Retrieve the current user's account ID
response = sts.get_caller_identity()
account_id = response['Account']
# Create a resource policy specifying cluster ARN and account ID
response = redshift.put_resource_policy(
ResourceArn=target_arn,
Policy='''
{
\"Version\":\"2012-10-17\",
\"Statement\":[
{\"Effect\":\"Allow\",
\"Principal\":{
\"Service\":\"redshift.amazonaws.com\"
},
\"Action\":[\"redshift:AuthorizeInboundIntegration\"],
\"Condition\":{
\"StringEquals\":{
\"aws:SourceArn\":\"%s\"}
}
},
{\"Effect\":\"Allow\",
\"Principal\":{
\"AWS\":\"arn:aws:iam::%s:root\"},
\"Action\":\"redshift:CreateInboundIntegration\"}
]
}
''' % (source_arn, account_id)
)
return(response)
def wait_for_cluster_availability(*args):
"""Waits for both clusters to be available"""
print('Waiting for clusters to be available...')
response = rds.describe_db_clusters(
DBClusterIdentifier=source_cluster_name
)
source_status = response['DBClusters'][0]['Status']
source_arn = response['DBClusters'][0]['DBClusterArn']
response = rds.describe_db_instances(
DBInstanceIdentifier=source_cluster_name + '-instance'
)
source_instance_status = response['DBInstances'][0]['DBInstanceStatus']
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_status = response['Clusters'][0]['ClusterStatus']
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Every 60 seconds, check whether the clusters are available.
if source_status != 'available' or target_status != 'available' or source_instance_status != 'available':
time.sleep(60)
response = wait_for_cluster_availability(
source_cluster_name, target_cluster_name)
else:
print('Clusters available. Ready to create zero-ETL integration.')
create_integration(source_arn, target_arn)
return
def create_integration(source_arn, target_arn):
"""Creates a zero-ETL integration using the source and target clusters"""
response = rds.create_integration(
SourceArn=source_arn,
TargetArn=target_arn,
        IntegrationName='my-integration'
)
print('Creating integration: ' + response['IntegrationName'])
def main():
"""main function"""
create_source_cluster(source_cluster_name, source_param_group_name)
wait_for_cluster_availability(source_cluster_name, target_cluster_name)
if __name__ == "__main__":
main()
Aurora PostgreSQL
import boto3
import time
# Build the client using the default credential configuration.
# You can use the CLI and run 'aws configure' to set access key, secret
# key, and default Region.
rds = boto3.client('rds')
redshift = boto3.client('redshift')
sts = boto3.client('sts')
source_cluster_name = 'my-source-cluster' # A name for the source cluster
source_param_group_name = 'my-source-param-group' # A name for the source parameter group
target_cluster_name = 'my-target-cluster' # A name for the target cluster
target_param_group_name = 'my-target-param-group' # A name for the target parameter group
def create_source_cluster(*args):
"""Creates a source Aurora PostgreSQL DB cluster"""
response = rds.create_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
DBParameterGroupFamily='aurora-postgresql16',
Description='For Aurora PostgreSQL logical replication'
)
print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName'])
response = rds.modify_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
Parameters=[
{
'ParameterName': 'rds.logical_replication',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.enhanced_logical_replication',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.logical_replication_backup',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.logical_replication_globaldb',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
}
]
)
print('Modified source parameter group: ' + response['DBClusterParameterGroupName'])
response = rds.create_db_cluster(
DBClusterIdentifier=source_cluster_name,
DBClusterParameterGroupName=source_param_group_name,
Engine='aurora-postgresql',
        EngineVersion='16.4',
DatabaseName='mypostgresdb',
        MasterUsername='username',
        MasterUserPassword='Password01**'
)
print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier'])
source_arn = (response['DBCluster']['DBClusterArn'])
create_target_cluster(target_cluster_name, source_arn, target_param_group_name)
response = rds.create_db_instance(
DBInstanceClass='db.r6g.2xlarge',
DBClusterIdentifier=source_cluster_name,
DBInstanceIdentifier=source_cluster_name + '-instance',
Engine='aurora-postgresql'
)
return(response)
def create_target_cluster(target_cluster_name, source_arn, target_param_group_name):
"""Creates a target Redshift cluster"""
response = redshift.create_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
ParameterGroupFamily='redshift-1.0',
Description='For Aurora PostgreSQL zero-ETL integrations'
)
print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName'])
response = redshift.modify_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
Parameters=[
{
'ParameterName': 'enable_case_sensitive_identifier',
'ParameterValue': 'true'
}
]
)
print('Modified target parameter group: ' + response['ParameterGroupName'])
response = redshift.create_cluster(
ClusterIdentifier=target_cluster_name,
NodeType='ra3.4xlarge',
NumberOfNodes=2,
Encrypted=True,
        MasterUsername='username',
        MasterUserPassword='Password01**',
ClusterParameterGroupName=target_param_group_name
)
print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier'])
# Retrieve the target cluster ARN
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Retrieve the current user's account ID
response = sts.get_caller_identity()
account_id = response['Account']
# Create a resource policy specifying cluster ARN and account ID
response = redshift.put_resource_policy(
ResourceArn=target_arn,
Policy='''
{
\"Version\":\"2012-10-17\",
\"Statement\":[
{\"Effect\":\"Allow\",
\"Principal\":{
\"Service\":\"redshift.amazonaws.com\"
},
\"Action\":[\"redshift:AuthorizeInboundIntegration\"],
\"Condition\":{
\"StringEquals\":{
\"aws:SourceArn\":\"%s\"}
}
},
{\"Effect\":\"Allow\",
\"Principal\":{
\"AWS\":\"arn:aws:iam::%s:root\"},
\"Action\":\"redshift:CreateInboundIntegration\"}
]
}
''' % (source_arn, account_id)
)
return(response)
def wait_for_cluster_availability(*args):
"""Waits for both clusters to be available"""
print('Waiting for clusters to be available...')
response = rds.describe_db_clusters(
DBClusterIdentifier=source_cluster_name
)
source_status = response['DBClusters'][0]['Status']
source_arn = response['DBClusters'][0]['DBClusterArn']
response = rds.describe_db_instances(
DBInstanceIdentifier=source_cluster_name + '-instance'
)
source_instance_status = response['DBInstances'][0]['DBInstanceStatus']
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_status = response['Clusters'][0]['ClusterStatus']
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Every 60 seconds, check whether the clusters are available.
if source_status != 'available' or target_status != 'available' or source_instance_status != 'available':
time.sleep(60)
response = wait_for_cluster_availability(
source_cluster_name, target_cluster_name)
else:
print('Clusters available. Ready to create zero-ETL integration.')
create_integration(source_arn, target_arn)
return
def create_integration(source_arn, target_arn):
"""Creates a zero-ETL integration using the source and target clusters"""
response = rds.create_integration(
SourceArn=source_arn,
TargetArn=target_arn,
        IntegrationName='my-integration'
)
print('Creating integration: ' + response['IntegrationName'])
def main():
"""main function"""
create_source_cluster(source_cluster_name, source_param_group_name)
wait_for_cluster_availability(source_cluster_name, target_cluster_name)
if __name__ == "__main__":
main()
Next steps
With a source Aurora DB cluster and an Amazon Redshift target data warehouse, you can now create a zero-ETL integration and replicate data. For instructions, see Creating Aurora zero-ETL integrations with Amazon Redshift.
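Once the integration is being created, you can poll its status from the RDS API. A minimal sketch, assuming the my-integration name used in the script above:

import boto3

rds = boto3.client('rds')

# Print the lifecycle status of the integration (for example,
# 'creating' or 'active').
response = rds.describe_integrations()
for integration in response['Integrations']:
    if integration['IntegrationName'] == 'my-integration':
        print(integration['IntegrationName'] + ': ' + integration['Status'])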