以 Amazon Kinesis Data Streams 为接收器 Amazon Database Migration Service - Amazon Database Migration Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

以 Amazon Kinesis Data Streams 为接收器 Amazon Database Migration Service

您可以使用将数据迁移Amazon DMS到Amazon Kinesis 数据流。Amazon Kinesis Data Streams 服务的一部分。您可以使用 Kinesis 数据流实时收集和处理大型数据流记录。

数据流由分片组成 Kinesis 数据流。分片是流中数据记录的唯一标识序列。有关Amazon Kinesis Data Streams 开发人员指南中的分区,请参阅 Am azon Kinesis D ata Streams 开发人员指南中的分区。

Amazon Database Migration Service使用 JSON 将记录发布到 Kinesis 数据流。在转换期间,Amazon DMS 将每个记录从源数据库序列化到 JSON 格式的属性/值对或 JSON_UNFORMATTED 消息格式。JSON_UNFORMATTED 消息格式是带有换行符的单行 JSON 字符串。它允许Amazon Kinesis Data Firehose 将 Kinesis 数据传送到Amazon S3 目的地,然后使用包括Amazon Athena 在内的各种查询引擎进行查询。

您将使用对象映射将数据从支持的数据源迁移到目标流。使用对象映射,您确定如何在流中建立数据记录结构。您还可以为每个表定义一个分区键,Kinesis Data Streams 使用该分区键将数据分组到其分区中。

在 Kinesis Data Streams 目标端点上Amazon DMS创建表时,它创建的表数量与源数据库端点中的表一样多。 Amazon DMS还设置了多个 Kinesis Data Streams 参数值。创建表的成本取决于要迁移的数据量和表数。

注意

Amazon DMS控制台或 API 上的 SSL 模式选项不适用于某些数据流和 NoSQL 服务,例如 Kinesis 和 DynamoDB。它们在默认情况下是安全的,因此Amazon DMS显示 SSL 模式设置等于无(SSL 模式=无)。您无需为终端节点提供任何其他配置即可使用 SSL。例如,当使用 Kinesis 作为目标端点时,它在默认情况下是安全的。对 Kinesis 的所有 API 调用都使用 SSL,因此无需在Amazon DMS端点中添加额外的 SSL 选项。您可以使用 HTTPS 协议安全地通过 SSL 端点放置数据和检索数据,该协议在连接到 Kinesis 数据流时默认Amazon DMS使用。

Kinesis Data Streams s 端点设置

当您使用 Kinesis Data Streams 目标端点时,您可以使用 Amazon DMS API 中的KinesisSettings选项获取交易和控制详细信息。

您可以使用以下方法设置连接设置:

  • 在Amazon DMS控制台中,使用端点设置。

  • 在 CLI 中,使用CreateEndpoint命令的kinesis-settings选项。

在 CLI 中,使用该kinesis-settings选项的以下请求参数:

注意

3.4.1 及更高Amazon DMS版本中提供对IncludeNullAndEmpty端点设置的Support。但是,中提供了对 Kinesis Data Streams 目标的其他端点设置的Amazon DMS支持。

  • MessageFormat— 在端点上创建的记录的输出格式。消息格式为 JSON(默认值)或 JSON_UNFORMATTED(单行,无制表符)。

  • IncludeControlDetails— 显示 Kinesis 消息输出中的表定义、列定义以及表和列更改的详细控制信息。默认为 false

  • IncludeNullAndEmpty— 在目标中包括 NULL 列和空列。默认为 false

  • IncludePartitionValue— 显示 Kinesis 消息输出中的分区值,除非分区类型为分区类型为 schema-table-type Kinesis 消息输出中的分区值。默认为 false

  • IncludeTableAlterOperations— 包括任何更改控制数据中表的数据定义语言 (DDL) 操作,例如rename-tabledrop-tableadd-columndrop-column、和rename-column。默认为 false

  • IncludeTransactionDetails— 提供源数据库中的详细事务信息。此信息包括提交时间戳、日志位置以及 transaction_idprevious_transaction_idtransaction_record_id (事务内的记录偏移)的值。默认为 false

  • PartitionIncludeSchemaTable— 当分区类型为分区值时,将架构和表名称作为分区值primary-key-type的前缀。这样做会增加 Kinesis 分片之间的数据分布。例如,假设 SysBench 架构具有数千个表,并且每个表的主键只有有限的范围。在此况下,同一主键将从数千个表发送到同一个分片,这会导致限制。默认为 false

以下示例显示了正在使用的kinesis-settings选项以及使用发出create-endpoint命令的示例Amazon CLI。

aws dms create-endpoint --endpoint-identifier=$target_name --engine-name kinesis --endpoint-type target --region us-east-1 --kinesis-settings ServiceAccessRoleArn=arn:aws:iam::333333333333:role/dms-kinesis-role, StreamArn=arn:aws:kinesis:us-east-1:333333333333:stream/dms-kinesis-target-doc,MessageFormat=json-unformatted, IncludeControlDetails=true,IncludeTransactionDetails=true,IncludePartitionValue=true,PartitionIncludeSchemaTable=true, IncludeTableAlterOperations=true
多线程完全加载任务设置

为了帮助提高传输速度,Amazon DMS支持对 Kinesis Data Streams 目标实例进行多线程满负载。对于包含下列内容的任务设置,DMS 支持此多线程处理:

  • MaxFullLoadSubTasks— 使用此选项指明要parallel 加载的源表的最大数量。DMS 使用专用子任务将每个表加载到相应的 Kinesis 目标表中。默认值为 8;最大值为 49。

  • ParallelLoadThreads— 使用此选项指定用于将每个表加载到其 Kinesis 目标表中的线程数。Amazon DMSKinesis Data Streams 目标的最大值为 32。您可以请求提高此最大值限制。

  • ParallelLoadBufferSize— 使用此选项指定要存储在缓冲区中的最大记录数,parallel 加载线程用于将数据加载到 Kinesis 目标。默认值为 50。最大值为 1,000。将此设置与 ParallelLoadThreads 一起使用;仅在有多个线程时ParallelLoadBufferSize 才有效。

  • ParallelLoadQueuesPerThread— 使用此选项指定每个并发线程访问的队列数,以便从队列中提取数据记录并为目标生成批量加载。默认值为 1。但是,对于不同负载大小的 Kinesis 目标,有效范围为每个线程 5—512 个队列。

多线程 CDC 加载任务设置

您可以使用任务设置修改 PutRecords API 调用的行为,提高实时数据流目标端点(如 Kinesis)的变更数据捕获 (CDC) 性能。为此,您可以使用 ParallelApply* 任务设置来指定并发线程的数量、每个线程的队列数以及要存储在缓冲区中的记录数。例如,假设您要执行 CDC 加载并且要并行应用 128 个线程。您还希望对于每个线程访问 64 个队列,每个缓冲区存储 50 条记录。

要提高 CDC 性能,请 Amazon DMS 支持以下任务设置:

  • ParallelApplyThreads— 指定在 CDC 加载期间将数据记录推送到 Kinesis 目标端点时Amazon DMS使用的并发线程数。默认值为零 (0),最大值为 32。

  • ParallelApplyBufferSize— 指定在 CDC 加载期间将并发线程推送到 Kinesis 目标端点的每个缓冲队列中存储的最大记录数。默认值为 100 个接收器,最大值为 1000 个接收器。当 ParallelApplyThreads 指定多个线程时,请使用此选项。

  • ParallelApplyQueuesPerThread— 指定每个线程访问的队列数量,以便在 CDC 期间从队列中提取数据记录并为 Kinesis 端点生成批量加载。默认值为 1 天 1 天天,最大值为 512 天天。

使用 ParallelApply* 任务设置时, partition-key-type 默认值是表的 primary-key,而不是 schema-name.table-name

使用之前的图像查看 Kinesis 数据流作为目标的 CDC 行的原始值

在将 CDC 更新写入 Kinesis 等数据流目标时,可以在更新更改之前查看源数据库行的原始值。为此,Amazon DMS 根据源数据库引擎提供的数据填充更新事件的之前映像

不同的源数据库引擎为之前映像提供不同的信息量:

  • 仅当列发生更改时,Oracle 才会对列提供更新。

  • PostgreSQL 仅为作为主键一部分的列(已更改或未更改)提供数据。要为所有列提供数据(无论是否已更改),您需要将设置REPLICA_IDENTITYFULL而不是DEFAULT。请注意,您应该仔细选择每张表的REPLICA_IDENTITY设置。如果设置为 REPLICA_IDENTITYFULL,则所有列值都将持续写入预写日志 (WAL)。这可能会导致经常更新的表出现性能或资源问题。

  • MySQL 通常为除 BLOB 和 CLOB 数据类型(更改与否)之外的所有列提供数据。

要启用之前映像以便将源数据库中的原始值添加到 Amazon DMS 输出,请使用 BeforeImageSettings 任务设置或 add-before-image-columns 参数。此参数应用列转换规则。

BeforeImageSettings 使用从源数据库系统收集的值向每个更新操作添加一个新的 JSON 属性,如下所示。

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注意

BeforeImageSettings适用于包含 CDC 组件的Amazon DMS任务,例如满载加上 CDC 任务(迁移现有数据并复制正在进行的更改),或仅适用于仅限 CDC 的任务(仅复制数据更改)。不将 BeforeImageSettings 应用于仅完全加载的任务。

对于 BeforeImageSettings 选项,以下条件适用:

  • EnableBeforeImage 选项设置为 true 以启用之前映像。默认为 false

  • 使用 FieldName 选项为新 JSON 属性指定名称。当 EnableBeforeImagetrue 时,FieldName 是必填项且不能为空。

  • ColumnFilter 选项指定要使用之前映像添加的列。要仅添加属于表主键一部分的列,请使用默认值 pk-only。要添加具有之前映像值的任何列,请使用 all。请注意,之前的图像不包含具有 LOB 数据类型的列,例如 CLOB 或 BLOB。

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
注意

Amazon S3 目标不支持BeforeImageSettings。对于 S3 目标,在 CDC 期间仅使用 add-before-image-columns 转换规则来执行之前映像。

使用之前映像转换规则

作为任务设置的替代方法,您可以使用 add-before-image-columns 参数,该参数应用列转换规则。使用此参数,您可以在 CDC 期间对数据流目标(如 Kinesis)启用映像前功能。

通过在转换规则中使用 add-before-image-columns,可以对之前映像结果应用更精细的控制。转换规则允许您使用对象定位器,该定位器允许您控制为规则选择的表。此外,您可以将转换规则链接在一起,这样可以将不同的规则应用于不同的表。然后,您可以操控使用其他规则生成的列。

注意

不要在同一任务中将 add-before-image-columns 参数与 BeforeImageSettings 任务设置结合使用。而是对单个任务使用此参数或此设置,但不要同时使用这两者。

包含列的 add-before-image-columns 参数的 transformation 规则类型必须提供一个 before-image-def 部分。下面是一个示例。

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

column-prefix 的值附加到列名称前面,column-prefix 的默认值为 BI_column-suffix 的值将附加到列名称之后,默认值为空。不要同时将 column-prefixcolumn-suffix 设置为空字符串。

column-filter 选择一个值。要仅添加属于表主键一部分的列,请选择 pk-only。选择 non-lob 以仅添加不属于 LOB 类型的列。或者,选择 all 以添加任何具有之前映像值的列。

之前映像转换规则的示例

以下示例中的转换规则在目标中添加一个名为 BI_emp_no 的新列。因此,像 UPDATE employees SET emp_no = 3 WHERE emp_no = 1; 这样的语句用 1 填充 BI_emp_no 字段。当您向 Amazon S3 目标写入 CDC 更新时,该BI_emp_no列可以分辨出哪个原始行已更新。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

有关使用 add-before-image-columns 规则操作的信息,请参阅 转换规则和操作

以 Kinesis 数据流作为目标的先决条件 Amazon Database Migration Service

在将 Kinesis 数据流设置为目标之前Amazon DMS,请确保创建 IAM 角色。此角色必须允许Amazon DMS代入和授予对正在迁移到的 Kinesis 数据流的访问权限。以下 IAM 策略中显示了所需的最小访问权限集合。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "1", "Effect": "Allow", "Principal": { "Service": "dms.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

您用于迁移到 Kinesis 数据流的角色必须具有以下权限。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords" ], "Resource": "arn:aws:kinesis:region:accountID:stream/streamName" } ] }

以 Kinesis Data Streams 为接收者时的限制 Amazon Database Migration Service

使用 Kinesis Data Streams 作为目标时,存在以下限制:

  • Amazon DMS无论事务如何,都将每次更新作为给定 Kinesis 数据流中的一条数据记录发布到源数据库中的单个记录。但是,您可以使用 KinesisSettings API 的相关参数包含每条数据记录的事务详细信息。

  • 不支持完整 LOB 模式。

  • 支持的最大 LOB 大小为 1 MB MB MB MB。

  • Kinesis Data Streams 不支持重复数据删除。使用流中数据的应用程序需要处理重复记录。有关更多信息,请参阅 Amazon Kinesis Data Streams 开发人员指南中的处理重复记录

  • Amazon DMS 支持以下两种分区键:

    • SchemaName.TableName:架构和表名称的组合。

    • ${AttributeName}:JSON 中其中一个字段的值,或源数据库中表的主键。

  • 有关在 Kinesis Data Streams 中加密静态数据的信息,请参阅《开发人员指南》中的 Kinesis Data Streams 中的Amazon Key Management Service数据保护

  • BatchApply不支持 Kinesis 端点。对 Kinesis BatchApplyEnabled 目标使用Batch 应用(例如,目标元数据任务设置)可能会导致数据丢失。

  • 只有同一Amazon账户中的 Kinesis 数据流支持 Kinesis 目标。

  • 从 MySQL 源迁移时,BeforeImage数据不包括 CLOB 和 BLOB 数据类型。有关更多信息,请参阅使用之前的图像查看 Kinesis 数据流作为目标的 CDC 行的原始值

  • Amazon DMS迁移科学记数法形式中超过 16 位数BigInt的数据类型的值。

使用对象映射将数据迁移到 Kinesis 数据流

Amazon DMS使用表映射规则将数据从源映射到目标 Kinesis 数据流。要将数据映射到目标流,您必须使用称为 object mapping 的表映射规则类型。您可以使用对象映射来定义源中的数据记录如何映射到发布到 Kinesis 数据流的数据记录。

除了分区键外,Kinesis 数据流没有其他预设结构。在对象映射规则中,数据记录的 partition-key-type 的可能值为 schema-tabletransaction-idprimary-keyconstantattribute-name

要创建对象映射规则,您应将 rule-type 指定为 object-mapping。此规则指定您要使用的对象映射的类型。

规则的结构如下所示。

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

Amazon DMS 目前支持 map-record-to-recordmap-record-to-document 作为 rule-action 参数的唯一有效值。map-record-to-recordmap-record-to-document 值指定 Amazon DMS 默认情况下对未作为 exclude-columns 属性列表的一部分排除的记录执行的操作。这些值不会以任何方式影响属性映射。

从关系数据库迁移到 Kinesis 数据流map-record-to-record时使用。此规则类型使用关系数据库中的taskResourceId.schemaName.tableName值作为 Kinesis 数据流中的分区键,并为源数据库中的每列创建一个属性。在使用 map-record-to-record 时,对于源表中未在 exclude-columns 属性列表中列出的任何列,Amazon DMS 将在目标流中创建对应的属性。不论是否在属性映射中使用源列,都会创建对应的属性。

map-record-to-document用于使用属性名称 “_doc” 将源列放入相应目标流中的单个平面文档中。 Amazon DMS将数据放入名为 “_doc” 的源上的单一平面地图中。此放置应用于源表中的未在 exclude-columns 属性列表中列出的任何列。

了解 map-record-to-record 的一种方法是在操作时加以观察。对于本示例,假定您使用关系数据库表行开始处理,该行具有以下结构和数据。

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

要将此信息从名为 Kinesis 数据流的架构迁移Test到 Kinesis 数据流,需要创建规则将数据映射到目标流。以下规则对此映射进行了说明。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

下图说明了 Kinesis 数据流中生成的记录格式:

  • StreamName: XXX

  • PartitionKey: Test.customers//schmaname.tableNam

  • 数据://The following JSON message

    { "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

但是,假设您使用相同的规则,但将rule-action参数更改为map-record-to-document并排除某些列。以下规则对此映射进行了说明。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "exclude-columns": [ "homeaddress", "homephone", "workaddress", "workphone" ] } } ] }

在这种情况下,exclude-columns参数、FirstNameStoreIdDateOfBirth中未列出的列将映射到_docLastName以下说明了生成的记录格式。

{ "data":{ "_doc":{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "DateOfBirth": "02/29/1988" } } }

使用属性映射调整数据结构

在使用属性映射将数据迁移到 Kinesis 数据流时,您可以重构数据。例如,您可能希望将源中的多个字段合并到目标中的单个字段中。以下属性映射说明如何调整数据结构。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

要为 partition-key 设置常量值,请指定 partition-key 值。例如,您可以执行此操作来强制将所有数据存储在一个分片内。以下映射说明了此方法。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注意

表示特定表的控制记录的 partition-key 值为 TaskId.SchemaName.TableName。表示特定任务的控制记录的 partition-key 值为该记录的 TaskId。在对象映射中指定 partition-key 值不会影响控制记录的 partition-key

Kinesis Data Streams 的消息格式

JSON 输出只是键值对的列表。JSON_UNFORMATTED 消息格式是带有换行符的单行 JSON 字符串。

Amazon DMS提供以下保留字段,以便更轻松地使用 Kinesis Data Streams 中的数据:

RecordType

记录类型可以是数据或控制。数据记录表示源中的实际行。控制记录表示流中的重要事件,例如,重新开始任务。

运算

对于数据记录,操作可以是 loadinsertupdatedelete

对于控制记录,操作可以是create-tablerename-tabledrop-tablechange-columnsadd-columndrop-columnrename-column、或column-type-change

SchemaName

记录的源架构。此字段对于控制记录可能是空的。

TableName

记录的源表。此字段对于控制记录可能是空的。

时间戳

JSON 消息构建时间的时间戳。此字段采用 ISO 8601 格式。