使用 Amazon Kinesis Data Streams 作为目标 Amazon Database Migration Service - Amazon 数据库迁移服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon Kinesis Data Streams 作为目标 Amazon Database Migration Service

您可以使用将数据迁移 Amazon DMS 到 Amazon Kinesis 数据流。Amazon Kinesis 数据流是 Amazon Kinesis Data Streams 服务的一部分。可以使用 Kinesis 数据流实时收集和处理大型数据记录流。

Kinesis 数据流由分片组成。分片是流中数据记录的唯一标识序列。有关 Amazon Kinesis Data Streams 中的分片的更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的分片

Amazon Database Migration Service 使用 JSON 将记录发布到 Kinesis 数据流。在转换期间, Amazon DMS 将每个记录从源数据库序列化到 JSON 格式的属性/值对或 JSON_UNFORMATTED 消息格式。JSON_UNFORMATTED 消息格式是带有换行符的单行 JSON 字符串。它允许 Amazon Data Firehose 将 Kinesis 数据传送到亚马逊 S3 目的地,然后使用包括亚马逊 Athena 在内的各种查询引擎对其进行查询。

您将使用对象映射将数据从支持的数据源迁移到目标流。使用对象映射,您确定如何在流中建立数据记录结构。您还可以为每个表定义分区键,Kinesis Data Streams 用它来将数据分组为分片。

在 Kinesis Data Streams 目标端点上 Amazon DMS 创建表时,它创建的表与源数据库终端节点中创建的表数量一样多。 Amazon DMS 还会设置多个 Kinesis Data Streams 参数值。创建表的成本取决于要迁移的数据量和表数。

注意

Amazon DMS 控制台或 API 上的 SSL 模式选项不适用于某些数据流和 NoSQL 服务,例如 Kinesis 和 DynamoDB。默认情况下,它们是安全的,因此 Amazon DMS 显示 SSL 模式设置等于无(SSL 模式=无)。您无需为端点提供任何其他配置即可使用 SSL。例如,使用 Kinesis 作为目标端点时,默认情况下它是安全的。所有对 Kinesis 的 API 调用都使用 SSL,因此无需在终端节点中添加额外的 SSL 选项。 Amazon DMS 您可以使用 HTTPS 协议安全地存放数据和通过 SSL 端点检索数据,Kinesis 数据流在连接到 Amazon DMS 时默认使用该协议。

Kinesis Data Streams 端点设置

当你使用 Kinesis Data Streams 目标端点时,你可以使用 KinesisSettings API 中的 Amazon DMS 选项获取交易和控制细节。

您可以使用以下方法之一设置连接:

  • 在 Amazon DMS 控制台中,使用端点设置。

  • 在 CLI 中,使用CreateEndpoint命令的kinesis-settings选项。

在 CLI 中,使用 kinesis-settings 选项的下列请求参数:

注意

Amazon DMS 版本 3.4.1 及更高版本支持 IncludeNullAndEmpty 端点设置。但是,中支持Kinesis Data Streams目标的以下其他端点设置。 Amazon DMS

  • MessageFormat – 在端点上创建的记录的输出格式。消息格式为 JSON(默认值)或 JSON_UNFORMATTED(单行,无制表符)。

  • IncludeControlDetails – 显示 Kinesis 消息输出中的表定义、列定义以及表和列更改的详细控制信息。默认值为 false

  • IncludeNullAndEmpty – 在目标中包括空列。默认值为 false

  • IncludePartitionValue – 在 Kinesis 消息输出中显示分区值,除非分区类型为 schema-table-type。默认值为 false

  • IncludeTableAlterOperations – 包括更改控制数据中表的任何数据定义语言(DDL)操作,例如 rename-tabledrop-tableadd-columndrop-columnrename-column。默认值为 false

  • IncludeTransactionDetails – 提供源数据库中的详细事务信息。此信息包括提交时间戳、日志位置以及 transaction_idprevious_transaction_idtransaction_record_id (事务内的记录偏移)的值。默认值为 false

  • PartitionIncludeSchemaTable – 当分区类型为 primary-key-type 时,将架构和表名称作为分区值的前缀。这样做会增加 Kinesis 分片之间的数据分布。例如,假设 SysBench 架构具有数千个表,并且每个表的主键只有有限的范围。在此况下,同一主键将从数千个表发送到同一个分片,这会导致限制。默认值为 false

以下示例显示在使用 Amazon CLI 发出的示例 create-endpoint 命令中使用的 kinesis-settings 选项。

aws dms create-endpoint --endpoint-identifier=$target_name --engine-name kinesis --endpoint-type target --region us-east-1 --kinesis-settings ServiceAccessRoleArn=arn:aws:iam::333333333333:role/dms-kinesis-role, StreamArn=arn:aws:kinesis:us-east-1:333333333333:stream/dms-kinesis-target-doc,MessageFormat=json-unformatted, IncludeControlDetails=true,IncludeTransactionDetails=true,IncludePartitionValue=true,PartitionIncludeSchemaTable=true, IncludeTableAlterOperations=true
多线程完全加载任务设置

