
Using Apache Kafka as a target for Amazon Database Migration Service

You can use Amazon DMS to migrate data to an Apache Kafka cluster. Apache Kafka is a distributed streaming platform. You can use Apache Kafka for ingesting and processing streaming data in real-time.

Amazon also offers Amazon Managed Streaming for Apache Kafka (Amazon MSK) to use as an Amazon DMS target. Amazon MSK is a fully managed Apache Kafka streaming service that simplifies the implementation and management of Apache Kafka instances. It works with open-source Apache Kafka versions, and you access Amazon MSK instances as Amazon DMS targets exactly like any Apache Kafka instance. For more information, see What is Amazon MSK? in the Amazon Managed Streaming for Apache Kafka Developer Guide.

A Kafka cluster stores streams of records in categories called topics that are divided into partitions. Partitions are uniquely identified sequences of data records (messages) in a topic. Partitions can be distributed across multiple brokers in a cluster to enable parallel processing of a topic’s records. For more information on topics and partitions and their distribution in Apache Kafka, see Topics and logs and Distribution.

Your Kafka cluster can be either an Amazon MSK instance, a cluster running on an Amazon EC2 instance, or an on-premises cluster. An Amazon MSK instance or a cluster on an Amazon EC2 instance can be in the same VPC or a different one. If your cluster is on-premises, you can use your own on-premises name server for your replication instance to resolve the cluster's host name. For information about setting up a name server for your replication instance, see Using your own on-premises name server. For more information about setting up a network, see Setting up a network for a replication instance.

When using an Amazon MSK cluster, make sure that its security group allows access from your replication instance. For information about changing the security group for an Amazon MSK cluster, see Changing an Amazon MSK cluster's security group.

Amazon Database Migration Service publishes records to a Kafka topic using JSON. During conversion, Amazon DMS serializes each record from the source database into an attribute-value pair in JSON format.

To migrate your data from any supported data source to a target Kafka cluster, you use object mapping. With object mapping, you determine how to structure the data records in the target topic. You also define a partition key for each table, which Apache Kafka uses to group the data into its partitions.

Currently, Amazon DMS supports a single topic per task. For a single task with multiple tables, all messages go to a single topic. Each message includes a metadata section that identifies the target schema and table. Amazon DMS versions 3.4.6 and higher support multitopic replication using object mapping. For more information, see Multitopic replication using object mapping.

Apache Kafka endpoint settings

You can specify connection details through endpoint settings in the Amazon DMS console, or the --kafka-settings option in the CLI. The requirements for each setting follow:

  • Broker – Specify the locations of one or more brokers in your Kafka cluster in the form of a comma-separated list of each broker-hostname:port. An example is "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876". This setting can specify the locations of any or all brokers in the cluster. The cluster brokers all communicate to handle the partitioning of data records migrated to the topic.

  • Topic – (Optional) Specify the topic name with a maximum length of 255 letters and symbols. You can use period (.), underscore (_), and minus (-). Topic names with a period (.) or underscore (_) can collide in internal data structures. Use either one, but not both of these symbols in the topic name. If you don't specify a topic name, Amazon DMS uses "kafka-default-topic" as the migration topic.

    Note

    To have Amazon DMS create either a migration topic you specify or the default topic, set auto.create.topics.enable = true as part of your Kafka cluster configuration. For more information, see Limitations when using Apache Kafka as a target for Amazon Database Migration Service.

  • MessageFormat – The output format for the records created on the endpoint. The message format is JSON (default) or JSON_UNFORMATTED (a single line with no tab).

  • MessageMaxBytes – The maximum size in bytes for records created on the endpoint. The default is 1,000,000.

    Note

    You can only use the Amazon CLI/SDK to change MessageMaxBytes to a non-default value. For example, to modify your existing Kafka endpoint and change MessageMaxBytes, use the following command.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...",Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails – Provides detailed transaction information from the source database. This information includes a commit timestamp, a log position, and values for transaction_id, previous_transaction_id, and transaction_record_id (the record offset within a transaction). The default is false.

  • IncludePartitionValue – Shows the partition value within the Kafka message output, unless the partition type is schema-table-type. The default is false.

  • PartitionIncludeSchemaTable – Prefixes schema and table names to partition values, when the partition type is primary-key-type. Doing this increases data distribution among Kafka partitions. For example, suppose that a SysBench schema has thousands of tables and each table has only limited range for a primary key. In this case, the same primary key is sent from thousands of tables to the same partition, which causes throttling. The default is false.

  • IncludeTableAlterOperations – Includes any data definition language (DDL) operations that change the table in the control data, such as rename-table, drop-table, add-column, drop-column, and rename-column. The default is false.

  • IncludeControlDetails – Shows detailed control information for table definition, column definition, and table and column changes in the Kafka message output. The default is false.

  • IncludeNullAndEmpty – Includes NULL and empty columns in the target. The default is false.

  • SecurityProtocol – Sets a secure connection to a Kafka target endpoint using Transport Layer Security (TLS). Options include ssl-authentication, ssl-encryption, and sasl-ssl. Using sasl-ssl requires SaslUsername and SaslPassword.

  • SslEndpointIdentificationAlgorithm – Sets hostname verification for the certificate. This setting is supported in Amazon DMS version 3.5.1 and later. Options include the following:

    • NONE: Disable hostname verification of the broker in the client connection.

    • HTTPS: Enable hostname verification of the broker in the client connection.
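
For reference, the following is a minimal sketch of creating a Kafka target endpoint with several of these settings through the CLI, following the same pattern as the create-endpoint examples later in this topic. The endpoint identifier, broker address, and topic name are placeholders; you can include any other settings from the preceding list in the same JSON document.

# Sketch only: the identifier, broker address, and topic name are placeholders.
aws dms create-endpoint \
    --endpoint-identifier my-kafka-target \
    --endpoint-type target \
    --engine-name kafka \
    --kafka-settings '{"Broker": "broker-hostname:9092",
                       "Topic": "dms-sample-topic",
                       "IncludeTransactionDetails": true,
                       "IncludePartitionValue": true,
                       "IncludeNullAndEmpty": true}'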

You can use settings to help increase the speed of your transfer. To do so, Amazon DMS supports a multithreaded full load to an Apache Kafka target cluster. Amazon DMS supports this multithreading with task settings that include the following:

  • MaxFullLoadSubTasks – Use this option to indicate the maximum number of source tables to load in parallel. Amazon DMS loads each table into its corresponding Kafka target table using a dedicated subtask. The default is 8; the maximum value is 49.

  • ParallelLoadThreads – Use this option to specify the number of threads that Amazon DMS uses to load each table into its Kafka target table. The maximum value for an Apache Kafka target is 32. You can ask to have this maximum limit increased.

  • ParallelLoadBufferSize – Use this option to specify the maximum number of records to store in the buffer that the parallel load threads use to load data to the Kafka target. The default value is 50. The maximum value is 1,000. Use this setting with ParallelLoadThreads. ParallelLoadBufferSize is valid only when there is more than one thread.

  • ParallelLoadQueuesPerThread – Use this option to specify the number of queues each concurrent thread accesses to take data records out of queues and generate a batch load for the target. The default is 1. The maximum is 512.
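
As an illustration only, the following task-settings sketch combines these full-load options. It assumes the usual task-settings layout, with MaxFullLoadSubTasks under FullLoadSettings and the ParallelLoad* options under TargetMetadata; the values are arbitrary examples within the documented limits.

{
    "FullLoadSettings": {
        "MaxFullLoadSubTasks": 16
    },
    "TargetMetadata": {
        "ParallelLoadThreads": 8,
        "ParallelLoadBufferSize": 500,
        "ParallelLoadQueuesPerThread": 16
    }
}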

You can improve the performance of change data capture (CDC) for Kafka endpoints by tuning task settings for parallel threads and bulk operations. To do this, you can specify the number of concurrent threads, queues per thread, and the number of records to store in a buffer using ParallelApply* task settings. For example, suppose you want to perform a CDC load and apply 128 threads in parallel. You also want to access 64 queues per thread, with 50 records stored per buffer.

To promote CDC performance, Amazon DMS supports these task settings:

  • ParallelApplyThreads – Specifies the number of concurrent threads that Amazon DMS uses during a CDC load to push data records to a Kafka target endpoint. The default value is zero (0) and the maximum value is 32.

  • ParallelApplyBufferSize – Specifies the maximum number of records to store in each buffer queue for concurrent threads to push to a Kafka target endpoint during a CDC load. The default value is 100 and the maximum value is 1,000. Use this option when ParallelApplyThreads specifies more than one thread.

  • ParallelApplyQueuesPerThread – Specifies the number of queues that each thread accesses to take data records out of queues and generate a batch load for a Kafka endpoint during CDC. The default is 1. The maximum is 512.

When using ParallelApply* task settings, the partition-key-type default is the primary-key of the table, not schema-name.table-name.
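
To illustrate, the following task-settings sketch shows the ParallelApply* options together. It assumes that these options sit under TargetMetadata, as the ParallelLoad* options do; the values are arbitrary examples within the maximums listed preceding.

{
    "TargetMetadata": {
        "ParallelApplyThreads": 32,
        "ParallelApplyBufferSize": 100,
        "ParallelApplyQueuesPerThread": 64
    }
}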

Connecting to Kafka using Transport Layer Security (TLS)

A Kafka cluster accepts secure connections using Transport Layer Security (TLS). With DMS, you can use any one of the following three security protocol options to secure a Kafka endpoint connection.

SSL encryption (server-encryption)

Clients validate server identity through the server’s certificate. Then an encrypted connection is made between server and client.

SSL authentication (mutual-authentication)

Server and client validate the identity with each other through their own certificates. Then an encrypted connection is made between server and client.

SASL-SSL (mutual-authentication)

The Simple Authentication and Security Layer (SASL) method replaces the client’s certificate with a user name and password to validate a client identity. Specifically, you provide a user name and password that the server has registered so that the server can validate the identity of a client. Then an encrypted connection is made between server and client.

Important

Apache Kafka and Amazon MSK accept resolved certificates. This is a known limitation of Apache Kafka and Amazon MSK that is pending resolution. For more information, see Apache Kafka issues, KAFKA-3700.

If you're using Amazon MSK, consider using access control lists (ACLs) as a workaround to this known limitation. For more information about using ACLs, see the Apache Kafka ACLs section of the Amazon Managed Streaming for Apache Kafka Developer Guide.

If you're using a self-managed Kafka cluster, see Comment dated 21/Oct/18 for information about configuring your cluster.

Using SSL encryption with Amazon MSK or a self-managed Kafka cluster

You can use SSL encryption to secure an endpoint connection to Amazon MSK or a self-managed Kafka cluster. When you use the SSL encryption authentication method, clients validate a server's identity through the server’s certificate. Then an encrypted connection is made between server and client.

To use SSL encryption to connect to Amazon MSK
  • Set the security protocol endpoint setting (SecurityProtocol) using the ssl-encryption option when you create your target Kafka endpoint.

    The JSON example following sets the security protocol as SSL encryption.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
To use SSL encryption for a self-managed Kafka cluster
  1. If you're using a private Certification Authority (CA) in your on-premises Kafka cluster, upload your private CA cert and get an Amazon Resource Name (ARN).

  2. Set the security protocol endpoint setting (SecurityProtocol) using the ssl-encryption option when you create your target Kafka endpoint. The JSON example following sets the security protocol as ssl-encryption.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. If you're using a private CA, set SslCaCertificateArn to the ARN that you got in the first step.

Using SSL authentication

You can use SSL authentication to secure an endpoint connection to Amazon MSK or a self-managed Kafka cluster.

To enable client authentication and encryption using SSL authentication to connect to Amazon MSK, do the following:

  • Prepare a private key and public certificate for Kafka.

  • Upload certificates to the DMS certificate manager.

  • Create a Kafka target endpoint with corresponding certificate ARNs specified in Kafka endpoint settings.

To prepare a private key and public certificate for Amazon MSK
  1. Create an EC2 instance and set up a client to use authentication as described in steps 1 through 9 in the Client Authentication section of the Amazon Managed Streaming for Apache Kafka Developer Guide.

    After you complete those steps, you have a Certificate-ARN (the public certificate ARN saved in ACM), and a private key contained within a kafka.client.keystore.jks file.

  2. Get the public certificate and copy the certificate to the signed-certificate-from-acm.pem file, using the command following:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    That command returns information similar to the following example:

    {"Certificate": "123", "CertificateChain": "456"}

    You then copy your equivalent of "123" to the signed-certificate-from-acm.pem file.

  3. Get the private key by importing the msk-rsa key from kafka.client.keystore.jks to keystore.p12, as shown in the following example.

    keytool -importkeystore \
        -srckeystore kafka.client.keystore.jks \
        -destkeystore keystore.p12 \
        -deststoretype PKCS12 \
        -srcalias msk-rsa-client \
        -deststorepass test1234 \
        -destkeypass test1234
  4. Use the following command to export keystore.p12 into .pem format.

    openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem -nocerts

    The Enter PEM pass phrase message appears and identifies the key that is applied to encrypt the certificate.

  5. Remove bag attributes and key attributes from the .pem file to make sure that the first line starts with the following string.

    -----BEGIN ENCRYPTED PRIVATE KEY-----
To upload a public certificate and private key to the DMS certificate manager and test the connection to Amazon MSK
  1. Upload to DMS certificate manager using the following command.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path-to-signed-cert

    aws dms import-certificate --certificate-identifier private-key --certificate-pem file://path-to-private-key
  2. Create an Amazon MSK target endpoint and test connection to make sure that TLS authentication works.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}'

    aws dms test-connection --replication-instance-arn $rep_inst_arn --endpoint-arn $kafka_tar_arn_msk
Important

You can use SSL authentication to secure a connection to a self-managed Kafka cluster. In some cases, you might use a private Certification Authority (CA) in your on-premises Kafka cluster. If so, upload your CA chain, public certificate, and private key to the DMS certificate manager. Then, use the corresponding Amazon Resource Name (ARN) in your endpoint settings when you create your on-premises Kafka target endpoint.

To prepare a private key and signed certificate for a self-managed Kafka cluster
  1. Generate a key pair as shown in the following example.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. Generate a Certificate Sign Request (CSR).

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. Use the CA in your cluster truststore to sign the CSR. If you don't have a CA, you can create your own private CA.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. Import ca-cert into the server truststore and keystore. If you don't have a truststore, use the following command to create the truststore and import ca-cert into it.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert

    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. Sign the certificate.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. Import the signed certificate to the keystore.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. Use the following command to import the on-premise-rsa key from kafka.server.keystore.jks to keystore.p12.

    keytool -importkeystore \
        -srckeystore kafka.server.keystore.jks \
        -destkeystore keystore.p12 \
        -deststoretype PKCS12 \
        -srcalias on-premise-rsa \
        -deststorepass your-truststore-password \
        -destkeypass your-key-password
  8. Use the following command to export keystore.p12 into .pem format.

    openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem -nocerts
  9. Upload encrypted-private-server-key.pem, signed-certificate.pem, and ca-cert to the DMS certificate manager.

  10. Create an endpoint by using the returned ARNs.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}'

    aws dms test-connection --replication-instance-arn $rep_inst_arn --endpoint-arn $kafka_tar_arn_msk

Using SASL-SSL authentication to connect to Amazon MSK

The Simple Authentication and Security Layer (SASL) method uses a user name and password to validate a client identity, and makes an encrypted connection between server and client.

To use SASL, you first create a secure user name and password when you set up your Amazon MSK cluster. For a description of how to set up a secure user name and password for an Amazon MSK cluster, see Setting up SASL/SCRAM authentication for an Amazon MSK cluster in the Amazon Managed Streaming for Apache Kafka Developer Guide.

Then, when you create your Kafka target endpoint, set the security protocol endpoint setting (SecurityProtocol) using the sasl-ssl option. You also set SaslUsername and SaslPassword options. Make sure these are consistent with the secure user name and password that you created when you first set up your Amazon MSK cluster, as shown in the following JSON example.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
Note
  • Currently, Amazon DMS supports only public CA backed SASL-SSL. DMS doesn't support SASL-SSL for use with self-managed Kafka that is backed by private CA.

  • For SASL-SSL authentication, Amazon DMS supports the SCRAM-SHA-512 mechanism by default. Amazon DMS versions 3.5.0 and higher also support the Plain mechanism. To support the Plain mechanism, set the SaslMechanism parameter of the KafkaSettings API data type to PLAIN.
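
Putting these pieces together, the following is a sketch of creating a SASL-SSL Kafka target endpoint through the CLI. The endpoint identifier, broker address and port, user name, and password are placeholders.

# Sketch only: identifier, broker address and port, user name, and password are placeholders.
aws dms create-endpoint \
    --endpoint-identifier my-kafka-sasl-target \
    --endpoint-type target \
    --engine-name kafka \
    --kafka-settings '{"Broker": "broker-hostname:9096",
                       "SecurityProtocol": "sasl-ssl",
                       "SaslUsername": "your-secure-user-name",
                       "SaslPassword": "your-secure-password"}'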

Using a before image to view original values of CDC rows for Apache Kafka as a target

When writing CDC updates to a data-streaming target like Kafka, you can view a source database row's original values before they are changed by an update. To make this possible, Amazon DMS populates a before image of update events based on data supplied by the source database engine.

Different source database engines provide different amounts of information for a before image:

  • Oracle provides updates to columns only if they change.

  • PostgreSQL provides only data for columns that are part of the primary key (changed or not). If logical replication is in use and REPLICA IDENTITY FULL is set for the source table, the entire before and after image of the row is written to the WALs and is available to Amazon DMS.

  • MySQL generally provides data for all columns (changed or not).

To enable before imaging to add original values from the source database to the Amazon DMS output, use either the BeforeImageSettings task setting or the add-before-image-columns parameter. This parameter applies a column transformation rule.

BeforeImageSettings adds a new JSON attribute to every update operation with values collected from the source database system, as shown following.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
Note

Apply BeforeImageSettings to full load plus CDC tasks (which migrate existing data and replicate ongoing changes), or to CDC only tasks (which replicate data changes only). Don't apply BeforeImageSettings to tasks that are full load only.

For BeforeImageSettings options, the following applies:

  • Set the EnableBeforeImage option to true to enable before imaging. The default is false.

  • Use the FieldName option to assign a name to the new JSON attribute. When EnableBeforeImage is true, FieldName is required and can't be empty.

  • The ColumnFilter option specifies a column to add by using before imaging. To add only columns that are part of the table's primary keys, use the default value, pk-only. To add only columns that are not of LOB type, use non-lob. To add any column that has a before image value, use all.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

Using a before image transformation rule

As an alternative to task settings, you can use the add-before-image-columns parameter, which applies a column transformation rule. With this parameter, you can enable before imaging during CDC on data streaming targets like Kafka.

By using add-before-image-columns in a transformation rule, you can apply more fine-grained control of the before image results. Transformation rules enable you to use an object locator that gives you control over tables selected for the rule. Also, you can chain transformation rules together, which allows different rules to be applied to different tables. You can then manipulate the columns produced by using other rules.

Note

Don't use the add-before-image-columns parameter together with the BeforeImageSettings task setting within the same task. Instead, use either the parameter or the setting, but not both, for a single task.

A transformation rule type with the add-before-image-columns parameter for a column must provide a before-image-def section. The following shows an example.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

The value of column-prefix is prepended to a column name, and the default value of column-prefix is BI_. The value of column-suffix is appended to the column name, and the default is empty. Don't set both column-prefix and column-suffix to empty strings.

Choose one value for column-filter. To add only columns that are part of table primary keys, choose pk-only. Choose non-lob to add only columns that are not of LOB type. Or choose all to add any column that has a before-image value.

Example for a before image transformation rule

The transformation rule in the following example adds a new column called BI_emp_no in the target. So a statement like UPDATE employees SET emp_no = 3 WHERE emp_no = 1; populates the BI_emp_no field with 1. When you write CDC updates to a data-streaming target like Kafka, the BI_emp_no column makes it possible to tell which original row was updated.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

For information on using the add-before-image-columns rule action, see Transformation rules and actions.

Limitations when using Apache Kafka as a target for Amazon Database Migration Service

The following limitations apply when using Apache Kafka as a target:

  • Amazon DMS Kafka target endpoints don't support IAM access control for Amazon Managed Streaming for Apache Kafka (Amazon MSK).

  • Full LOB mode is not supported.

  • Specify a Kafka configuration file for your cluster with properties that allow Amazon DMS to automatically create new topics. Include the setting, auto.create.topics.enable = true. If you are using Amazon MSK, you can specify the default configuration when you create your Kafka cluster, then change the auto.create.topics.enable setting to true. For more information about the default configuration settings, see The default Amazon MSK configuration in the Amazon Managed Streaming for Apache Kafka Developer Guide. If you need to modify an existing Kafka cluster created using Amazon MSK, run the Amazon CLI command aws kafka create-configuration to update your Kafka configuration, as in the following example:

    aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration

    {
        "LatestRevision": {
            "Revision": 1,
            "CreationTime": "2019-09-06T14:39:37.708Z"
        },
        "CreationTime": "2019-09-06T14:39:37.708Z",
        "Name": "kafka-configuration",
        "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3"
    }

    Here, ~/kafka_configuration is the configuration file that you created with the required property settings, such as auto.create.topics.enable = true.

    If you are using your own Kafka instance installed on Amazon EC2, modify the Kafka cluster configuration with the auto.create.topics.enable = true setting to allow Amazon DMS to automatically create new topics, using the options provided with your instance.

  • Amazon DMS publishes each update to a single record in the source database as one data record (message) in a given Kafka topic regardless of transactions.

  • Amazon DMS supports the following two forms for partition keys:

    • SchemaName.TableName: A combination of the schema and table name.

    • ${AttributeName}: The value of one of the fields in the JSON, or the primary key of the table in the source database.

  • BatchApply is not supported for a Kafka endpoint. Using Batch Apply (for example, the BatchApplyEnabled target metadata task setting) for a Kafka target might result in loss of data.

  • Amazon DMS doesn't support migrating values of BigInt data type with more than 16 digits. To work around this limitation, you can use the following transformation rule to convert the BigInt column to a string. For more information about transformation rules, see Transformation rules and actions.

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

Using object mapping to migrate data to a Kafka topic

Amazon DMS uses table-mapping rules to map data from the source to the target Kafka topic. To map data to a target topic, you use a type of table-mapping rule called object mapping. You use object mapping to define how data records in the source map to the data records published to a Kafka topic.

Kafka topics don't have a preset structure other than having a partition key.

Note

You don't have to use object mapping. You can use regular table mapping for various transformations. However, the partition key type will follow these default behaviors:

  • Primary Key is used as a partition key for Full Load.

  • If no parallel-apply task settings are used, schema.table is used as a partition key for CDC.

  • If parallel-apply task settings are used, Primary key is used as a partition key for CDC.

To create an object-mapping rule, specify rule-type as object-mapping. This rule specifies what type of object mapping you want to use.

The structure for the rule is as follows.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

Amazon DMS currently supports map-record-to-record and map-record-to-document as the only valid values for the rule-action parameter. These settings affect values that aren't excluded as part of the exclude-columns attribute list. The map-record-to-record and map-record-to-document values specify how Amazon DMS handles these records by default. These values don't affect the attribute mappings in any way.

Use map-record-to-record when migrating from a relational database to a Kafka topic. This rule type uses the taskResourceId.schemaName.tableName value from the relational database as the partition key in the Kafka topic and creates an attribute for each column in the source database.

When using map-record-to-record, note the following:

  • This setting only affects columns that aren't excluded by the exclude-columns list.

  • For every such column, Amazon DMS creates a corresponding attribute in the target topic.

  • Amazon DMS creates this corresponding attribute regardless of whether the source column is used in an attribute mapping.

One way to understand map-record-to-record is to see it in action. For this example, assume that you are starting with a relational database table row with the following structure and data.

FirstName  LastName  StoreId  HomeAddress        HomePhone   WorkAddress                WorkPhone   DateOfBirth
Randy      Marsh     5        221B Baker Street  1234567890  31 Spooner Street, Quahog  9876543210  02/29/1988

To migrate this information from a schema named Test to a Kafka topic, you create rules to map the data to the target topic. The following rule illustrates the mapping.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Given a Kafka topic and a partition key (in this case, taskResourceId.schemaName.tableName), the following illustrates the resulting record format using our sample data in the Kafka target topic:

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

Restructuring data with attribute mapping

You can restructure the data while you are migrating it to a Kafka topic using an attribute map. For example, you might want to combine several fields in the source into a single field in the target. The following attribute map illustrates how to restructure the data.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

To set a constant value for partition-key, specify a partition-key value. For example, you might do this to force all the data to be stored in a single partition. The following mapping illustrates this approach.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
Note

The partition-key value for a control record that is for a specific table is TaskId.SchemaName.TableName. The partition-key value for a control record that is for a specific task is that record's TaskId. Specifying a partition-key value in the object mapping has no impact on the partition-key for a control record.

Multitopic replication using object mapping

By default, Amazon DMS tasks migrate all source data to one of the Kafka topics following:

  • As specified in the Topic field of the Amazon DMS target endpoint.

  • As specified by kafka-default-topic if the Topic field of the target endpoint isn't populated and the Kafka auto.create.topics.enable setting is set to true.

With Amazon DMS engine versions 3.4.6 and higher, you can use the kafka-target-topic attribute to map each migrated source table to a separate topic. For example, the object mapping rules following migrate the source tables Customer and Address to the Kafka topics customer_topic and address_topic, respectively. At the same time, Amazon DMS migrates all other source tables, including the Bills table in the Test schema, to the topic specified in the target endpoint.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

By using Kafka multitopic replication, you can group and migrate source tables to separate Kafka topics using a single replication task.

Message format for Apache Kafka

The JSON output is simply a list of key-value pairs. The metadata section of each message includes the following fields.

RecordType

The record type can be either data or control. Data records represent the actual rows in the source. Control records are for important events in the stream, for example a restart of the task.

Operation

For data records, the operation can be load, insert, update, or delete.

For control records, the operation can be create-table, rename-table, drop-table, change-columns, add-column, drop-column, rename-column, or column-type-change.

SchemaName

The source schema for the record. This field can be empty for a control record.

TableName

The source table for the record. This field can be empty for a control record.

Timestamp

The timestamp for when the JSON message was constructed. The field is formatted with the ISO 8601 format.

The following JSON message example illustrates a data type message with all additional metadata.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

The following JSON message example illustrates a control type message.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }