

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Amazon Kinesis Data Streams 作为目标 Amazon Database Migration Service
<a name="CHAP_Target.Kinesis"></a>

您可以使用将数据迁移 Amazon DMS 到 Amazon Kinesis 数据流。Amazon Kinesis 数据流是 Amazon Kinesis Data Streams 服务的一部分。可以使用 Kinesis 数据流实时收集和处理大型数据记录流。

Kinesis 数据流由分片组成。*分片*是流中数据记录的唯一标识序列。有关 Amazon Kinesis Data Streams 中的分片的更多信息，请参阅《Amazon Kinesis Data Streams 开发人员指南》**中的[分片](https://docs.amazonaws.cn/streams/latest/dev/key-concepts.html#shard)。

Amazon Database Migration Service 使用 JSON 将记录发布到 Kinesis 数据流。在转换期间， Amazon DMS 将每个记录从源数据库序列化到 JSON 格式的属性/值对或 JSON\$1UNFORMATTED 消息格式。JSON\$1UNFORMATTED 消息格式是带有换行符的单行 JSON 字符串。它允许 Amazon Data Firehose 将 Kinesis 数据传递到 Amazon S3 目标，然后使用各种查询引擎（包括 Amazon Athena）进行查询。

您将使用对象映射将数据从支持的数据源迁移到目标流。使用对象映射，您确定如何在流中建立数据记录结构。您还可以为每个表定义分区键，Kinesis Data Streams 用它来将数据分组为分片。

Amazon DMS 还会设置多个 Kinesis Data Streams 参数值。创建表的成本取决于要迁移的数据量和表数。

**注意**  
 Amazon DMS 控制台或 API 上**的 SSL 模式**选项不适用于某些数据流和 NoSQL 服务，例如 Kinesis 和 DynamoDB。默认情况下，它们是安全的，因此 Amazon DMS 显示 SSL 模式设置等于无（**SSL 模式=无**）。您无需为端点提供任何其他配置即可使用 SSL。例如，使用 Kinesis 作为目标端点时，默认情况下它是安全的。所有对 Kinesis 的 API 调用都使用 SSL，因此无需在终端节点中添加额外的 SSL 选项。 Amazon DMS 您可以使用 HTTPS 协议安全地存放数据和通过 SSL 端点检索数据，Kinesis 数据流在连接到 Amazon DMS 时默认使用该协议。

**Kinesis Data Streams 端点设置**

当你使用 Kinesis Data Streams 目标端点时，你可以使用 `KinesisSettings` API 中的 Amazon DMS 选项获取交易和控制细节。

您可以使用以下方法之一设置连接：
+ 在 Amazon DMS 控制台中，使用端点设置。
+ 在 CLI 中，使用[CreateEndpoint](https://docs.amazonaws.cn/dms/latest/APIReference/API_CreateEndpoint.html)命令的`kinesis-settings`选项。

在 CLI 中，使用 `kinesis-settings` 选项的下列请求参数：
**注意**  
 Amazon DMS 版本 3.4.1 及更高版本支持 `IncludeNullAndEmpty` 端点设置。但是，中支持Kinesis Data Streams目标的以下其他端点设置。 Amazon DMS
+ `MessageFormat` – 在端点上创建的记录的输出格式。消息格式为 `JSON`（默认值）或 `JSON_UNFORMATTED`（单行，无制表符）。
+ `IncludeControlDetails` – 显示 Kinesis 消息输出中的表定义、列定义以及表和列更改的详细控制信息。默认值为 `false`。
+ `IncludeNullAndEmpty` – 在目标中包括空列。默认值为 `false`。
+ `IncludePartitionValue` – 在 Kinesis 消息输出中显示分区值，除非分区类型为 `schema-table-type`。默认值为 `false`。
+ `IncludeTableAlterOperations` – 包括更改控制数据中表的任何数据定义语言（DDL）操作，例如 `rename-table`、`drop-table`、`add-column`、`drop-column` 和 `rename-column`。默认值为 `false`。
+ `IncludeTransactionDetails` – 提供源数据库中的详细事务信息。此信息包括提交时间戳、日志位置以及 `transaction_id`、`previous_transaction_id` 和 `transaction_record_id `（事务内的记录偏移）的值。默认值为 `false`。
+ `PartitionIncludeSchemaTable` – 当分区类型为 `primary-key-type` 时，将架构和表名称作为分区值的前缀。这样做会增加 Kinesis 分片之间的数据分布。例如，假设 `SysBench` 架构具有数千个表，并且每个表的主键只有有限的范围。在此况下，同一主键将从数千个表发送到同一个分片，这会导致限制。默认值为 `false`。
+ `UseLargeIntegerValue`— 使用最多 18 位的 int，而不是将整数转换为双精度，从 3.5.4 Amazon DMS 版本中可用。默认值为 false。

以下示例显示在使用 Amazon CLI发出的示例 `create-endpoint` 命令中使用的 `kinesis-settings` 选项。

```
aws dms \
  create-endpoint \
    --region <aws-region> \
    --endpoint-identifier <user-endpoint-identifier> \
    --endpoint-type target \
    --engine-name kinesis \
    --kinesis-settings ServiceAccessRoleArn=arn:aws:iam::<account-id>:role/<kinesis-role-name>,StreamArn=arn:aws:kinesis:<aws-region>:<account-id>:stream/<stream-name>,MessageFormat=json-unformatted,
IncludeControlDetails=true,IncludeTransactionDetails=true,IncludePartitionValue=true,PartitionIncludeSchemaTable=true,
IncludeTableAlterOperations=true
```

**多线程完全加载任务设置**

为了帮助提高传输速度， Amazon DMS 支持对 Kinesis Data Streams 目标实例进行多线程满载。对于包含下列内容的任务设置，DMS 支持此多线程处理：
+ `MaxFullLoadSubTasks` – 使用此选项指示要并行加载的表的最大数目。DMS 使用专用子任务将各个表加载到其对应的 Kinesis 目标表。默认值为 8；最大值为 49。
+ `ParallelLoadThreads`— 使用此选项指定用于将每个表加载到其 Kinesis 目标表的线程数。 Amazon DMS Kinesis Data Streams 目标的最大值为 32。您可以请求提高此最大值限制。
+ `ParallelLoadBufferSize` – 使用此选项指定在缓冲区（并行加载线程将数据加载到 Kinesis 目标时使用）中存储的最大记录数。默认值是 50。最大值为 1000。将此设置与 `ParallelLoadThreads` 一起使用；仅在有多个线程时 `ParallelLoadBufferSize` 才有效。
+ `ParallelLoadQueuesPerThread` – 使用此选项可以指定每个并发线程访问的队列数，以便从队列中取出数据记录并为目标生成批处理负载。默认值是 1。但是，对于各种负载大小的 Kinesis 目标，有效范围为每个线程 5-512 个队列。

**多线程 CDC 加载任务设置**

您可以提高实时数据流目标端点的更改数据捕捉（CDC）的性能，例如，Kinesis 使用任务设置来修改 `PutRecords` API 调用的行为。为此，您可以使用 `ParallelApply*` 任务设置来指定并发线程的数量、每个线程的队列数以及要存储在缓冲区中的记录数。例如，假设您要执行 CDC 加载并且要并行应用 128 个线程。您还希望对于每个线程访问 64 个队列，每个缓冲区存储 50 条记录。

为了提高 CDC 性能， Amazon DMS 支持以下任务设置：
+ `ParallelApplyThreads`— 指定在 CDC 加载期间 Amazon DMS 用于将数据记录推送到 Kinesis 目标端点的并发线程数。默认值为零（0），最大值为 32。
+ `ParallelApplyBufferSize` – 指定在 CDC 加载过程中，要在每个缓冲区队列中存储的、供并发线程推送到目标 Kinesis 端点的最大记录数。默认值是 100，最大值是 1,000。当 `ParallelApplyThreads` 指定多个线程时，请使用此选项。
+ `ParallelApplyQueuesPerThread` – 指定每个线程访问以将数据记录从队列中取出并在 CDC 期间为 Kinesis 端点生成批处理负载的队列数。默认值是 1，最大值是 512。

使用 `ParallelApply*` 任务设置时，`partition-key-type` 默认值是表的 `primary-key`，而不是 `schema-name.table-name`。

## 使用之前映像查看 Kinesis 数据流（作为目标）的 CDC 行的原始值
<a name="CHAP_Target.Kinesis.BeforeImage"></a>

在将 CDC 更新写入数据流目标（如 Kinesis）时，可以在更新进行更改之前查看源数据库行的原始值。为此，请根据源数据库引擎提供的数据 Amazon DMS 填充更新事件*之前的图像*。

不同的源数据库引擎为之前映像提供不同的信息量：
+ 仅当列发生更改时，Oracle 才会对列提供更新。
+ PostgreSQL 仅为作为主键一部分的列（已更改或未更改）提供数据。要为所有列提供数据（无论是否更改），您需要将 `REPLICA_IDENTITY` 设置为 `FULL` 而不是 `DEFAULT`。请注意，您应该仔细选择每张表的 `REPLICA_IDENTITY` 设置。如果将 `REPLICA_IDENTITY` 设置为 `FULL`，则所有列值都将连续写入预写日志（WAL）。这可能会导致经常更新的表出现性能或资源问题。
+ MySQL 通常为 BLOB 和 CLOB 数据类型以外的所有列（已更改或未更改）提供数据。

要启用之前映像以便将源数据库中的原始值添加到 Amazon DMS 输出，请使用 `BeforeImageSettings` 任务设置或 `add-before-image-columns` 参数。此参数应用列转换规则。

`BeforeImageSettings` 使用从源数据库系统收集的值向每个更新操作添加一个新的 JSON 属性，如下所示。

```
"BeforeImageSettings": {
    "EnableBeforeImage": boolean,
    "FieldName": string,  
    "ColumnFilter": pk-only (default) / non-lob / all (but only one)
}
```

**注意**  
仅`BeforeImageSettings`适用于包含 CDC 组件的 Amazon DMS 任务，例如满负荷加上 CDC 任务（迁移现有数据并复制正在进行的更改），或仅适用于 CDC 的任务（仅复制数据更改）。不将 `BeforeImageSettings` 应用于仅完全加载的任务。

对于 `BeforeImageSettings` 选项，以下条件适用：
+ 将 `EnableBeforeImage` 选项设置为 `true` 以启用之前映像。默认值为 `false`。
+ 使用 `FieldName` 选项为新 JSON 属性指定名称。当 `EnableBeforeImage` 为 `true` 时，`FieldName` 是必填项且不能为空。
+ `ColumnFilter` 选项指定要使用之前映像添加的列。要仅添加属于表主键一部分的列，请使用默认值 `pk-only`。要添加具有之前映像值的任何列，请使用 `all`。请注意，之前映像不包括含有 LOB 数据类型（如 CLOB 或 BLOB）的列。

  ```
  "BeforeImageSettings": {
      "EnableBeforeImage": true,
      "FieldName": "before-image",
      "ColumnFilter": "pk-only"
    }
  ```

**注意**  
Amazon S3 目标不支持 `BeforeImageSettings`。对于 S3 目标，在 CDC 期间仅使用 `add-before-image-columns` 转换规则来执行之前映像。

### 使用之前映像转换规则
<a name="CHAP_Target.Kinesis.BeforeImage.Transform-Rule"></a>

作为任务设置的替代方法，您可以使用 `add-before-image-columns` 参数，该参数应用列转换规则。使用此参数，您可以在 CDC 期间对数据流目标（如 Kinesis）启用之前映像。

通过在转换规则中使用 `add-before-image-columns`，可以对之前映像结果应用更精细的控制。转换规则允许您使用对象定位器，该定位器允许您控制为规则选择的表。此外，您可以将转换规则链接在一起，这样可以将不同的规则应用于不同的表。然后，您可以操控使用其他规则生成的列。

**注意**  
不要在同一任务中将 `add-before-image-columns` 参数与 `BeforeImageSettings` 任务设置结合使用。而是对单个任务使用此参数或此设置，但不要同时使用这两者。

包含列的 `add-before-image-columns` 参数的 `transformation` 规则类型必须提供一个 `before-image-def` 部分。下面是一个示例。

```
    {
      "rule-type": "transformation",
      …
      "rule-target": "column",
      "rule-action": "add-before-image-columns",
      "before-image-def":{
        "column-filter": one-of  (pk-only / non-lob / all),
        "column-prefix": string,
        "column-suffix": string,
      }
    }
```

`column-prefix` 的值附加到列名称前面，`column-prefix` 的默认值为 `BI_`。`column-suffix` 的值将附加到列名称之后，默认值为空。不要同时将 `column-prefix` 和 `column-suffix` 设置为空字符串。

为 `column-filter` 选择一个值。要仅添加属于表主键一部分的列，请选择 `pk-only`。选择 `non-lob` 以仅添加不属于 LOB 类型的列。或者，选择 `all` 以添加任何具有之前映像值的列。

### 之前映像转换规则的示例
<a name="CHAP_Target.Kinesis.BeforeImage.Example"></a>

以下示例中的转换规则在目标中添加一个名为 `BI_emp_no` 的新列。因此，像 `UPDATE employees SET emp_no = 3 WHERE emp_no = 1;` 这样的语句用 1 填充 `BI_emp_no` 字段。当您将 CDC 更新写入 Amazon S3 目标时，通过 `BI_emp_no` 列能够判断哪个原始行已更新。

```
{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "1",
      "object-locator": {
        "schema-name": "%",
        "table-name": "%"
      },
      "rule-action": "include"
    },
    {
      "rule-type": "transformation",
      "rule-id": "2",
      "rule-name": "2",
      "rule-target": "column",
      "object-locator": {
        "schema-name": "%",
        "table-name": "employees"
      },
      "rule-action": "add-before-image-columns",
      "before-image-def": {
        "column-prefix": "BI_",
        "column-suffix": "",
        "column-filter": "pk-only"
      }
    }
  ]
}
```

有关使用 `add-before-image-columns` 规则操作的信息，请参阅 [转换规则和操作](CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.md)。

## 使用 Kinesis 数据流作为目标的先决条件 Amazon Database Migration Service
<a name="CHAP_Target.Kinesis.Prerequisites"></a>

### 用于使用 Kinesis 数据流作为目标的 IAM 角色 Amazon Database Migration Service
<a name="CHAP_Target.Kinesis.Prerequisites.IAM"></a>

在将 Kinesis 数据流设置为目标之前 Amazon DMS，请务必创建一个 IAM 角色。此角色必须允许 Amazon DMS 代入并授予对正在迁移到的 Kinesis 数据流的访问权限。以下 IAM policy 中显示了所需的最小访问权限集合。

------
#### [ JSON ]

****  

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement": [
   {
     "Sid": "1",
     "Effect": "Allow",
     "Principal": {
        "Service": "dms.amazonaws.com"
     },
   "Action": "sts:AssumeRole"
   }
]
}
```

------

您在迁移到 Kinesis 数据流时使用的角色必须具有以下权限。

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "*"
    }
  ]
}
```

------

### 访问 Kinesis 数据流作为目标 Amazon Database Migration Service
<a name="CHAP_Target.Kinesis.Prerequisites.Access"></a>

在 3.4.7 及更高 Amazon DMS 版本中，要连接到 Kinesis 终端节点，必须执行以下操作之一：
+ 将 DMS 配置为使用 VPC 端点。有关将 DMS 配置为使用 VPC 端点的信息，请参阅[配置 VPC 终端节点 Amazon DMS](CHAP_VPC_Endpoints.md)。
+ 将 DMS 配置为使用公有路由，即，公开您的复制实例。有关公有复制实例的信息，请参阅[公有和私有复制实例](CHAP_ReplicationInstance.PublicPrivate.md)。

## 使用 Kinesis Data Streams 作为目标时的限制 Amazon Database Migration Service
<a name="CHAP_Target.Kinesis.Limitations"></a>

将 Kinesis Data Streams 作为目标时存在以下限制：
+ Amazon DMS 无论事务如何，都会将每次更新作为给定 Kinesis 数据流中的一条数据记录发布到源数据库中的一条记录。但是，您可以使用 `KinesisSettings` API 的相关参数包含每条数据记录的事务详细信息。
+ 不支持完整 LOB 模式。
+ 支持的最大 LOB 大小为 1 MB。
+ Kinesis Data Streams 不支持重复数据删除。使用流中数据的应用程序需要处理重复记录。有关更多信息，请参阅《Amazon Kinesis Data Streams 开发人员指南》**中的[处理重复记录](https://docs.amazonaws.cn/streams/latest/dev/kinesis-record-processor-duplicates.html)。
+ Amazon DMS 支持以下四种形式的分区键：
  + `SchemaName.TableName`：架构和表名称的组合。
  + `${AttributeName}`：JSON 中其中一个字段的值，或源数据库中表的主键。
  + `transaction-id`: CDC 交易编号。同一事务中的所有记录都进入同一个分区。
  + `constant`：每条记录的固定字面值，无论表或数据如何。所有记录都发送到相同的分区键值 “常量”，从而为所有表提供严格的全局排序。

  ```
  {
      "rule-type": "object-mapping",
      "rule-id": "2",
      "rule-name": "PartitionKeyTypeExample",
      "rule-action": "map-record-to-document",
      "object-locator": {
          "schema-name": "onprem",
          "table-name": "it_system"
      },
      "mapping-parameters": {
          "partition-key-type": "transaction-id | constant | attribute-name | schema-table"
      }
  }
  ```
+ 有关在 Kinesis Data Streams 中加密静态数据的信息，请参阅《Amazon Key Management Service 开发人员指南》**中的 [Kinesis Data Streams 中的数据保护](https://docs.amazonaws.cn/streams/latest/dev/server-side-encryption.html.html)。
+ 只有当源`IncludeTransactionDetails`端点是 Oracle、SQL Server、PostgreSQL 或 MySQL 时，才支持端点设置。对于其他源端点类型，交易详细信息将不包括在内。
+ Kinesis 端点不支持 `BatchApply`。对 Kinesis 目标使用“批应用”（例如，`BatchApplyEnabled` 目标元数据任务设置）可能会导致任务失败和数据丢失。使用 Kinesis 作为目标端点时，请勿启用 `BatchApply`。
+ 只有与复制实例相同 Amazon 账户的 Kinesis 数据流才支持 Kinesi Amazon Web Services 区域 s 目标。
+ 从 MySQL 源迁移时， BeforeImage 数据不包括 CLOB 和 BLOB 数据类型。有关更多信息，请参阅 [使用之前映像查看 Kinesis 数据流（作为目标）的 CDC 行的原始值](#CHAP_Target.Kinesis.BeforeImage)。
+ Amazon DMS 不支持迁移超过 16 位`BigInt`数据类型的值。要解决此限制问题，您可以使用以下转换规则将 `BigInt` 列转换为字符串。有关转换规则的更多信息，请参阅[转换规则和操作](CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Transformations.md)。

  ```
  {
      "rule-type": "transformation",
      "rule-id": "id",
      "rule-name": "name",
      "rule-target": "column",
      "object-locator": {
          "schema-name": "valid object-mapping rule action",
          "table-name": "",
          "column-name": ""
      },
      "rule-action": "change-data-type",
      "data-type": {
          "type": "string",
          "length": 20
      }
  }
  ```
+ 当单个事务中的多个 DML 操作修改源数据库上的大型对象（LOB）列时，目标数据库仅保留该事务中最后一次操作的最终 LOB 值。先前操作在同一事务中设置的中间 LOB 值会被覆盖，这可能会导致潜在的数据丢失或不一致。出现这种情况是复制期间 LOB 数据的处理方式使然。
+ Amazon DMS 使用 Kinesis 作为目标端点时，不支持包含嵌入`'\0'`字符的源数据。包含嵌入`'\0'`字符的数据将在第一个`'\0'`字符处被截断。

## 使用对象映射将数据迁移到 Kinesis 数据流
<a name="CHAP_Target.Kinesis.ObjectMapping"></a>

Amazon DMS 使用表映射规则将数据从源映射到目标 Kinesis 数据流。要将数据映射到目标流，您必须使用称为 object mapping 的表映射规则类型。您可以使用对象映射来定义源中的数据记录如何映射到发布到 Kinesis 数据流的数据记录。

除了具有分区键以外，Kinesis 数据流没有预设结构。在对象映射规则中，数据记录的 `partition-key-type` 的可能值为 `schema-table`、`transaction-id`、`primary-key`、`constant` 和 `attribute-name`。

要创建对象映射规则，您应将 `rule-type` 指定为 `object-mapping`。此规则指定您要使用的对象映射的类型。

规则的结构如下所示。

```
{
    "rules": [
        {
            "rule-type": "object-mapping",
            "rule-id": "id",
            "rule-name": "name",
            "rule-action": "valid object-mapping rule action",
            "object-locator": {
                "schema-name": "case-sensitive schema name",
                "table-name": ""
            }
        }
    ]
}
```

Amazon DMS 目前支持将`map-record-to-record`和`map-record-to-document`作为该`rule-action`参数的唯一有效值。这些设置会影响未作为 `exclude-columns` 属性列表一部分排除的值。`map-record-to-record`和`map-record-to-document`值指定默认情况下如何 Amazon DMS 处理这些记录。这些值不会以任何方式影响属性映射。

从关系数据库迁移到 Kinesis 数据流时使用 `map-record-to-record`。此规则类型使用关系数据库的 `taskResourceId.schemaName.tableName` 值作为 Kinesis 数据流中的分区键，并为源数据库中的每个列创建一个属性。

使用 `map-record-to-record` 时请注意：
+ 此设置仅影响 `exclude-columns` 列表排除的列。
+ 对于每个这样的列，在目标主题中 Amazon DMS 创建一个相应的属性。
+ Amazon DMS 无论源列是否用于属性映射，都会创建相应的属性。

通过使用 `map-record-to-document`，可使用属性名“\$1doc”将源列放入相应目标流中的单个平面文档中。 Amazon DMS 将数据放入源上名为“`_doc`”的单个平面映射中。此放置应用于源表中的未在 `exclude-columns` 属性列表中列出的任何列。

了解 `map-record-to-record` 的一种方法是在操作时加以观察。对于本示例，假定您使用关系数据库表行开始处理，该行具有以下结构和数据。


| FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth | 
| --- | --- | --- | --- | --- | --- | --- | --- | 
| Randy | Marsh | 5 | 221B Baker Street | 1234567890 | 31 Spooner Street, Quahog  | 9876543210 | 02/29/1988 | 

要将此信息从名为 `Test` 的架构迁移到 Kinesis 数据流，您将创建规则来将数据映射到目标流。以下规则对此映射进行了说明。

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "DefaultMapToKinesis",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customers"
            }
        }
    ]
}
```

以下内容说明 Kinesis 数据流中生成的记录格式。
+ StreamName: XXX
+ PartitionKey: test.Customers //schmaname.tableNam
+ 数据：//The following JSON message

  ```
    {
       "FirstName": "Randy",
       "LastName": "Marsh",
       "StoreId":  "5",
       "HomeAddress": "221B Baker Street",
       "HomePhone": "1234567890",
       "WorkAddress": "31 Spooner Street, Quahog",
       "WorkPhone": "9876543210",
       "DateOfBirth": "02/29/1988"
    }
  ```

但是，假设您使用相同的规则，但将 `rule-action` 参数更改为 `map-record-to-document` 并排除某些列。以下规则对此映射进行了说明。

```
{
	"rules": [
	   {
			"rule-type": "selection",
			"rule-id": "1",
			"rule-name": "1",
			"rule-action": "include",
			"object-locator": {
				"schema-name": "Test",
				"table-name": "%"
			}
		},
		{
			"rule-type": "object-mapping",
			"rule-id": "2",
			"rule-name": "DefaultMapToKinesis",
			"rule-action": "map-record-to-document",
			"object-locator": {
				"schema-name": "Test",
				"table-name": "Customers"
			},
			"mapping-parameters": {
				"exclude-columns": [
					"homeaddress",
					"homephone",
					"workaddress",
					"workphone"
				]
			}
		}
	]
}
```

在这种情况下，`exclude-columns` 参数中未列出的列 `FirstName`、`LastName`、`StoreId` 和 `DateOfBirth` 将映射到 `_doc`。以下内容说明生成的记录格式。

```
       {
            "data":{
                "_doc":{
                    "FirstName": "Randy",
                    "LastName": "Marsh",
                    "StoreId":  "5",
                    "DateOfBirth": "02/29/1988"
                }
            }
        }
```

### 使用属性映射调整数据结构
<a name="CHAP_Target.Kinesis.AttributeMapping"></a>

在使用属性映射将数据迁移到 Kinesis 数据流时，您可以调整数据结构。例如，您可能希望将源中的多个字段合并到目标中的单个字段中。以下属性映射说明如何调整数据结构。

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "rule-action": "include",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            }
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "TransformToKinesis",
            "rule-action": "map-record-to-record",
            "target-table-name": "CustomerData",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customers"
            },
            "mapping-parameters": {
                "partition-key-type": "attribute-name",
                "partition-key-name": "CustomerName",
                "exclude-columns": [
                    "firstname",
                    "lastname",
                    "homeaddress",
                    "homephone",
                    "workaddress",
                    "workphone"
                ],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "CustomerName",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${lastname}, ${firstname}"
                    },
                    {
                        "target-attribute-name": "ContactDetails",
                        "attribute-type": "document",
                        "attribute-sub-type": "json",
                        "value": {
                            "Home": {
                                "Address": "${homeaddress}",
                                "Phone": "${homephone}"
                            },
                            "Work": {
                                "Address": "${workaddress}",
                                "Phone": "${workphone}"
                            }
                        }
                    }
                ]
            }
        }
    ]
}
```

要为设置常量值`partition-key`，请指定`"partition-key-type: "constant"`，这会将分区值设置为`constant`。例如，您可以执行此操作来强制将所有数据存储在一个分片内。以下映射说明了此方法。

```
{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "%"
            },
            "rule-action": "include"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "TransformToKinesis",
            "rule-action": "map-record-to-document",
            "object-locator": {
                "schema-name": "Test",
                "table-name": "Customer"
            },
            "mapping-parameters": {
                "partition-key-type": "constant",
                "exclude-columns": [
                    "FirstName",
                    "LastName",
                    "HomeAddress",
                    "HomePhone",
                    "WorkAddress",
                    "WorkPhone"
                ],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "CustomerName",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${FirstName},${LastName}"

                    },
                    {
                        "target-attribute-name": "ContactDetails",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": {
                            "Home": {
                                "Address": "${HomeAddress}",
                                "Phone": "${HomePhone}"
                            },
                            "Work": {
                                "Address": "${WorkAddress}",
                                "Phone": "${WorkPhone}"
                            }
                        }
                    },
                    {
                        "target-attribute-name": "DateOfBirth",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${DateOfBirth}"
                    }
                ]
            }
        }
    ]
}
```

**注意**  
表示特定表的控制记录的 `partition-key` 值为 `TaskId.SchemaName.TableName`。表示特定任务的控制记录的 `partition-key` 值为该记录的 `TaskId`。在对象映射中指定 `partition-key` 值不会影响控制记录的 `partition-key`。  
 如果在表映射规则`attribute-name`中设置`partition-key-type`为，则必须指定`partition-key-name`，该规则必须引用源表中的列或映射中定义的自定义列。此外，`attribute-mappings`必须提供源列以定义源列如何映射到目标 Kinesis Stream。

### Kinesis Data Streams 的消息格式
<a name="CHAP_Target.Kinesis.Messageformat"></a>

JSON 输出只是键值对的列表。JSON\$1UNFORMATTED 消息格式是带有换行符的单行 JSON 字符串。

Amazon DMS 提供了以下保留字段，以便更轻松地使用 Kinesis Data Streams 中的数据：

**RecordType**  
记录类型可以是数据或控制。*数据记录*表示源中的实际行。*控制记录*表示流中的重要事件，例如，重新开始任务。

**操作**  
对于数据记录，操作可以是 `load`、`insert`、`update` 或 `delete`。  
对于控制记录，操作可以是 `create-table`、`rename-table`、`drop-table`、`change-columns`、`add-column`、`drop-column`、`rename-column` 或 `column-type-change`。

**SchemaName**  
记录的源架构。此字段对于控制记录可能是空的。

**TableName**  
记录的源表。此字段对于控制记录可能是空的。

**Timestamp**  
JSON 消息构建时间的时间戳。此字段采用 ISO 8601 格式。