使用 Apache Kafka 作为目标 Amazon Database Migration Service - Amazon 数据库迁移服务
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Apache Kafka 作为目标 Amazon Database Migration Service

您可以使用将数据迁移 Amazon DMS 到 Apache Kafka 集群。Apache Kafka 是一个分布式流式处理平台。您可以使用 Apache Kafka 实时提取和处理流数据。

Amazon 还提供适用于 Apache Kafka 的 Apache Streaming(MSK亚马逊)的亚马逊托管流媒体作为目标 Amazon DMS 。亚马逊MSK是一项完全托管的 Apache Kafka 流媒体服务,可简化 Apache Kafka 实例的实施和管理。它适用于开源 Apache Kafka 版本,你可以像任何 Apache Kafka MSK 实例一样作为 Amazon DMS 目标访问亚马逊实例。有关更多信息,请参阅什么是亚马逊MSK? 在《适用于 Apache 的亚马逊托管流媒体 Kafka 开发者指南》中。

Kafka 集群将记录流存储在称为主题的类别中,这些类别被划分为多个分区。分区是主题中唯一标识的数据记录(消息)序列。分区可分布在集群中的多个代理之间,以支持并行处理某个主题的记录。有关 Apache Kafka 中主题和分区及其分布的更多信息,请参阅主题和日志分布

您的 Kafka 集群可以是 Amazon MSK 实例、在 Amazon EC2 实例上运行的集群或本地集群。Amazon MSK 实例或 Amazon EC2 实例上的集群可以位于相同VPC或不同的集群中。如果您的集群位于本地,可以为复制实例使用自己的本地名称服务器以解析集群的主机名。有关在复制实例上设置名称服务器的信息,请参阅 使用您自己的本地名称服务器。有关设置网络的更多信息,请参阅为复制实例设置网络

使用 Amazon MSK 集群时,请确保其安全组允许从您的复制实例进行访问。有关更改 Amazon MSK 集群安全组的信息,请参阅更改 Amazon MSK 集群的安全组

Amazon Database Migration Service 使用JSON向 Kafka 主题发布记录。在转换过程中,将源数据库中的每条记录 Amazon DMS 序列化为格式的属性值对。JSON

要将数据从支持的任何数据源迁移到目标 Kafka 集群,您应使用对象映射。使用对象映射,可确定如何在目标主题中建立数据记录结构。您还可以为每个表定义分区键,Apache Kafka 使用该键将数据分组为分区。

目前, Amazon DMS 支持每个任务使用一个主题。对于包含多个表的单个任务,所有消息都将转到同一个主题。每条消息都包含一个标识目标架构和表的元数据部分。 Amazon DMS 3.4.6 及更高版本支持使用对象映射进行多主题复制。有关更多信息,请参阅 使用对象映射进行多主题复制

Apache Kafka 端点设置

