AWS Database Migration Service
用户指南 (版本 API Version 2016-01-01)
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

将 Amazon Kinesis Data Streams 用作 AWS Database Migration Service 的目标

您可以使用 AWS DMS 将数据迁移到 Amazon Kinesis 数据流。Amazon Kinesis 数据流是 Amazon Kinesis Data Streams 服务的一部分。可以使用 Kinesis 数据流实时收集和处理大型数据记录流。

注意

在 AWS DMS 版本 3.1.2 及更高版本中支持将 Amazon Kinesis Data Streams 作为目标。

Kinesis 数据流由分片组成。分片是流中数据记录的唯一标识序列。有关 Amazon Kinesis Data Streams 中的分片的更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南 中的分片

AWS Database Migration Service 使用 JSON 将记录发布到 Kinesis 数据流。转换期间,AWS DMS 将每个记录从源数据库序列化到 JSON 格式的属性/值对。

您将使用对象映射将数据从支持的数据源迁移到目标流。使用对象映射,您确定如何在流中建立数据记录结构。您还可以为每个表定义分区键,Kinesis Data Streams 用它来将数据分组为分片。

当 AWS DMS 在 Amazon Kinesis Data Streams 目标终端节点上创建表时,它创建与源数据库终端节点相同数量的表。AWS DMS 还会设置几个 Kinesis Data Streams 参数值。创建表的成本取决于要迁移的数据量和表数。

为了帮助提高传输速度,AWS DMS 支持多线程完全加载到 Kinesis Data Streams 目标实例。对于包含下列内容的任务设置,DMS 支持此多线程处理:

注意

在 AWS DMS 版本 3.1.4 和更高版本中,支持使用多线程完全加载到 Amazon Kinesis Data Streams 目标。

  • MaxFullLoadSubTasks – 使用此选项指示要并行加载的表的最大数目。DMS 使用专用的子任务将各个表加载到其对应的 Kinesis 目标表。默认值为 8;最大值为 49。

  • ParallelLoadThreads – 使用此选项指定 AWS DMS 将各个表加载到其 Kinesis 目标表时使用的线程数。Kinesis Data Streams 目标的最大值为 32。您可以请求提高此最大值限制。

  • ParallelLoadBufferSize – 使用此选项指定在缓冲区(并行加载线程将 Kinesis 数据加载到目标时使用)中存储的最大记录数。默认值是 50。最大值为 1,000。将此设置与 ParallelLoadThreads 一起使用;仅在有多个线程时ParallelLoadBufferSize 才有效。

  • 各个表的 Table-mapping 设置 – 使用 table-settings 规则标识您希望并行加载的源中各个表。另外可使用这些规则来指定如何为多线程加载的每个表的行分段。有关更多信息,请参阅 表设置规则和操作

    注意

    DMS 将表的各个段分配到其自己的线程进行加载。因此,将 ParallelLoadThreads 设置您为源中表指定的最大段数量。

将 Kinesis 数据流作为 AWS Database Migration Service 目标的先决条件

在将 Kinesis 数据流设置为 AWS DMS 目标之前,请确保您创建了 IAM 角色。此角色必须允许 AWS DMS 提供并授予对要迁移到的 Kinesis 数据流的访问权限。以下示例角色策略中显示了所需的最低访问权限集合。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "1", "Effect": "Allow", "Principal": { "Service": "dms.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

您在迁移到 Kinesis 数据流时使用的角色必须具有以下权限。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords" ], "Resource": "arn:aws:kinesis:region:accountID:stream/streamName" } ] }

将 Kinesis Data Streams 作为 AWS Database Migration Service 目标时的限制

将 Kinesis Data Streams 作为目标时存在以下限制:

  • AWS DMS 将每个更新作为给定 Kinesis 数据流中的一个数据记录发布到源数据库中的一条记录。因此,使用流中数据的应用程序会失去事务边界的线索。

  • Kinesis 数据流不支持重复数据删除。使用流中数据的应用程序需要处理重复记录。有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南 中的处理重复记录

  • AWS DMS 支持以下两种分区键:

    • SchemaName.TableName:架构和表名称的组合。

    • ${AttributeName}:JSON 中其中一个字段的值,或源数据库中表的主键。

使用对象映射将数据迁移到 Kinesis 数据流

AWS DMS 使用表映射规则将数据从源映射到目标 Kinesis 数据流。要将数据映射到目标流,您必须使用称为 object mapping 的表映射规则类型。您可以使用对象映射来定义源中的数据记录如何映射到发布到 Kinesis 数据流的数据记录。

除了具有分区键以外,Kinesis 数据流没有预设结构。

要创建对象映射规则,您应将 rule-type 指定为 object-mapping。此规则指定您要使用的对象映射的类型。

规则的结构如下所示。

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS 当前只支持 map-record-to-recordrule-action 作为 map-record-to-document 参数的有效值。Map-record-to-recordmap-record-to-document 指定 AWS DMS 的默认操作,用于记录未在 exclude-columns 属性列表中排除的内容。这些值不会以任何方式影响属性映射。

从关系数据库迁移到 Kinesis 数据流时使用 map-record-to-record。此规则类型使用关系数据库的 taskResourceId.schemaName.tableName 值作为 Kinesis 数据流中的分区键,并为源数据库中的每个列创建一个属性。在使用 map-record-to-record 时,对于源表中未在 exclude-columns 属性列表中列出的任何列,AWS DMS 将在目标流中创建对应的属性。不论是否在属性映射中使用源列,都会创建对应的属性。

了解 map-record-to-record 的一种方法是在操作时加以观察。对于本示例,假定您使用关系数据库表行开始处理,该行具有以下结构和数据。

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

要将此信息迁移到 Kinesis 数据流,您将创建规则来将数据映射到目标流。以下规则对此映射进行了说明。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

以下内容说明 Kinesis 数据流中生成的记录格式。

  • StreamName:XXX

  • PartitionKey:Test.Customers //schmaName.tableName

  • 数据://The following JSON message

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

使用属性映射调整数据结构

在使用属性映射将数据迁移到 Kinesis 数据流时,您可以调整数据结构。例如,您可能希望将源中的多个字段合并到目标中的单个字段中。以下属性映射说明如何调整数据结构。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

若要为分区键设置常量值,请指定一个 partition-key 值。例如,您可以执行此操作来强制将所有数据存储在一个分片内。以下映射说明了此方法。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }

Kinesis Data Streams 的消息格式

JSON 输出只是一个键/值对列表。AWS DMS 提供了以下预留字段,以便更轻松地使用 Kinesis Data Streams 中的数据:

RecordType

记录类型可以是数据或控制。数据记录表示源中的实际行。控制记录表示流中的重要事件,例如,重新开始任务。

操作

对于数据记录,操作可以是 createreadupdatedelete

对于控制记录,操作可以是 TruncateTableDropTable

SchemaName

记录的源架构。此字段对于控制记录可能是空的。

TableName

记录的源表。此字段对于控制记录可能是空的。

时间戳

JSON 消息构建时间的时间戳。此字段采用 ISO 8601 格式。

注意

表示特定表的控制记录的 partition-key 值为 TaskId.SchemaName.TableName。表示特定任务的控制记录的 partition-key 值为该记录的 TaskId。在对象映射中指定 partition-key 值不会影响控制记录的 partition-key