本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
以 Amazon Kinesis Data Streams 为接收器 Amazon Database Migration Service
您可以使用将数据迁移Amazon DMS到Amazon Kinesis 数据流。Amazon Kinesis Data Streams 服务的一部分。您可以使用 Kinesis 数据流实时收集和处理大型数据流记录。
数据流由分片组成 Kinesis 数据流。分片是流中数据记录的唯一标识序列。有关Amazon Kinesis Data Streams 开发人员指南中的分区,请参阅 Am azon Kinesis D ata Streams 开发人员指南中的分区。
Amazon Database Migration Service使用 JSON 将记录发布到 Kinesis 数据流。在转换期间,Amazon DMS 将每个记录从源数据库序列化到 JSON 格式的属性/值对或 JSON_UNFORMATTED 消息格式。JSON_UNFORMATTED 消息格式是带有换行符的单行 JSON 字符串。它允许Amazon Kinesis Data Firehose 将 Kinesis 数据传送到Amazon S3 目的地,然后使用包括Amazon Athena 在内的各种查询引擎进行查询。
您将使用对象映射将数据从支持的数据源迁移到目标流。使用对象映射,您确定如何在流中建立数据记录结构。您还可以为每个表定义一个分区键,Kinesis Data Streams 使用该分区键将数据分组到其分区中。
在 Kinesis Data Streams 目标端点上Amazon DMS创建表时,它创建的表数量与源数据库端点中的表一样多。 Amazon DMS还设置了多个 Kinesis Data Streams 参数值。创建表的成本取决于要迁移的数据量和表数。
注意
Amazon DMS控制台或 API 上的 SSL 模式选项不适用于某些数据流和 NoSQL 服务,例如 Kinesis 和 DynamoDB。它们在默认情况下是安全的,因此Amazon DMS显示 SSL 模式设置等于无(SSL 模式=无)。您无需为终端节点提供任何其他配置即可使用 SSL。例如,当使用 Kinesis 作为目标端点时,它在默认情况下是安全的。对 Kinesis 的所有 API 调用都使用 SSL,因此无需在Amazon DMS端点中添加额外的 SSL 选项。您可以使用 HTTPS 协议安全地通过 SSL 端点放置数据和检索数据,该协议在连接到 Kinesis 数据流时默认Amazon DMS使用。
Kinesis Data Streams s 端点设置
当您使用 Kinesis Data Streams 目标端点时,您可以使用 Amazon DMS API 中的KinesisSettings
选项获取交易和控制详细信息。
您可以使用以下方法设置连接设置:
在Amazon DMS控制台中,使用端点设置。
在 CLI 中,使用CreateEndpoint命令的
kinesis-settings
选项。
在 CLI 中,使用该kinesis-settings
选项的以下请求参数:
注意
3.4.1 及更高Amazon DMS版本中提供对IncludeNullAndEmpty
端点设置的Support。但是,中提供了对 Kinesis Data Streams 目标的其他端点设置的Amazon DMS支持。
MessageFormat
— 在端点上创建的记录的输出格式。消息格式为JSON
(默认值)或JSON_UNFORMATTED
(单行,无制表符)。-
IncludeControlDetails
— 显示 Kinesis 消息输出中的表定义、列定义以及表和列更改的详细控制信息。默认为false
。 -
IncludeNullAndEmpty
— 在目标中包括 NULL 列和空列。默认为false
。 -
IncludePartitionValue
— 显示 Kinesis 消息输出中的分区值,除非分区类型为分区类型为schema-table-type
Kinesis 消息输出中的分区值。默认为false
。 -
IncludeTableAlterOperations
— 包括任何更改控制数据中表的数据定义语言 (DDL) 操作,例如rename-table
drop-table
、add-column
drop-column
、和rename-column
。默认为false
。 -
IncludeTransactionDetails
— 提供源数据库中的详细事务信息。此信息包括提交时间戳、日志位置以及transaction_id
、previous_transaction_id
和transaction_record_id
(事务内的记录偏移)的值。默认为false
。 -
PartitionIncludeSchemaTable
— 当分区类型为分区值时,将架构和表名称作为分区值primary-key-type
的前缀。这样做会增加 Kinesis 分片之间的数据分布。例如,假设SysBench
架构具有数千个表,并且每个表的主键只有有限的范围。在此况下,同一主键将从数千个表发送到同一个分片,这会导致限制。默认为false
。
以下示例显示了正在使用的kinesis-settings
选项以及使用发出create-endpoint
命令的示例Amazon CLI。
aws dms create-endpoint --endpoint-identifier=$target_name --engine-name kinesis --endpoint-type target --region us-east-1 --kinesis-settings ServiceAccessRoleArn=arn:aws:iam::333333333333:role/dms-kinesis-role, StreamArn=arn:aws:kinesis:us-east-1:333333333333:stream/dms-kinesis-target-doc,MessageFormat=json-unformatted, IncludeControlDetails=true,IncludeTransactionDetails=true,IncludePartitionValue=true,PartitionIncludeSchemaTable=true, IncludeTableAlterOperations=true
多线程完全加载任务设置
为了帮助提高传输速度,Amazon DMS支持对 Kinesis Data Streams 目标实例进行多线程满负载。对于包含下列内容的任务设置,DMS 支持此多线程处理:
-
MaxFullLoadSubTasks
— 使用此选项指明要parallel 加载的源表的最大数量。DMS 使用专用子任务将每个表加载到相应的 Kinesis 目标表中。默认值为 8;最大值为 49。 -
ParallelLoadThreads
— 使用此选项指定用于将每个表加载到其 Kinesis 目标表中的线程数。Amazon DMSKinesis Data Streams 目标的最大值为 32。您可以请求提高此最大值限制。 -
ParallelLoadBufferSize
— 使用此选项指定要存储在缓冲区中的最大记录数,parallel 加载线程用于将数据加载到 Kinesis 目标。默认值为 50。最大值为 1,000。将此设置与ParallelLoadThreads
一起使用;仅在有多个线程时ParallelLoadBufferSize
才有效。 -
ParallelLoadQueuesPerThread
— 使用此选项指定每个并发线程访问的队列数,以便从队列中提取数据记录并为目标生成批量加载。默认值为 1。但是,对于不同负载大小的 Kinesis 目标,有效范围为每个线程 5—512 个队列。
多线程 CDC 加载任务设置
您可以使用任务设置修改 PutRecords
API 调用的行为,提高实时数据流目标端点(如 Kinesis)的变更数据捕获 (CDC) 性能。为此,您可以使用 ParallelApply*
任务设置来指定并发线程的数量、每个线程的队列数以及要存储在缓冲区中的记录数。例如,假设您要执行 CDC 加载并且要并行应用 128 个线程。您还希望对于每个线程访问 64 个队列,每个缓冲区存储 50 条记录。
要提高 CDC 性能,请 Amazon DMS 支持以下任务设置:
-
ParallelApplyThreads
— 指定在 CDC 加载期间将数据记录推送到 Kinesis 目标端点时Amazon DMS使用的并发线程数。默认值为零 (0),最大值为 32。 -
ParallelApplyBufferSize
— 指定在 CDC 加载期间将并发线程推送到 Kinesis 目标端点的每个缓冲队列中存储的最大记录数。默认值为 100 个接收器,最大值为 1000 个接收器。当ParallelApplyThreads
指定多个线程时,请使用此选项。 -
ParallelApplyQueuesPerThread
— 指定每个线程访问的队列数量,以便在 CDC 期间从队列中提取数据记录并为 Kinesis 端点生成批量加载。默认值为 1 天 1 天天,最大值为 512 天天。
使用 ParallelApply*
任务设置时, partition-key-type
默认值是表的 primary-key
,而不是 schema-name.table-name
。
使用之前的图像查看 Kinesis 数据流作为目标的 CDC 行的原始值
在将 CDC 更新写入 Kinesis 等数据流目标时,可以在更新更改之前查看源数据库行的原始值。为此,Amazon DMS 根据源数据库引擎提供的数据填充更新事件的之前映像。
不同的源数据库引擎为之前映像提供不同的信息量:
-
仅当列发生更改时,Oracle 才会对列提供更新。
-
PostgreSQL 仅为作为主键一部分的列(已更改或未更改)提供数据。要为所有列提供数据(无论是否已更改),您需要将设置
REPLICA_IDENTITY
为FULL
而不是DEFAULT
。请注意,您应该仔细选择每张表的REPLICA_IDENTITY
设置。如果设置为REPLICA_IDENTITY
FULL
,则所有列值都将持续写入预写日志 (WAL)。这可能会导致经常更新的表出现性能或资源问题。 -
MySQL 通常为除 BLOB 和 CLOB 数据类型(更改与否)之外的所有列提供数据。
要启用之前映像以便将源数据库中的原始值添加到 Amazon DMS 输出,请使用 BeforeImageSettings
任务设置或 add-before-image-columns
参数。此参数应用列转换规则。
BeforeImageSettings
使用从源数据库系统收集的值向每个更新操作添加一个新的 JSON 属性,如下所示。
"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注意
仅BeforeImageSettings
适用于包含 CDC 组件的Amazon DMS任务,例如满载加上 CDC 任务(迁移现有数据并复制正在进行的更改),或仅适用于仅限 CDC 的任务(仅复制数据更改)。不将 BeforeImageSettings
应用于仅完全加载的任务。
对于 BeforeImageSettings
选项,以下条件适用:
-
将
EnableBeforeImage
选项设置为true
以启用之前映像。默认为false
。 -
使用
FieldName
选项为新 JSON 属性指定名称。当EnableBeforeImage
为true
时,FieldName
是必填项且不能为空。 -
ColumnFilter
选项指定要使用之前映像添加的列。要仅添加属于表主键一部分的列,请使用默认值pk-only
。要添加具有之前映像值的任何列,请使用all
。请注意,之前的图像不包含具有 LOB 数据类型的列,例如 CLOB 或 BLOB。"BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
注意
Amazon S3 目标不支持BeforeImageSettings
。对于 S3 目标,在 CDC 期间仅使用 add-before-image-columns
转换规则来执行之前映像。
使用之前映像转换规则
作为任务设置的替代方法,您可以使用 add-before-image-columns
参数,该参数应用列转换规则。使用此参数,您可以在 CDC 期间对数据流目标(如 Kinesis)启用映像前功能。
通过在转换规则中使用 add-before-image-columns
,可以对之前映像结果应用更精细的控制。转换规则允许您使用对象定位器,该定位器允许您控制为规则选择的表。此外,您可以将转换规则链接在一起,这样可以将不同的规则应用于不同的表。然后,您可以操控使用其他规则生成的列。
注意
不要在同一任务中将 add-before-image-columns
参数与 BeforeImageSettings
任务设置结合使用。而是对单个任务使用此参数或此设置,但不要同时使用这两者。
包含列的 add-before-image-columns
参数的 transformation
规则类型必须提供一个 before-image-def
部分。下面是一个示例。
{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }
column-prefix
的值附加到列名称前面,column-prefix
的默认值为 BI_
。column-suffix
的值将附加到列名称之后,默认值为空。不要同时将 column-prefix
和 column-suffix
设置为空字符串。
为 column-filter
选择一个值。要仅添加属于表主键一部分的列,请选择 pk-only
。选择 non-lob
以仅添加不属于 LOB 类型的列。或者,选择 all
以添加任何具有之前映像值的列。
之前映像转换规则的示例
以下示例中的转换规则在目标中添加一个名为 BI_emp_no
的新列。因此,像 UPDATE
employees SET emp_no = 3 WHERE emp_no = 1;
这样的语句用 1 填充 BI_emp_no
字段。当您向 Amazon S3 目标写入 CDC 更新时,该BI_emp_no
列可以分辨出哪个原始行已更新。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }
有关使用 add-before-image-columns
规则操作的信息,请参阅 转换规则和操作。
以 Kinesis 数据流作为目标的先决条件 Amazon Database Migration Service
在将 Kinesis 数据流设置为目标之前Amazon DMS,请确保创建 IAM 角色。此角色必须允许Amazon DMS代入和授予对正在迁移到的 Kinesis 数据流的访问权限。以下 IAM 策略中显示了所需的最小访问权限集合。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "1", "Effect": "Allow", "Principal": { "Service": "dms.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
您用于迁移到 Kinesis 数据流的角色必须具有以下权限。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords" ], "Resource": "arn:aws:kinesis:
region
:accountID
:stream/streamName
" } ] }
以 Kinesis Data Streams 为接收者时的限制 Amazon Database Migration Service
使用 Kinesis Data Streams 作为目标时,存在以下限制:
-
Amazon DMS无论事务如何,都将每次更新作为给定 Kinesis 数据流中的一条数据记录发布到源数据库中的单个记录。但是,您可以使用
KinesisSettings
API 的相关参数包含每条数据记录的事务详细信息。 -
不支持完整 LOB 模式。
-
支持的最大 LOB 大小为 1 MB MB MB MB。
-
Kinesis Data Streams 不支持重复数据删除。使用流中数据的应用程序需要处理重复记录。有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的处理重复记录。
-
Amazon DMS 支持以下两种分区键:
-
SchemaName.TableName
:架构和表名称的组合。 -
${AttributeName}
:JSON 中其中一个字段的值,或源数据库中表的主键。
-
-
有关在 Kinesis Data Streams 中加密静态数据的信息,请参阅《开发人员指南》中的 Kinesis Data Streams 中的Amazon Key Management Service数据保护。
-
BatchApply
不支持 Kinesis 端点。对 KinesisBatchApplyEnabled
目标使用Batch 应用(例如,目标元数据任务设置)可能会导致数据丢失。 -
只有同一Amazon账户中的 Kinesis 数据流支持 Kinesis 目标。
-
从 MySQL 源迁移时,BeforeImage数据不包括 CLOB 和 BLOB 数据类型。有关更多信息,请参阅使用之前的图像查看 Kinesis 数据流作为目标的 CDC 行的原始值:
-
Amazon DMS迁移科学记数法形式中超过 16 位数
BigInt
的数据类型的值。
使用对象映射将数据迁移到 Kinesis 数据流
Amazon DMS使用表映射规则将数据从源映射到目标 Kinesis 数据流。要将数据映射到目标流,您必须使用称为 object mapping 的表映射规则类型。您可以使用对象映射来定义源中的数据记录如何映射到发布到 Kinesis 数据流的数据记录。
除了分区键外,Kinesis 数据流没有其他预设结构。在对象映射规则中,数据记录的 partition-key-type
的可能值为 schema-table
、transaction-id
、primary-key
、constant
和 attribute-name
。
要创建对象映射规则,您应将 rule-type
指定为 object-mapping
。此规则指定您要使用的对象映射的类型。
规则的结构如下所示。
{ "rules": [ { "rule-type": "object-mapping", "rule-id": "
id
", "rule-name": "name
", "rule-action": "valid object-mapping rule action
", "object-locator": { "schema-name": "case-sensitive schema name
", "table-name": "" } } ] }
Amazon DMS 目前支持 map-record-to-record
和 map-record-to-document
作为 rule-action
参数的唯一有效值。map-record-to-record
和 map-record-to-document
值指定 Amazon DMS 默认情况下对未作为 exclude-columns
属性列表的一部分排除的记录执行的操作。这些值不会以任何方式影响属性映射。
从关系数据库迁移到 Kinesis 数据流map-record-to-record
时使用。此规则类型使用关系数据库中的taskResourceId.schemaName.tableName
值作为 Kinesis 数据流中的分区键,并为源数据库中的每列创建一个属性。在使用 map-record-to-record
时,对于源表中未在 exclude-columns
属性列表中列出的任何列,Amazon DMS 将在目标流中创建对应的属性。不论是否在属性映射中使用源列,都会创建对应的属性。
map-record-to-document
用于使用属性名称 “_doc” 将源列放入相应目标流中的单个平面文档中。 Amazon DMS将数据放入名为 “_doc
” 的源上的单一平面地图中。此放置应用于源表中的未在 exclude-columns
属性列表中列出的任何列。
了解 map-record-to-record
的一种方法是在操作时加以观察。对于本示例,假定您使用关系数据库表行开始处理,该行具有以下结构和数据。
FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth |
---|---|---|---|---|---|---|---|
Randy |
Marsh | 5 |
221B Baker Street |
1234567890 |
31 Spooner Street, Quahog |
9876543210 |
02/29/1988 |
要将此信息从名为 Kinesis 数据流的架构迁移Test
到 Kinesis 数据流,需要创建规则将数据映射到目标流。以下规则对此映射进行了说明。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }
下图说明了 Kinesis 数据流中生成的记录格式:
-
StreamName: XXX
-
PartitionKey: Test.customers//schmaname.tableNam
-
数据://The following JSON message
{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }
但是,假设您使用相同的规则,但将rule-action
参数更改为map-record-to-document
并排除某些列。以下规则对此映射进行了说明。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "exclude-columns": [ "homeaddress", "homephone", "workaddress", "workphone" ] } } ] }
在这种情况下,exclude-columns
参数、FirstName
StoreId
和DateOfBirth
中未列出的列将映射到_doc
。LastName
以下说明了生成的记录格式。
{ "data":{ "_doc":{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "DateOfBirth": "02/29/1988" } } }
使用属性映射调整数据结构
在使用属性映射将数据迁移到 Kinesis 数据流时,您可以重构数据。例如,您可能希望将源中的多个字段合并到目标中的单个字段中。以下属性映射说明如何调整数据结构。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }
要为 partition-key
设置常量值,请指定 partition-key
值。例如,您可以执行此操作来强制将所有数据存储在一个分片内。以下映射说明了此方法。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注意
表示特定表的控制记录的 partition-key
值为 TaskId.SchemaName.TableName
。表示特定任务的控制记录的 partition-key
值为该记录的 TaskId
。在对象映射中指定 partition-key
值不会影响控制记录的 partition-key
。
Kinesis Data Streams 的消息格式
JSON 输出只是键值对的列表。JSON_UNFORMATTED 消息格式是带有换行符的单行 JSON 字符串。
Amazon DMS提供以下保留字段,以便更轻松地使用 Kinesis Data Streams 中的数据:
- RecordType
-
记录类型可以是数据或控制。数据记录表示源中的实际行。控制记录表示流中的重要事件,例如,重新开始任务。
- 运算
-
对于数据记录,操作可以是
load
、insert
、update
或delete
。对于控制记录,操作可以是
create-table
、rename-table
、drop-table
、change-columns
add-column
、drop-column
、rename-column
、或column-type-change
。 - SchemaName
-
记录的源架构。此字段对于控制记录可能是空的。
- TableName
-
记录的源表。此字段对于控制记录可能是空的。
- 时间戳
-
JSON 消息构建时间的时间戳。此字段采用 ISO 8601 格式。