您可以通过 Amazon DMS 控制台中的端点设置或中的--kafka-settings选项来指定连接详细信息CLI。每个设置的要求如下:

  • Broker – 以各个 broker-hostname:port 的逗号分隔列表的形式指定 Kafka 集群中一个或多个代理的位置。例如,"ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"。此设置可以指定集群中任何或所有代理的位置。集群代理全部通过通信来处理迁移到主题的数据记录的分区。

  • Topic –(可选)指定主题名称,最大长度为 255 个字母和符号。您可以使用句点 (.)、下划线 (_) 和减号 (-)。带句点 (.) 或下划线 (_) 的主题名称可能会在内部数据结构中发生冲突。在主题名称中使用其中一个符号,但不要同时使用这两个符号。如果您未指定主题名称,则 Amazon DMS 使用"kafka-default-topic"作为迁移主题。

    注意

    要 Amazon DMS 创建您指定的迁移主题或默认主题,请auto.create.topics.enable = true将其设置为 Kafka 集群配置的一部分。有关更多信息,请参阅 使用 Apache Kafka 作为目标时的限制 Amazon Database Migration Service

  • MessageFormat – 在端点上创建的记录的输出格式。消息格式为 JSON(默认值)或 JSON_UNFORMATTED(单行,无制表符)。

  • MessageMaxBytes – 端点上创建的记录的最大大小(以字节为单位)。默认值是 1000000。

    注意

    只能使用 Amazon CLI /更改SDKMessageMaxBytes为非默认值。例如,要修改现有的 Kafka 端点并更改 MessageMaxBytes,请使用以下命令。

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails – 提供源数据库中的详细事务信息。此信息包括提交时间戳、日志位置以及 transaction_idprevious_transaction_idtransaction_record_id(事务内的记录偏移)的值。默认为 false

  • IncludePartitionValue – 显示 Kafka 消息输出中的分区值,除非分区类型为 schema-table-type。默认为 false

  • PartitionIncludeSchemaTable – 当分区类型为 primary-key-type 时,将架构和表名称作为分区值的前缀。这样做会提高数据在 Kafka 分区之间的分布广度。例如,假设 SysBench 架构具有数千个表,并且每个表的主键只有有限的范围。在这种情况下,同一主键将从数千个表发送到同一个分区,这会导致限制。默认为 false

  • IncludeTableAlterOperations— 包括更改控制数据中表的任何数据定义语言 (DDL) 操作,例如rename-tabledrop-tableadd-columndrop-column、和rename-column。默认为 false

  • IncludeControlDetails – 显示 Kafka 消息输出中的表定义、列定义以及表和列更改的详细控制信息。默认为 false

  • IncludeNullAndEmpty— 在目标中包含列NULL并清空列。默认为 false

  • SecurityProtocol— 使用传输层安全 (TLS) 设置与 Kafka 目标端点的安全连接。选项包括 ssl-authenticationssl-encryptionsasl-ssl。使用 sasl-ssl 需要 SaslUsernameSaslPassword

  • SslEndpointIdentificationAlgorithm – 为证书设置主机名验证。 Amazon DMS 版本 3.5.1 及更高版本支持此设置。这些选项包含以下内容:

    • NONE:在客户端连接中禁用代理的主机名验证。

    • HTTPS:在客户端连接中启用代理的主机名验证。

  • useLargeIntegerValue— 使用最多 18 位的 int,而不是将整数转换为双精度,从 3.5.4 Amazon DMS 版本中可用。默认值为 false。

您可以使用设置来帮助提高传输速度。为此, Amazon DMS 支持多线程完全加载到 Apache Kafka 目标集群。 Amazon DMS 通过包含以下选项的任务设置支持此多线程:

  • MaxFullLoadSubTasks— 使用此选项表示要并行加载的最大源表数。 Amazon DMS 使用专用子任务将每个表加载到其相应的 Kafka 目标表中。默认值为 8;最大值为 49。

  • ParallelLoadThreads— 使用此选项指定用于将每个表加载到其 Kafka 目标表中的线程数。 Amazon DMS Apache Kafka 目标的最大值为 32。您可以请求提高此最大值限制。

  • ParallelLoadBufferSize – 使用此选项指定在缓冲区(并行加载线程将数据加载到 Kafka 目标时使用)中存储的最大记录数。默认值是 50。最大值为 1000。将此设置与 ParallelLoadThreads 一起使用;仅在有多个线程时 ParallelLoadBufferSize 才有效。

  • ParallelLoadQueuesPerThread – 使用此选项可以指定每个并发线程访问的队列数,以便从队列中取出数据记录并为目标生成批处理负载。默认值是 1。最大值为 512。

您可以通过调整并行线程和批量操作的任务设置来提高 Kafka 端点的更改数据捕获 (CDC) 的性能。为此,您可以使用 ParallelApply* 任务设置来指定并发线程的数量、每个线程的队列数以及要存储在缓冲区中的记录数。例如,假设您要执行CDC加载并并行应用 128 个线程。您还希望对于每个线程访问 64 个队列,每个缓冲区存储 50 条记录。

为了提高CDC性能, Amazon DMS 支持以下任务设置:

  • ParallelApplyThreads— 指定CDC加载期间 Amazon DMS 用于将数据记录推送到 Kafka 目标端点的并发线程数。默认值为零(0),最大值为 32。

  • ParallelApplyBufferSize— 指定每个缓冲队列中存储的最大记录数,以便在CDC加载期间并发线程推送到 Kafka 目标端点。默认值是 100,最大值是 1,000。当 ParallelApplyThreads 指定多个线程时,请使用此选项。

  • ParallelApplyQueuesPerThread— 指定每个线程访问的队列数,以便在队列中提取数据记录并在此期间为 Kafka 端点生成批量加载。CDC默认 为 1。最大值为 512。

使用 ParallelApply* 任务设置时,partition-key-type 默认值是表的 primary-key,而不是 schema-name.table-name