为了帮助提高传输速度, Amazon DMS 支持对 Kinesis Data Streams 目标实例进行多线程满载。对于包含下列内容的任务设置,DMS 支持此多线程处理:

  • MaxFullLoadSubTasks – 使用此选项指示要并行加载的表的最大数目。DMS 使用专用子任务将各个表加载到其对应的 Kinesis 目标表。默认值为 8;最大值为 49。

  • ParallelLoadThreads— 使用此选项指定用于将每个表加载到其 Kinesis 目标表的线程数。 Amazon DMS Kinesis Data Streams 目标的最大值为 32。您可以请求提高此最大值限制。

  • ParallelLoadBufferSize – 使用此选项指定在缓冲区(并行加载线程将数据加载到 Kinesis 目标时使用)中存储的最大记录数。默认值是 50。最大值为 1000。将此设置与 ParallelLoadThreads 一起使用;仅在有多个线程时 ParallelLoadBufferSize 才有效。

  • ParallelLoadQueuesPerThread – 使用此选项可以指定每个并发线程访问的队列数,以便从队列中取出数据记录并为目标生成批处理负载。默认值是 1。但是,对于各种负载大小的 Kinesis 目标,有效范围为每个线程 5-512 个队列。

多线程 CDC 加载任务设置

您可以提高实时数据流目标端点的更改数据捕捉(CDC)的性能,例如,Kinesis 使用任务设置来修改 PutRecords API 调用的行为。为此,您可以使用 ParallelApply* 任务设置来指定并发线程的数量、每个线程的队列数以及要存储在缓冲区中的记录数。例如,假设您要执行 CDC 加载并且要并行应用 128 个线程。您还希望对于每个线程访问 64 个队列,每个缓冲区存储 50 条记录。

为了提高 CDC 性能, Amazon DMS 支持以下任务设置:

  • ParallelApplyThreads— 指定在 CDC 加载期间 Amazon DMS 用于将数据记录推送到 Kinesis 目标端点的并发线程数。默认值为零(0),最大值为 32。

  • ParallelApplyBufferSize – 指定在 CDC 加载过程中,要在每个缓冲区队列中存储的、供并发线程推送到目标 Kinesis 端点的最大记录数。默认值是 100,最大值是 1,000。当 ParallelApplyThreads 指定多个线程时,请使用此选项。

  • ParallelApplyQueuesPerThread – 指定每个线程访问以将数据记录从队列中取出并在 CDC 期间为 Kinesis 端点生成批处理负载的队列数。默认值是 1,最大值是 512。

使用 ParallelApply* 任务设置时,partition-key-type 默认值是表的 primary-key,而不是 schema-name.table-name

使用之前映像查看 Kinesis 数据流(作为目标)的 CDC 行的原始值

在将 CDC 更新写入数据流目标(如 Kinesis)时,可以在更新进行更改之前查看源数据库行的原始值。为此,请根据源数据库引擎提供的数据 Amazon DMS 填充更新事件之前的图像

不同的源数据库引擎为之前映像提供不同的信息量:

  • 仅当列发生更改时,Oracle 才会对列提供更新。

  • PostgreSQL 仅为作为主键一部分的列(已更改或未更改)提供数据。要为所有列提供数据(无论是否更改),您需要将 REPLICA_IDENTITY 设置为 FULL 而不是 DEFAULT。请注意,您应该仔细选择每张表的 REPLICA_IDENTITY 设置。如果将 REPLICA_IDENTITY 设置为 FULL,则所有列值都将连续写入预写日志(WAL)。这可能会导致经常更新的表出现性能或资源问题。

  • MySQL 通常为 BLOB 和 CLOB 数据类型以外的所有列(已更改或未更改)提供数据。

要启用之前映像以便将源数据库中的原始值添加到 Amazon DMS 输出,请使用 BeforeImageSettings 任务设置或 add-before-image-columns 参数。此参数应用列转换规则。

BeforeImageSettings 使用从源数据库系统收集的值向每个更新操作添加一个新的 JSON 属性,如下所示。

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注意

BeforeImageSettings适用于包含 CDC 组件的 Amazon DMS 任务,例如满负荷加上 CDC 任务(迁移现有数据并复制正在进行的更改),或仅适用于 CDC 的任务(仅复制数据更改)。不将 BeforeImageSettings 应用于仅完全加载的任务。

对于 BeforeImageSettings 选项,以下条件适用:

  • EnableBeforeImage 选项设置为 true 以启用之前映像。默认值为 false

  • 使用 FieldName 选项为新 JSON 属性指定名称。当 EnableBeforeImagetrue 时,FieldName 是必填项且不能为空。

  • ColumnFilter 选项指定要使用之前映像添加的列。要仅添加属于表主键一部分的列,请使用默认值 pk-only。要添加具有之前映像值的任何列,请使用 all。请注意,之前映像不包括含有 LOB 数据类型(如 CLOB 或 BLOB)的列。

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
注意

Amazon S3 目标不支持 BeforeImageSettings。对于 S3 目标,在 CDC 期间仅使用 add-before-image-columns 转换规则来执行之前映像。

使用之前映像转换规则

作为任务设置的替代方法,您可以使用 add-before-image-columns 参数,该参数应用列转换规则。使用此参数,您可以在 CDC 期间对数据流目标(如 Kinesis)启用之前映像。

通过在转换规则中使用 add-before-image-columns,可以对之前映像结果应用更精细的控制。转换规则允许您使用对象定位器,该定位器允许您控制为规则选择的表。此外,您可以将转换规则链接在一起,这样可以将不同的规则应用于不同的表。然后,您可以操控使用其他规则生成的列。

注意

不要在同一任务中将 add-before-image-columns 参数与 BeforeImageSettings 任务设置结合使用。而是对单个任务使用此参数或此设置,但不要同时使用这两者。

包含列的 add-before-image-columns 参数的 transformation 规则类型必须提供一个 before-image-def 部分。下面是一个示例。

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

column-prefix 的值附加到列名称前面,column-prefix 的默认值为 BI_column-suffix 的值将附加到列名称之后,默认值为空。不要同时将 column-prefixcolumn-suffix 设置为空字符串。

column-filter 选择一个值。要仅添加属于表主键一部分的列,请选择 pk-only。选择 non-lob 以仅添加不属于 LOB 类型的列。或者,选择 all 以添加任何具有之前映像值的列。

之前映像转换规则的示例

以下示例中的转换规则在目标中添加一个名为 BI_emp_no 的新列。因此,像 UPDATE employees SET emp_no = 3 WHERE emp_no = 1; 这样的语句用 1 填充 BI_emp_no 字段。当您将 CDC 更新写入 Amazon S3 目标时,通过 BI_emp_no 列能够判断哪个原始行已更新。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

有关使用 add-before-image-columns 规则操作的信息,请参阅 转换规则和操作

使用 Kinesis 数据流作为目标的先决条件 Amazon Database Migration Service

用于使用 Kinesis 数据流作为目标的 IAM 角色 Amazon Database Migration Service

在将 Kinesis 数据流设置为目标之前 Amazon DMS,请务必创建一个 IAM 角色。此角色必须允许 Amazon DMS 代入并授予对正在迁移到的 Kinesis 数据流的访问权限。以下 IAM policy 中显示了所需的最小访问权限集合。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "1", "Effect": "Allow", "Principal": { "Service": "dms.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

您在迁移到 Kinesis 数据流时使用的角色必须具有以下权限。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords" ], "Resource": "arn:aws:kinesis:region:accountID:stream/streamName" } ] }

将 Kinesis 数据流作为目标进行访问 Amazon Database Migration Service

在 3.4.7 及更高 Amazon DMS 版本中,要连接到 Kinesis 终端节点,必须执行以下操作之一:

使用 Kinesis Data Streams 作为目标时的限制 Amazon Database Migration Service

将 Kinesis Data Streams 作为目标时存在以下限制:

  • Amazon DMS 无论事务如何,都会将每次更新作为给定 Kinesis 数据流中的一条数据记录发布到源数据库中的一条记录。但是,您可以使用 KinesisSettings API 的相关参数包含每条数据记录的事务详细信息。

  • 不支持完整 LOB 模式。

  • 支持的最大 LOB 大小为 1 MB。

  • Kinesis Data Streams 不支持重复数据删除。使用流中数据的应用程序需要处理重复记录。有关更多信息,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的处理重复记录

  • Amazon DMS 支持以下两种形式的分区键:

    • SchemaName.TableName:架构和表名称的组合。

    • ${AttributeName}:JSON 中其中一个字段的值,或源数据库中表的主键。

  • 有关在 Kinesis Data Streams 中加密静态数据的信息,请参阅《Amazon Key Management Service 开发人员指南》中的 Kinesis Data Streams 中的数据保护

  • Kinesis 端点不支持 BatchApply。对 Kinesis 目标使用“批应用”(例如,BatchApplyEnabled 目标元数据任务设置)可能会导致数据丢失。

  • 只有与复制实例相同 Amazon 账户的 Kinesis 数据流才支持 Kinesi Amazon Web Services 区域 s 目标。

  • 从 MySQL 源迁移时, BeforeImage 数据不包括 CLOB 和 BLOB 数据类型。有关更多信息,请参阅使用之前映像查看 Kinesis 数据流(作为目标)的 CDC 行的原始值

  • Amazon DMS 不支持迁移超过 16 位BigInt数据类型的值。要解决此限制问题,您可以使用以下转换规则将 BigInt 列转换为字符串。有关转换规则的更多信息,请参阅 转换规则和操作

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

使用对象映射将数据迁移到 Kinesis 数据流

Amazon DMS 使用表映射规则将数据从源映射到目标 Kinesis 数据流。要将数据映射到目标流,您必须使用称为 object mapping 的表映射规则类型。您可以使用对象映射来定义源中的数据记录如何映射到发布到 Kinesis 数据流的数据记录。

除了具有分区键以外,Kinesis 数据流没有预设结构。在对象映射规则中,数据记录的 partition-key-type 的可能值为 schema-tabletransaction-idprimary-keyconstantattribute-name

要创建对象映射规则,您应将 rule-type 指定为 object-mapping。此规则指定您要使用的对象映射的类型。

规则的结构如下所示。

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

Amazon DMS 目前支持将map-record-to-recordmap-record-to-document作为该rule-action参数的唯一有效值。这些设置会影响未作为 exclude-columns 属性列表一部分排除的值。map-record-to-recordmap-record-to-document值指定默认情况下如何 Amazon DMS 处理这些记录。这些值不会以任何方式影响属性映射。

从关系数据库迁移到 Kinesis 数据流时使用 map-record-to-record。此规则类型使用关系数据库的 taskResourceId.schemaName.tableName 值作为 Kinesis 数据流中的分区键,并为源数据库中的每个列创建一个属性。

使用 map-record-to-record 时请注意:

  • 此设置仅影响 exclude-columns 列表排除的列。

  • 对于每个这样的列,在目标主题中 Amazon DMS 创建一个相应的属性。

  • Amazon DMS 无论源列是否用于属性映射,都会创建相应的属性。

通过使用 map-record-to-document,可使用属性名“_doc”将源列放入相应目标流中的单个平面文档中。 Amazon DMS 将数据放入源上名为“_doc”的单个平面映射中。此放置应用于源表中的未在 exclude-columns 属性列表中列出的任何列。

了解 map-record-to-record 的一种方法是在操作时加以观察。对于本示例,假定您使用关系数据库表行开始处理,该行具有以下结构和数据。

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

要将此信息从名为 Test 的架构迁移到 Kinesis 数据流,您将创建规则来将数据映射到目标流。以下规则对此映射进行了说明。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

以下内容说明 Kinesis 数据流中生成的记录格式。

  • StreamName: XXX

  • PartitionKey: test.Customers //schmaname.tableNam

  • 数据://The following JSON message

    { "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

但是,假设您使用相同的规则,但将 rule-action 参数更改为 map-record-to-document 并排除某些列。以下规则对此映射进行了说明。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "exclude-columns": [ "homeaddress", "homephone", "workaddress", "workphone" ] } } ] }

在这种情况下,exclude-columns 参数中未列出的列 FirstNameLastNameStoreIdDateOfBirth 将映射到 _doc。以下内容说明生成的记录格式。

{ "data":{ "_doc":{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "DateOfBirth": "02/29/1988" } } }

使用属性映射调整数据结构

在使用属性映射将数据迁移到 Kinesis 数据流时,您可以调整数据结构。例如,您可能希望将源中的多个字段合并到目标中的单个字段中。以下属性映射说明如何调整数据结构。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

要为 partition-key 设置常量值,请指定 partition-key 值。例如,您可以执行此操作来强制将所有数据存储在一个分片内。以下映射说明了此方法。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注意

表示特定表的控制记录的 partition-key 值为 TaskId.SchemaName.TableName。表示特定任务的控制记录的 partition-key 值为该记录的 TaskId。在对象映射中指定 partition-key 值不会影响控制记录的 partition-key

Kinesis Data Streams 的消息格式

JSON 输出只是键值对的列表。JSON_UNFORMATTED 消息格式是带有换行符的单行 JSON 字符串。

Amazon DMS 提供了以下保留字段,以便更轻松地使用 Kinesis Data Streams 中的数据:

RecordType

记录类型可以是数据或控制。数据记录表示源中的实际行。控制记录表示流中的重要事件,例如,重新开始任务。

操作

对于数据记录,操作可以是 loadinsertupdatedelete

对于控制记录,操作可以是 create-tablerename-tabledrop-tablechange-columnsadd-columndrop-columnrename-columncolumn-type-change

SchemaName

记录的源架构。此字段对于控制记录可能是空的。

TableName

记录的源表。此字段对于控制记录可能是空的。

Timestamp

JSON 消息构建时间的时间戳。此字段采用 ISO 8601 格式。