将 Apache Kafka 作为 Amazon Database Migration Service 的目标 - Amazon Database Migration Service
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Apache Kafka 作为 Amazon Database Migration Service 的目标

您可以使用 Amazon DMS 将数据迁移到 Apache Kafka 集群。Apache Kafka 是一个分布式流处理平台。您可以使用 Apache Kafka 实时提取和处理流数据。

Amazon还提供 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 作为Amazon DMS目标。Amazon MSK 是一项完全托管的 Apache Kafka 流式处理服务,可简化 Apache Kafka 实例的实施和管理。它与开源 Apache Kafka 版本配合使用,并且您将 Amazon MSK 实例作为Amazon DMS目标与任何 Apache Kafka 实例完全相同。有关更多信息,请参阅 。什么是 Amazon MSK?中的Amazon Managed Streaming for Apache Kafka 开发人员指南。

Kafka 集群将记录流存储在称为主题的类别中,这些类别分为分区。分区是主题中唯一标识的数据记录(消息)序列。分区可分布在集群中的多个代理之间,以支持并行处理主题记录。有关 Apache Kafka 中主题和分区及其分布的更多信息,请参阅主题和日志分布

Amazon Database Migration Service使用 JSON 将记录发布到 Kafka 主题。转换期间,Amazon DMS 将每个记录从源数据库序列化到 JSON 格式的属性/值对。

要将数据从支持的数据源迁移到目标 Kafka 集群,您应使用对象映射。使用对象映射,您确定如何在目标主题中建立数据记录结构。您还可以为每个表定义分区键,Apache Kafka 使用该键将数据分组为分区。

当 Amazon DMS 在 Apache Kafka 目标终端节点上创建表时,它会创建与源数据库终端节点相同数量的表。Amazon DMS 还会设置几个 Apache Kafka 参数值。创建表的成本取决于要迁移的数据量和表数。

Apache Kafka 终端节点设置

您可以通过Amazon DMS控制台或--kafka-settings选项。每个设置的要求如下:

  • Broker— 以逗号分隔列表的形式指定 Kafka 集群中一个或多个代理的位置。broker-hostname:port. 示例是 "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"。此设置可指定集群中的任何或所有代理的位置。集群代理全部通过通信来处理迁移到主题的数据记录的分区。

  • Topic— (可选) 指定主题名称,最大长度为 255 个字母和符号。您可以使用句点 (.)、下划线 (_) 和减号 (-)。带句点 (.) 或下划线 (_) 的主题名称可能会在内部数据结构中发生冲突。在主题名称中使用其中一个符号,但不要同时使用这两个符号。如果不指定主题名称,请使用Amazon DMS使用"kafka-default-topic"作为迁移主题。

    注意

    要让 Amazon DMS 创建您指定的迁移主题或默认主题,请将 auto.create.topics.enable = true 设置为 Kafka 集群配置的一部分。有关更多信息,请参阅 。将 Apache Kafka 作为 Amazon Database Migration Service 目标的限制

  • MessageFormat— 在终端节点上创建的记录的输出格式。消息格式为 JSON(默认值)或 JSON_UNFORMATTED(单行,无制表符)。

  • MessageMaxBytes— 终端节点上创建的记录的最大大小(以字节为单位)。默认值为 1,000,000。

    注意

    您只能使用Amazon要更改的 CLI/SDKMessageMaxBytes设置为非默认值。例如,要修改现有 Kafka 终端节点并将MessageMaxBytes,请使用以下命令。

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails— 提供源数据库中的详细事务信息。此信息包括提交时间戳、日志位置以及 transaction_idprevious_transaction_idtransaction_record_id(事务内的记录偏移)的值。默认为 false

  • IncludePartitionValue— 显示 Kafka 消息输出中的分区值,除非分区类型为schema-table-type. 默认为 false

  • PartitionIncludeSchemaTable— 当分区类型为时,将架构和表名称作为分区值的前缀。primary-key-type. 这样做会提高数据在 Kafka 分区之间的分布广度。例如,假设 SysBench 架构具有数千个表,并且每个表的主键只有有限的范围。在这种情况下,同一主键将从数千个表发送到同一个分区,这会导致限制。默认为 false

  • IncludeTableAlterOperations— 包括更改控制数据中表的任何数据定义语言 (DDL) 操作,例如rename-tabledrop-tableadd-columndrop-column, 和rename-column. 默认为 false

  • IncludeControlDetails— 显示 Kafka 消息输出中的表定义、列定义以及表和列更改的详细控制信息。默认为 false

  • IncludeNullAndEmpty— 在目标中包括 NULL 列和空列。默认为 false

  • SecurityProtocol— 使用传输层安全性 (TLS) 设置到 Kafka 目标终端节点的安全连接。选项包括 ssl-authenticationssl-encryptionsasl-ssl。使用sasl-ssl需要SaslUsernameSaslPassword.

您可以使用设置来帮助提高传输速度。为此,Amazon DMS支持对 Apache Kafka 目标集群实施多线程完全加载。Amazon DMS支持此多线程处理,并使用包含下列内容的任务设置:

  • MaxFullLoadSubTasks— 使用此选项指示要并行加载的表的最大数目。Amazon DMS使用专用的子任务将各个表加载到其对应的 Kafka 目标表。默认值为 8;最大值为 49。

  • ParallelLoadThreads— 使用此选项指定将Amazon DMS用于将每个表加载到其 Kafka 目标表。Apache Kafka 目标的最大值为 32。您可以请求提高此最大值限制。

  • ParallelLoadBufferSize— 使用此选项指定在缓冲区(并行加载线程将数据加载到 Kafka 目标时使用)中存储的最大记录数。默认值是 50。最大值为 1,000。将此设置与 ParallelLoadThreads 一起使用;仅在有多个线程时ParallelLoadBufferSize 才有效。

  • ParallelLoadQueuesPerThread— 使用此选项可以指定每个并发线程访问的队列数,以便从队列中取出数据记录并为目标生成批处理负载。默认值为 1。但是,对于各种负载大小的 Kafka 目标,有效范围为每个线程 5—512 个队列。

您可以提高实时数据流目标终端节点的更改数据捕获 (CDC) 的性能,例如,Kafka 使用任务设置来修改PutRecordsAPI 调用。为此,您可以使用 ParallelApply* 任务设置来指定并发线程的数量、每个线程的队列数以及要存储在缓冲区中的记录数。例如,假设您要执行 CDC 加载并且要并行应用 128 个线程。您还希望对于每个线程访问 64 个队列,每个缓冲区存储 50 条记录。

要提高 CDC 性能,请 Amazon DMS 支持以下任务设置:

  • ParallelApplyThreads— 指定并行线程的数量Amazon DMS在 CDC 加载期间使用的情况下将数据记录推送到 Kafka 目标终端节点。默认值为零 (0),最大值为 32。

  • ParallelApplyBufferSize— 指定在 CDC 加载期间要在每个缓冲区队列中存储的供并发线程推送到 Kafka 目标终端节点的最大记录数。默认值为 100,最大值为 1,000。当 ParallelApplyThreads 指定多个线程时,请使用此选项。

  • ParallelApplyQueuesPerThread— 指定每个线程访问的队列数,以便从队列中取出数据记录并在 CDC 期间为 Kafka 终端节点生成批处理负载。

使用 ParallelApply* 任务设置时, partition-key-type 默认值是表的 primary-key,而不是 schema-name.table-name

使用传输层安全性 (TLS) 连接至 Kafka

Kafka 集群使用传输层安全性 (TLS) 接受安全连接。使用 DMS,您可以使用以下三种安全协议选项之一来保护 Kafka 端点连接。

SSL 加密 (server-encryption

客户端通过服务器的证书验证服务器身份。然后在服务器和客户端之间建立加密连接。

SSL 身份验证 (mutual-authentication

服务器和客户端通过各自的证书相互验证身份。然后在服务器和客户端之间建立加密连接。

SASL-SSL (mutual-authentication

简单身份验证和安全层 (SASL) 方法用用用户名和密码替换客户端的证书,以验证客户端身份。具体而言,您可以提供服务器已注册的用户名和密码,以便服务器可以验证客户端的身份。然后在服务器和客户端之间建立加密连接。

重要

阿帕奇卡夫卡和亚马逊 MSK 接受已解析的证书。这是 Kafka 和亚马逊 MSK 的已知限制。有关更多信息,请参阅 。阿帕奇·卡夫卡问题,KAFKA-3700.

如果您使用的是 Amazon MSK,请考虑使用访问控制列表 (ACL) 作为此已知限制的解决方法。有关使用 ACL 的更多信息,请参阅。Apache Kafka ACL的部分Amazon Managed Streaming for Apache Kafka 开发人员指南.

如果您使用的是自行管理的 Kafka 集群,请参阅日期为 10 月 21 日的评论了解有关配置集群的信息。

将 SSL 加密与 Amazon MSK 或自管理的 Kafka 集群结合使用

您可以使用 SSL 加密来保护与 Amazon MSK 或自行管理的 Kafka 集群的终端节点连接。使用 SSL 加密身份验证方法时,客户端通过服务器的证书验证服务器的身份。然后在服务器和客户端之间建立加密连接。

使用 SSL 加密连接到亚马逊 MSK

  • 设置安全协议终端节点设置 (SecurityProtocol)使用ssl-encryption选项,当您创建目标 Kafka 端点时。

    以下 JSON 示例将安全协议设置为 SSL 加密。

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }

对自行管理的 Kafka 集群使用 SSL 加密

  1. 如果您在本地 Kafka 集群中使用私有证书颁发机构 (CA),请上传您的私有 CA 证书并获取 Amazon 资源名称 (ARN)。

  2. 设置安全协议终端节点设置 (SecurityProtocol)使用ssl-encryption选项,当您创建目标 Kafka 端点时。下面的 JSON 示例将安全协议设置为ssl-encryption.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. 如果您使用的是私有 CA,请将SslCaCertificateArn在上面的第一步中获得的 ARN。

使用 SSL 身份验证

您可以使用 SSL 身份验证来保护终端节点连接到 Amazon MSK 或自行管理的 Kafka 集群的安全性。

要使用 SSL 身份验证启用客户端身份验证和加密以连接到 Amazon MSK,请执行以下操作:

  • 准备 Kafka 的私钥和公有证书。

  • 将证书上载到 DMS 证书管理器。

  • 使用 Kafka 终端节点设置中指定的相应证书 ARN 创建 Kafka 目标终端节点。

为 Amazon MSK 准备私钥和公有证书

  1. 创建 EC2 实例并将客户端设置为使用身份验证,请参阅客户端身份验证的部分Amazon Managed Streaming for Apache Kafka 开发人员指南.

    完成这些步骤后,您将有一个证书-ARN(ACM 中保存的公共证书 ARN)和一个私钥包含在kafka.client.keystore.jks文件。

  2. 获取公共证书并将证书复制到signed-certificate-from-acm.pem文件,使用以下命令:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    该命令返回类似以下示例的信息:

    {"Certificate": "123", "CertificateChain": "456"}

    然后,你复制你的等价"123"添加到signed-certificate-from-acm.pem文件。

  3. 通过导入msk-rsa密钥来自kafka.client.keystore.jks to keystore.p12,如以下示例所示。

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. 使用以下命令导出keystore.p12进入.pem格式的日期和时间。

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    这些区域有:输入 PEM 密码短语消息,并标识应用于加密证书的密钥。

  5. 删除包属性和关键属性从.pem文件以确保第一行以下字符串开头。

    ---BEGIN ENCRYPTED PRIVATE KEY---

将公有证书和私钥上传到 DMS 证书管理器并测试与 Amazon MSK 的连接

  1. 使用以下命令将上载到 DMS 证书管理器。

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. 创建 Amazon MSK 目标终端节点并测试连接以确保 TLS 身份验证工作正常。

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
重要

您可以使用 SSL 身份验证来保护与自行管理的 Kafka 集群的连接。在某些情况下,您可能会在本地 Kafka 群集中使用专用证书颁发机构 (CA)。如果是这样,请将您的 CA 链、公有证书和私钥上传到 DMS 证书管理器。然后,在创建本地 Kafka 目标终端节点时,在终端节点设置中使用相应的 Amazon 资源名称 (ARN)。

为自行管理的 Kafka 集群准备私有密钥和签名证书

  1. 生成 key pair,如下例所示。

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. 生成证书签名请求 (CSR)。

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. 使用群集信任库中的 CA 对 CSR 进行签名。如果您没有 CA,则可以创建自己的私有 CA。

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. 导入ca-cert添加到服务器信任库和密钥库中。如果您还没有信任库,请使用以下命令创建信任库并导入ca-cert 进入它。

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. 对证书进行签名。

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. 将签名证书导入到密钥库。

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. 使用以下命令导入on-premise-rsa密钥来自kafka.server.keystore.jkskeystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. 使用以下命令导出keystore.p12进入.pem格式的日期和时间。

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. 上传encrypted-private-server-key.pemsigned-certificate.pem, 和ca-cert添加到 DMS 证书管理器。

  10. 使用返回的 ARN 创建终端节点。

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

使用 SASL-SSL 身份验证连接到亚马逊 MSK

简单身份验证和安全层 (SASL) 方法使用用户名和密码来验证客户端身份,并在服务器和客户端之间建立加密连接。

要使用 SASL,请在设置 Amazon MSK 集群时首先创建一个安全的用户名和密码。有关如何为 Amazon MSK 集群设置安全用户名和密码的说明,请参阅为亚马逊MSK 集群设置 SASL/SCRM 身份验证中的Amazon Managed Streaming for Apache Kafka 开发人员指南.

然后,当您创建 Kafka 目标终端节点时,设置安全协议终端节点设置 (SecurityProtocol)使用sasl-ssl选项。您还将SaslUsernameSaslPassword选项。请确保它们与您首次设置 Amazon MSK 集群时创建的安全用户名和密码一致,如以下 JSON 示例所示。

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
注意

目前,Amazon DMS仅支持公共 CA 支持的 SASL/SSL。DMS 不支持用于私有 CA 支持的自我管理卡夫卡的 SASL/SSL。

使用之前映像查看 Apache Kafka(作为目标)的 CDC 行的原始值

在将 CDC 更新写入数据流目标(如 Kafka)时,可以在更新进行更改之前查看源数据库行的原始值。为此,Amazon DMS 根据源数据库引擎提供的数据填充更新事件的之前映像

不同的源数据库引擎为之前映像提供不同的信息量:

  • 仅当列发生更改时,Oracle 才会对列提供更新。

  • PostgreSQL 仅为作为主键一部分的列(已更改或未更改)提供数据。

  • MySQL 通常为所有列(已更改或未更改)提供数据。

要启用之前映像以便将源数据库中的原始值添加到 Amazon DMS 输出,请使用 BeforeImageSettings 任务设置或 add-before-image-columns 参数。此参数应用列转换规则。

BeforeImageSettings 使用从源数据库系统收集的值向每个更新操作添加一个新的 JSON 属性,如下所示。

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注意

BeforeImageSettings 应用于完全加载任务及 CDC 任务(这会迁移现有数据并复制正在进行的更改)或仅应用于 CDC 任务(这仅复制数据更改)。不将 BeforeImageSettings 应用于仅完全加载的任务。

对于 BeforeImageSettings 选项,以下条件适用:

  • EnableBeforeImage 选项设置为 true 以启用之前映像。默认为 false

  • 使用 FieldName 选项为新 JSON 属性指定名称。当 EnableBeforeImagetrue 时,FieldName 是必填项且不能为空。

  • ColumnFilter 选项指定要使用之前映像添加的列。要仅添加属于表主键一部分的列,请使用默认值 pk-only。要仅添加非 LOB 类型的列,请使用 non-lob。要添加具有之前映像值的任何列,请使用 all

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

使用之前映像转换规则

作为任务设置的替代方法,您可以使用 add-before-image-columns 参数,该参数应用列转换规则。使用此参数,您可以在 CDC 期间对数据流目标(如 Kafka)启用之前映像。

通过在转换规则中使用 add-before-image-columns,可以对之前映像结果应用更精细的控制。转换规则允许您使用对象定位器,该定位器允许您控制为规则选择的表。此外,您可以将转换规则链接在一起,这样可以将不同的规则应用于不同的表。然后,您可以操控使用其他规则生成的列。

注意

不要在同一任务中将 add-before-image-columns 参数与 BeforeImageSettings 任务设置结合使用。而是对单个任务使用此参数或此设置,但不要同时使用这两者。

包含列的 add-before-image-columns 参数的 transformation 规则类型必须提供一个 before-image-def 部分。下面是一个示例。

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

column-prefix 的值附加到列名称前面,column-prefix 的默认值为 BI_column-suffix 的值将附加到列名称之后,默认值为空。不要同时将 column-prefixcolumn-suffix 设置为空字符串。

column-filter 选择一个值。要仅添加属于表主键一部分的列,请选择 pk-only。选择 non-lob 以仅添加不属于 LOB 类型的列。或者,选择 all 以添加任何具有之前映像值的列。

之前映像转换规则的示例

以下示例中的转换规则在目标中添加一个名为 BI_emp_no 的新列。因此,像 UPDATE employees SET emp_no = 3 WHERE emp_no = 1; 这样的语句用 1 填充 BI_emp_no 字段。当您将 CDC 更新写入 Amazon S3 目标时,BI_emp_no列使得能够判断哪个原始行已更新。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

有关使用 add-before-image-columns 规则操作的信息,请参阅 转换规则和操作

将 Apache Kafka 作为 Amazon Database Migration Service 目标的限制

将 Apache Kafka 作为目标时存在以下限制:

  • Amazon DMS 支持 Kafka 目标的最大消息大小为 1 MiB。

  • 确保配置Amazon DMS复制实例和 Kafka 集群在同一虚拟私有云 (VPC) 中,并在同一安全组中。Kafka 集群可以是 Amazon MSK 实例,也可以是在 Amazon EC2 上运行的您自己的 Kafka 实例。有关更多信息,请参阅为复制实例设置网络

    注意

    要指定 Amazon MSK 的安全组,请在创建集群页面上,选择高级设置,选择自定义设置,然后选择安全组,或者接受默认值(如果该默认值与复制实例的相同)。

  • 不支持完整 LOB 模式。

  • 使用允许 Amazon DMS 自动创建新主题的属性为您的集群指定 Kafka 配置文件。包括设置 auto.create.topics.enable = true。如果您使用的是 Amazon MSK,可以在创建 Kafka 集群时指定默认配置,然后更改auto.create.topics.enable设置为true. 有关默认配置设置的更多信息,请参阅默认 Amazon MSK 配置中的Amazon Managed Streaming for Apache Kafka 开发人员指南. 如果您需要修改使用 Amazon MSK 创建的现有 Kafka 集群,请运行Amazon CLI命令aws kafka create-configuration更新您的 Kafka 配置,如以下示例所示:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    此处,//~/kafka_configuration 是您使用所需的属性设置创建的配置文件。

    如果您使用的是 Amazon EC2 上安装的您自己的 Kafka 实例,请使用类似的属性设置修改 Kafka 集群配置,包括auto.create.topics.enable = true,使用随您的实例提供的选项。

  • Amazon DMS将每个更新作为给定 Kafka 主题中的一条数据记录(消息)发布到源数据库中的单条记录,而不考虑事务。

  • Amazon DMS 支持以下两种分区键:

    • SchemaName.TableName:架构和表名称的组合。

    • ${AttributeName}:JSON 中其中一个字段的值,或源数据库中表的主键。

  • BatchApply不支持 Kafka 终端节点。使用 Batch 应用(例如,BatchApplyEnabled目标元数据任务设置)可能会导致数据丢失。

使用对象映射将数据迁移到 Kafka 主题

Amazon DMS使用表映射规则将数据从源映射到目标 Kafka 主题。要将数据映射到目标主题,您必须使用称为对象映射的表映射规则类型。您可以使用对象映射来定义源中的数据记录如何映射到发布到 Kafka 主题的数据记录。

除了具有分区键以外,Kafka 主题没有预设结构。

要创建对象映射规则,请将 rule-type 指定为 object-mapping。此规则指定您要使用的对象映射的类型。

规则的结构如下所示。

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

Amazon DMS 目前支持 map-record-to-recordmap-record-to-document 作为 rule-action 参数的唯一有效值。map-record-to-recordmap-record-to-document 值指定 Amazon DMS 默认情况下对未作为 exclude-columns 属性列表的一部分排除的记录执行的操作。这些值不会以任何方式影响属性映射。

从关系数据库迁移到 Kafka 主题时使用 map-record-to-record。此规则类型使用关系数据库中的 taskResourceId.schemaName.tableName 值作为 Kafka 主题中的分区键,并为源数据库中的每个列创建一个属性。使用map-record-to-record,对于源表中未在exclude-columns属性列表,Amazon DMS会在目标主题中创建相应的属性。不论是否在属性映射中使用源列,都会创建对应的属性。

了解 map-record-to-record 的一种方法是在操作时加以观察。对于本示例,假定您使用关系数据库表行开始处理,该行具有以下结构和数据。

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

要将此信息从名为 Test 的架构迁移到 Kafka 主题,请创建规则来将数据映射到目标主题。以下规则对此映射进行了说明。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

给定一个 Kafka 主题和一个分区键(在这种情况下,taskResourceId.schemaName.tableName),下面说明了使用 Kafka 目标主题中的示例数据生成的记录格式:

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

使用属性映射调整数据结构

在使用属性映射将数据迁移到 Kafka 主题时,您可以调整数据的结构。例如,您可能希望将源中的多个字段合并到目标中的单个字段中。以下属性映射说明如何调整数据结构。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

要为 partition-key 设置常量值,请指定 partition-key 值。例如,您可以执行此操作来强制将所有数据存储在一个分区中。以下映射说明了此方法。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注意

表示特定表的控制记录的 partition-key 值为 TaskId.SchemaName.TableName。表示特定任务的控制记录的 partition-key 值为该记录的 TaskId。在对象映射中指定 partition-key 值不会影响控制记录的 partition-key

Apache Kafka 的消息格式

JSON 输出只是键值对的列表。

RecordType

记录类型可以是数据或控制。数据记录表示源中的实际行。控制记录表示流中的重要事件,例如,重新开始任务。

操作

对于数据记录,操作可以是 createreadupdatedelete

对于控制记录,操作可以是 TruncateTableDropTable

SchemaName

记录的源架构。此字段对于控制记录可能是空的。

TableName

记录的源表。此字段对于控制记录可能是空的。

时间戳

JSON 消息构建时间的时间戳。此字段采用 ISO 8601 格式。