使用传输层安全性连接到 Kafka () TLS

Kafka 集群使用传输层安全 (TLS) 接受安全连接。使用DMS,您可以使用以下三个安全协议选项中的任何一个来保护 Kafka 端点连接。

SSL加密 (server-encryption)

客户端通过服务器的证书验证服务器身份。然后在服务器和客户端之间建立加密连接。

SSL身份验证 (mutual-authentication)

服务器和客户端通过自己的证书相互验证身份。然后在服务器和客户端之间建立加密连接。

SASL-SSL (mutual-authentication)

简单身份验证和安全层 (SASL) 方法将客户端的证书替换为用户名和密码,以验证客户端身份。具体而言,您需要提供服务器已注册的用户名和密码,以便服务器可以验证客户端的身份。然后在服务器和客户端之间建立加密连接。

重要

Apache Kafka 和 Amazon MSK 接受已解析的证书。这是 Kafka 和 Amazon 的已知局限性尚MSK待解决。有关更多信息,请参阅 Apache Kafka 问题,KAFKA-3700。

如果您使用的是 AmazonMSK,请考虑使用访问控制列表 (ACLs) 来解决此已知限制。有关使用的更多信息ACLs,请参阅适用于 Apache Kafka 的《亚马逊托管流媒体 Kafka 开发者指南》的 Apache Kafka ACLs 部分。

如果您使用的是自管理 Kafka 集群,请参阅 2018 年 10 月 21 日的评论,了解有关配置集群的信息。

在 Amazon MSK 或自行管理的 Kafka 集群中使用SSL加密

您可以使用SSL加密来保护与 Amazon MSK 或自行管理的 Kafka 集群的终端节点连接。使用SSL加密身份验证方法时,客户端会通过服务器的证书来验证服务器的身份。然后在服务器和客户端之间建立加密连接。

使用SSL加密连接亚马逊 MSK
  • 创建目标 Kafka 端点时,使用 ssl-encryption 选项设置安全协议端点设置(SecurityProtocol)。

    以下JSON示例将安全协议设置为SSL加密。

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
对自管理的 Kafka 集群使用SSL加密
  1. 如果您在本地 Kafka 集群中使用私有证书颁发机构 (CA),请上传您的私有 CA 证书并获取亚马逊资源名称 () ARN。

  2. 创建目标 Kafka 端点时,使用 ssl-encryption 选项设置安全协议端点设置(SecurityProtocol)。以下JSON示例将安全协议设置为ssl-encryption

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. 如果你使用的是私有 CA,ARN请在上面第一步SslCaCertificateArn中设置的。

使用 SSL 身份验证

您可以使用SSL身份验证来保护与 Amazon MSK 或自行管理的 Kafka 集群的终端节点连接。

要使用SSL身份验证启用客户端身份验证和加密以连接到 AmazonMSK,请执行以下操作:

  • 为 Kafka 准备私钥和公有证书。

  • 将证书上传到DMS证书管理器。

  • 使用 Kafka 终端节点ARNs设置中指定的相应证书创建 Kafka 目标端点。

为 Amazon 准备私钥和公有证书 MSK
  1. 创建EC2实例并设置客户端,使其使用身份验证,如亚马逊托管流媒体for Apache Kafka Kafka 开发者指南客户端身份验证部分的步骤 1 到 9 中所述。

    完成这些步骤后,您将获得证书-ARN(ARN保存在中的公共证书ACM)和一个包含在kafka.client.keystore.jks文件中的私钥。

  2. 使用以下命令获取公有证书并将证书复制到 signed-certificate-from-acm.pem 文件中:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    该命令会返回类似以下示例中显示的信息。

    {"Certificate": "123", "CertificateChain": "456"}

    然后,将 "123" 的等效内容复制到 signed-certificate-from-acm.pem 文件中。

  3. 通过导入 msk-rsa 密钥从 kafka.client.keystore.jks to keystore.p12 获取私钥,如以下示例所示。

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. 使用以下命令将 keystore.p12 导出为 .pem 格式。

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    将出现 “输入PEM密码短语” 消息,用于标识用于加密证书的密钥。

  5. .pem 文件中删除包属性和密钥属性,确保第一行以如下字符串开头。

    ---BEGIN ENCRYPTED PRIVATE KEY---
将公有证书和私钥上传到DMS证书管理器并测试与 Amazon 的连接 MSK
  1. 使用以下命令上传到DMS证书管理器。

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. 创建 Amazon MSK 目标终端节点并测试连接以确保TLS身份验证有效。

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
重要

您可以使用SSL身份验证来保护与自管理 Kafka 集群的连接。在某些情况下,您可能会在本地 Kafka 集群中使用私有证书颁发机构(CA)。如果是,请将您的 CA 链、公有证书和私钥上传到DMS证书管理器。然后,在创建本地 Kafka 目标终端节点时,在终端节点设置中使用相应的 Amazon 资源名称 (ARN)。

为自管理 Kafka 集群准备私钥和签名的证书
  1. 生成以下示例所示的密钥对。

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. 生成证书签名请求 (CSR)。

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. 使用集群信任库中的 CA 签署。CSR如果您没有 CA,则可以创建自己的私有 CA。

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. ca-cert 导入服务器信任库和密钥库。如果您没有信任库,请使用以下命令创建信任库,并将 ca-cert 导入其中。

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. 在证书上签名。

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. 将签名的证书导入密钥库。

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. 使用以下命令将 on-premise-rsa 密钥从 kafka.server.keystore.jks 导入 keystore.p12

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. 使用以下命令将 keystore.p12 导出为 .pem 格式。

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. encrypted-private-server-key.pemsigned-certificate.pem、和上传ca-cert到DMS证书管理器。

  10. 使用返回的创建终端节点ARNs。

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

使用 SASL-SSL 身份验证连接到 Amazon MSK

简单身份验证和安全层 (SASL) 方法使用用户名和密码来验证客户端身份,并在服务器和客户端之间建立加密连接。

要使用SASL,您需要在设置 Amazon MSK 集群时先创建一个安全的用户名和密码。有关如何为亚马逊MSK集群设置安全用户名和密码的说明,请参阅《适用于 Apache Kafka 的 Apache Managed Streaming 开发者指南》中的亚马逊MSK集群设置SASL/SCRAM身份验证

然后,在创建 Kafka 目标端点时,使用 sasl-ssl 选项设置安全协议端点设置(SecurityProtocol)。您还可以设置 SaslUsernameSaslPassword 选项。请确保它们与您在首次设置 Amazon MSK 集群时创建的安全用户名和密码一致,如以下JSON示例所示。

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
注意
  • 目前,仅 Amazon DMS 支持公有 CA 支持 SASL-SSL。 DMS不支持 SASL-SSL 用于由私有 CA 支持的自我管理 Kafka。

  • 对于 SASL-SSL 身份验证,默认 Amazon DMS 支持 SCRAM-SHA -512 机制。 Amazon DMS 3.5.0 及更高版本还支持 Plain 机制。要支持 Plain 机制,请将KafkaSettingsAPI数据类型的SaslMechanism参数设置为PLAIN

使用之前的图像查看 Apache Kafka 作为目标的CDC行的原始值

在向 Kafka 等数据流目标写入CDC更新时,可以在更新更改之前查看源数据库行的原始值。为此,请根据源数据库引擎提供的数据 Amazon DMS 填充更新事件之前的图像

不同的源数据库引擎为之前映像提供不同的信息量:

  • 仅当列发生更改时,Oracle 才会对列提供更新。

  • Postgre 仅为属于主键的列SQL提供数据(无论是否已更改)。如果正在使用逻辑复制,并且REPLICAIDENTITYFULL已为源表设置逻辑复制,则可以在此处获取写入该WALs行的完整前后信息。

  • 我SQL通常会提供所有列的数据(无论是否更改)。

要启用之前映像以便将源数据库中的原始值添加到 Amazon DMS 输出,请使用 BeforeImageSettings 任务设置或 add-before-image-columns 参数。此参数应用列转换规则。

BeforeImageSettings使用从源数据库系统收集的值向每个更新操作添加一个新JSON属性,如下所示。

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注意

BeforeImageSettings适用于满负荷加CDC任务(迁移现有数据和复制正在进行的更改),或者CDC仅适用于任务(仅复制数据更改)。不将 BeforeImageSettings 应用于仅完全加载的任务。

对于 BeforeImageSettings 选项,以下条件适用:

  • EnableBeforeImage 选项设置为 true 以启用之前映像。默认为 false

  • 使用FieldName选项为新JSON属性指定名称。当 EnableBeforeImagetrue 时,FieldName 是必填项且不能为空。

  • ColumnFilter 选项指定要使用之前映像添加的列。要仅添加属于表主键一部分的列,请使用默认值 pk-only。要仅添加非LOB类型的列,请使用non-lob。要添加具有之前映像值的任何列,请使用 all

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

使用之前映像转换规则

作为任务设置的替代方法,您可以使用 add-before-image-columns 参数,该参数应用列转换规则。使用此参数,您可以在映像之前在数据流目标(如 Kafka)CDC上启用。

通过在转换规则中使用 add-before-image-columns,可以对之前映像结果应用更精细的控制。转换规则允许您使用对象定位器,该定位器允许您控制为规则选择的表。此外,您可以将转换规则链接在一起,这样可以将不同的规则应用于不同的表。然后,您可以操控使用其他规则生成的列。

注意

不要在同一任务中将 add-before-image-columns 参数与 BeforeImageSettings 任务设置结合使用。而是对单个任务使用此参数或此设置,但不要同时使用这两者。

包含列的 add-before-image-columns 参数的 transformation 规则类型必须提供一个 before-image-def 部分。下面是一个示例。

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

column-prefix 的值附加到列名称前面,column-prefix 的默认值为 BI_column-suffix 的值将附加到列名称之后,默认值为空。不要同时将 column-prefixcolumn-suffix 设置为空字符串。

column-filter 选择一个值。要仅添加属于表主键一部分的列,请选择 pk-onlynon-lob选择仅添加非LOB类型的列。或者,选择 all 以添加任何具有之前映像值的列。

之前映像转换规则的示例

以下示例中的转换规则在目标中添加一个名为 BI_emp_no 的新列。因此,像 UPDATE employees SET emp_no = 3 WHERE emp_no = 1; 这样的语句用 1 填充 BI_emp_no 字段。当您向 Amazon S3 目标写入CDC更新时,该BI_emp_no列可以判断哪个原始行已更新。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

有关使用 add-before-image-columns 规则操作的信息,请参阅 转换规则和操作

使用 Apache Kafka 作为目标时的限制 Amazon Database Migration Service

将 Apache Kafka 作为目标时存在以下限制:

  • Amazon DMS Kafka 目标终端节点不支持适用于 Apache Kafka 的亚马逊托管流媒体 Kafka(亚马逊)的IAM访问控制。MSK

  • 不支持完整LOB模式。

  • 为您的集群指定 Kafka 配置文件,其属性 Amazon DMS 允许自动创建新主题。包括设置 auto.create.topics.enable = true。如果您使用的是 AmazonMSK,则可以在创建 Kafka 集群时指定默认配置,然后将auto.create.topics.enabletrue设置更改为。有关默认配置设置的更多信息,请参阅《适用于 A pache Kafka 的亚马逊托管流媒体 Kafka 开发者指南》中的默认亚马逊MSK配置。如果您需要修改使用 Amazon 创建的现有 Kafka 集群MSK,请运行 Amazon CLI 命令aws kafka create-configuration更新您的 Kafka 配置,如以下示例所示:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    此处,//~/kafka_configuration 是您使用所需的属性设置创建的配置文件。

    如果您使用的是自己安装在 Amazon 上的 Kafka 实例EC2,请使用您的实例提供的选项auto.create.topics.enable = true将 Kafka 集群配置修改为允许 Amazon DMS 自动创建新主题。

  • Amazon DMS 无论事务如何,都会将源数据库中单个记录的每次更新作为给定 Kafka 主题中的一条数据记录(消息)发布。

  • Amazon DMS 支持以下两种形式的分区键:

    • SchemaName.TableName:架构和表名称的组合。

    • ${AttributeName}:源数据库中某个字段的值或表的主键。JSON

  • 对于 Kafka 端点,不支持 BatchApply。为 Kafka BatchApplyEnabled 目标使用“批量应用”(例如,目标元数据任务设置)可能会导致数据丢失。

  • Amazon DMS 不支持迁移超过 16 位BigInt数据类型的值。要解决此限制问题,您可以使用以下转换规则将 BigInt 列转换为字符串。有关转换规则的更多信息,请参阅 转换规则和操作

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

使用对象映射将数据迁移到 Kafka 主题

Amazon DMS 使用表映射规则将数据从源映射到目标 Kafka 主题。要将数据映射到目标主题,您必须使用称为对象映射的表映射规则类型。您可以使用对象映射来定义源中的数据记录如何映射到发布到 Kafka 主题的数据记录。

除了具有分区键以外,Kafka 主题没有预设结构。

注意

您不一定要使用对象映射。可以使用常规表映射进行各种转换。但是,分区键类型将遵循以下默认行为:

  • 主键用作完全加载时的分区键。

  • 如果未使用并行应用任务设置,schema.table则用作的分区键。CDC

  • 如果使用并行应用任务设置,则主键将用作的分区键。CDC

要创建对象映射规则,请将 rule-type 指定为 object-mapping。此规则指定您要使用的对象映射的类型。

规则的结构如下所示。

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

Amazon DMS 目前支持将map-record-to-recordmap-record-to-document作为该rule-action参数的唯一有效值。这些设置会影响未作为 exclude-columns 属性列表一部分排除的值。map-record-to-recordmap-record-to-document值指定默认情况下如何 Amazon DMS 处理这些记录。这些值不会以任何方式影响属性映射。

从关系数据库迁移到 Kafka 主题时使用 map-record-to-record。此规则类型使用关系数据库中的 taskResourceId.schemaName.tableName 值作为 Kafka 主题中的分区键,并为源数据库中的每个列创建一个属性。

使用 map-record-to-record 时请注意:

  • 此设置仅影响 exclude-columns 列表排除的列。

  • 对于每个这样的列,在目标主题中 Amazon DMS 创建一个相应的属性。

  • Amazon DMS 无论源列是否用于属性映射,都会创建相应的属性。

了解 map-record-to-record 的一种方法是在操作时加以观察。对于本示例,假定您使用关系数据库表行开始处理,该行具有以下结构和数据。

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

要将此信息从名为 Test 的架构迁移到 Kafka 主题,请创建规则来将数据映射到目标主题。以下规则对此映射进行了说明。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

给定 Kafka 主题和分区键(在本例中为 taskResourceId.schemaName.tableName),下面说明了使用 Kafka 目标主题中的示例数据生成的记录格式:

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

使用属性映射调整数据结构

在使用属性映射将数据迁移到 Kafka 主题时,您可以调整数据的结构。例如,您可能希望将源中的多个字段合并到目标中的单个字段中。以下属性映射说明如何调整数据结构。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

要为 partition-key 设置常量值,请指定 partition-key 值。例如,您可以执行此操作来强制将所有数据存储在一个分区中。以下映射说明了此方法。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注意

表示特定表的控制记录的 partition-key 值为 TaskId.SchemaName.TableName。表示特定任务的控制记录的 partition-key 值为该记录的 TaskId。在对象映射中指定 partition-key 值不会影响控制记录的 partition-key

使用对象映射进行多主题复制

默认情况下, Amazon DMS 任务会将所有源数据迁移到以下 Kafka 主题之一:

  • 如 Amazon DMS 目标终端节点的 “主题” 字段所指定。

  • 如果目标端点的主题字段未填,且 Kafka auto.create.topics.enable 设置设为 true,则由 kafka-default-topic 指定。

在 3.4.6 及更高版本的 Amazon DMS 引擎中,您可以使用kafka-target-topic属性将每个迁移的源表映射到单独的主题。例如,下面的对象映射规则分别将源表 CustomerAddress 迁移到 Kafka 主题 customer_topicaddress_topic。同时,将所有其他源表(包括Test架构中的Bills表) Amazon DMS 迁移到目标终端节点中指定的主题。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

通过使用 Kafka 多主题复制,您可以使用单个复制任务将源表分组并迁移到单独的 Kafka 主题。

Apache Kafka 的消息格式

JSON输出只是一个键值对列表。

RecordType

记录类型可以是数据或控制。数据记录表示源中的实际行。控制记录表示流中的重要事件,例如,重新开始任务。

操作

对于数据记录,操作可以是 loadinsertupdatedelete

对于控制记录,操作可以是 create-tablerename-tabledrop-tablechange-columnsadd-columndrop-columnrename-columncolumn-type-change

SchemaName

记录的源架构。此字段对于控制记录可能是空的。

TableName

记录的源表。此字段对于控制记录可能是空的。

Timestamp

构造JSON消息的时间戳。该字段的格式为 ISO 8601。

以下JSON消息示例说明了包含所有其他元数据的数据类型消息。

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

以下JSON消息示例说明了控件类型消息。

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }