Step 4: Create a topic in the Amazon MSK cluster
In this step of Getting Started Using Amazon MSK, you install Apache Kafka client libraries and tools on the client machine, and then you create a topic.
Warning
Apache Kafka version numbers used in this tutorial are examples only. We recommend that you use the same version of the client as your MSK cluster version. An older client version might be missing certain features and critical bug fixes.
Determining your MSK cluster version
Open the Amazon MSK console at https://console.aws.amazon.com/msk/.
In the navigation bar, choose the Region where you created the MSK cluster.
Choose the MSK cluster.
Note the version of Apache Kafka used on the cluster.
Replace instances of Apache Kafka version numbers in this tutorial with the version that you noted in the previous step.
Creating a topic on the client machine
Connect to your client machine.
Open the Amazon EC2 console at https://console.aws.amazon.com/ec2/.
In the navigation pane, choose Instances. Then, select the check box beside the name of the client machine that you created in Step 3: Create a client machine.
Choose Actions, and then choose Connect. Follow the instructions in the console to connect to your client machine.
Install Java and set up the Kafka version environment variable.
Install Java on the client machine by running the following command.
sudo yum -y install java-11
Store the Kafka version of your MSK cluster in the environment variable KAFKA_VERSION, as shown in the following command. Replace {KAFKA VERSION} with the version that you noted earlier. You'll need this information throughout the setup.
export KAFKA_VERSION={KAFKA VERSION}
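Optionally, you can guard the later commands against a missing or unedited version. The following is a minimal sketch, assuming a Bash shell; check_kafka_version is a hypothetical helper name, not part of Kafka or Amazon MSK. It only rejects an empty value or one that still looks like the {KAFKA VERSION} placeholder; it does not confirm that the version matches your cluster.

```shell
# Hypothetical helper (not part of Kafka or Amazon MSK).
# Returns non-zero if KAFKA_VERSION is empty or still looks like the
# {KAFKA VERSION} placeholder (contains braces or spaces), so later
# commands don't build a broken download URL.
check_kafka_version() {
  case "${KAFKA_VERSION:-}" in
    '' | *'{'* | *'}'* | *' '*)
      echo "KAFKA_VERSION is not set to a real version (got: '${KAFKA_VERSION:-}')" >&2
      return 1
      ;;
  esac
  echo "Using Apache Kafka client version $KAFKA_VERSION"
}
```

Run check_kafka_version right after the export command. It prints the version on success and an error message otherwise.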
Download and extract Apache Kafka.
Run the following command to download Apache Kafka.
wget https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
For example, if your MSK cluster uses Apache Kafka version 3.6.0, run the following command.
wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
Note
If you encounter issues downloading Kafka, try one of the following alternatives.
If you encounter connectivity issues or want to use a mirror site, try using the Apache mirror selector, as shown in the following command.
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
Download an appropriate version directly from the Apache Kafka website at https://kafka.apache.org/downloads.
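Whichever source you download from, you can confirm that the saved file is a readable gzip-compressed TAR archive before you extract it. For example, if a server returns a web page instead of the archive, this check fails. The following is a minimal sketch, assuming a Bash shell and the same tar utility used in the next step; is_valid_tgz is a hypothetical helper name.

```shell
# Hypothetical helper (not part of Kafka or Amazon MSK).
# Succeeds only if the file exists, is non-empty, and tar can list it as a
# gzip-compressed archive. Nothing is extracted.
is_valid_tgz() {
  local archive="$1"
  [ -s "$archive" ] && tar -tzf "$archive" >/dev/null 2>&1
}
```

For example: is_valid_tgz kafka_2.13-$KAFKA_VERSION.tgz && echo "Archive OK"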
Run the following command in the directory where you downloaded the TAR file in the previous step.
tar -xzf kafka_2.13-$KAFKA_VERSION.tgz
Store the full path to the newly created directory in the KAFKA_ROOT environment variable.
export KAFKA_ROOT=$(pwd)/kafka_2.13-$KAFKA_VERSION
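To catch a wrong working directory or a failed extraction early, you can check that KAFKA_ROOT points to a Kafka distribution that contains the bin, libs, and config directories used in the rest of this tutorial. The following is a minimal sketch, assuming a Bash shell; check_kafka_root is a hypothetical helper name.

```shell
# Hypothetical helper (not part of Kafka or Amazon MSK).
# Checks that a directory (default: $KAFKA_ROOT) looks like an extracted
# Apache Kafka distribution with the pieces this tutorial uses.
check_kafka_root() {
  local root="${1:-${KAFKA_ROOT:-}}"
  if [ -z "$root" ]; then
    echo "KAFKA_ROOT is not set" >&2
    return 1
  fi
  if [ ! -x "$root/bin/kafka-topics.sh" ]; then
    echo "Missing or non-executable: $root/bin/kafka-topics.sh" >&2
    return 1
  fi
  if [ ! -d "$root/libs" ] || [ ! -d "$root/config" ]; then
    echo "Missing libs or config directory under: $root" >&2
    return 1
  fi
  echo "KAFKA_ROOT looks good: $root"
}
```

Run check_kafka_root with no arguments to check the current value of KAFKA_ROOT.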
Set up authentication for your MSK cluster.
Find the latest version of the Amazon MSK IAM client library on its GitHub releases page at https://github.com/aws/aws-msk-iam-auth/releases. This library allows your client machine to access the MSK cluster using IAM authentication.
Using the following commands, navigate to the $KAFKA_ROOT/libs directory and download the Amazon MSK IAM JAR that you found in the previous step. Replace {LATEST VERSION} with the actual version number you're downloading. Because the URL points to releases/latest, it resolves only if this number matches the most recent release.
cd $KAFKA_ROOT/libs
wget https://github.com/aws/aws-msk-iam-auth/releases/latest/download/aws-msk-iam-auth-{LATEST VERSION}-all.jar
Note
Before running any Kafka commands that interact with your MSK cluster, you might need to add the Amazon MSK IAM JAR file to your Java classpath. Set the CLASSPATH environment variable, as shown in the following example.
export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar
This sets the CLASSPATH for your current shell session, making the JAR available to all subsequent Kafka commands that you run from it.
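If you'd rather not type the IAM library version by hand, you can look up the downloaded JAR with a wildcard. The following is a minimal sketch, assuming a Bash shell and that exactly one aws-msk-iam-auth-*-all.jar file is in $KAFKA_ROOT/libs; find_msk_iam_jar is a hypothetical helper name. It fails instead of guessing when it finds zero or several matching files.

```shell
# Hypothetical helper (not part of Kafka or Amazon MSK).
# Prints the path of the single aws-msk-iam-auth-*-all.jar in a directory
# (default: $KAFKA_ROOT/libs). Fails if there are zero or several matches,
# because alphabetical order is not a reliable way to pick the newest version.
find_msk_iam_jar() {
  local libs_dir="${1:-${KAFKA_ROOT:-}/libs}"
  local jar found="" count=0
  for jar in "$libs_dir"/aws-msk-iam-auth-*-all.jar; do
    # With no match, the pattern stays literal and fails the -f test.
    if [ -f "$jar" ]; then
      found="$jar"
      count=$((count + 1))
    fi
  done
  if [ "$count" -ne 1 ]; then
    echo "Expected exactly one aws-msk-iam-auth-*-all.jar in '$libs_dir', found $count" >&2
    return 1
  fi
  echo "$found"
}
```

For example: jar=$(find_msk_iam_jar) && export CLASSPATH="$jar"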
Go to the $KAFKA_ROOT/config directory to create the client configuration file.
cd $KAFKA_ROOT/config
Copy the following property settings and paste them into a new file. Save the file as client.properties.
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
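Instead of pasting the settings into an editor, you can write the file from the shell with a here-document. The following is a minimal sketch, assuming a Bash shell; write_client_properties is a hypothetical helper name. It writes the same four properties to client.properties in the given directory (by default, $KAFKA_ROOT/config) and overwrites any existing file with that name.

```shell
# Hypothetical helper (not part of Kafka or Amazon MSK).
# Writes the four IAM authentication properties from this step to
# client.properties in the given directory (default: $KAFKA_ROOT/config),
# overwriting any existing file with that name.
write_client_properties() {
  local config_dir="${1:-${KAFKA_ROOT:-}/config}"
  if [ ! -d "$config_dir" ]; then
    echo "Directory not found: $config_dir" >&2
    return 1
  fi
  # Quoting 'EOF' stops the shell from expanding anything in the here-document.
  cat > "$config_dir/client.properties" <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
  echo "Wrote $config_dir/client.properties"
}
```

Writing each property on its own line matters: a properties file reads one key=value pair per line.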
(Optional) Adjust the Java heap size for Kafka tools.
If you encounter memory-related issues or you're working with a large number of topics or partitions, you can adjust the Java heap size. To do this, set the KAFKA_HEAP_OPTS environment variable before running Kafka commands. The following example sets both the maximum (-Xmx) and initial (-Xms) heap size to 512 megabytes. Adjust these values according to your requirements and available system resources.
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
Get your cluster connection information.
Open the Amazon MSK console at https://console.aws.amazon.com/msk/.
Wait for the status of your cluster to become Active. This might take several minutes. After the status becomes Active, choose the cluster name. This takes you to a page containing the cluster summary.
Choose View client information.
Copy the connection string for the private endpoint.
The connection string is a comma-separated list that contains one endpoint for each broker in your cluster. Because client.properties configures IAM authentication, use the connection string listed for IAM. Store this connection string in the BOOTSTRAP_SERVER environment variable, as shown in the following command. Replace <bootstrap-server-string> with the actual value of the connection string.
export BOOTSTRAP_SERVER=<bootstrap-server-string>
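The bootstrap connection string is a comma-separated list of broker host:port pairs, so you can check what you stored by printing one endpoint per line. The following is a minimal sketch, assuming a Bash shell; list_bootstrap_endpoints is a hypothetical helper name. It also rejects a value that still contains the <bootstrap-server-string> placeholder.

```shell
# Hypothetical helper (not part of Kafka or Amazon MSK).
# Prints each host:port entry of a bootstrap string on its own line.
# Uses the first argument, or BOOTSTRAP_SERVER if no argument is given.
list_bootstrap_endpoints() {
  local bootstrap="${1:-${BOOTSTRAP_SERVER:-}}"
  case "$bootstrap" in
    '' | *'<'* | *'>'*)
      echo "Bootstrap string is empty or still a placeholder: '$bootstrap'" >&2
      return 1
      ;;
  esac
  # Split on commas: one broker endpoint per output line.
  printf '%s\n' "$bootstrap" | tr ',' '\n'
}
```

For example, list_bootstrap_endpoints | wc -l prints the number of broker endpoints in BOOTSTRAP_SERVER.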
Run the following command to create the topic.
$KAFKA_ROOT/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic
If you get a NoSuchFileException for the client.properties file, make sure that this file exists in the $KAFKA_ROOT/config directory and that the KAFKA_ROOT environment variable is set in your current shell.
Note
If you prefer not to set the CLASSPATH environment variable for your entire session, you can alternatively prefix each Kafka command with the CLASSPATH variable. This approach applies the classpath only to that specific command.
CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar \
  $KAFKA_ROOT/bin/kafka-topics.sh --create \
  --bootstrap-server $BOOTSTRAP_SERVER \
  --command-config $KAFKA_ROOT/config/client.properties \
  --replication-factor 3 \
  --partitions 1 \
  --topic MSKTutorialTopic
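If you rerun this tutorial, the create command fails when MSKTutorialTopic already exists. The kafka-topics.sh tool accepts an --if-not-exists option together with --create, which skips creation when the topic is already there. The following minimal sketch wraps the create command from this step with that option. It assumes a Bash shell and that KAFKA_ROOT and BOOTSTRAP_SERVER are set as described earlier; create_tutorial_topic is a hypothetical helper name.

```shell
# Hypothetical helper (not part of Kafka or Amazon MSK).
# Runs the same create command as this step, plus --if-not-exists so that
# rerunning it doesn't fail when the topic already exists.
create_tutorial_topic() {
  local topic="${1:-MSKTutorialTopic}"
  if [ -z "${KAFKA_ROOT:-}" ] || [ -z "${BOOTSTRAP_SERVER:-}" ]; then
    echo "Set KAFKA_ROOT and BOOTSTRAP_SERVER first" >&2
    return 1
  fi
  "$KAFKA_ROOT/bin/kafka-topics.sh" --create --if-not-exists \
    --bootstrap-server "$BOOTSTRAP_SERVER" \
    --command-config "$KAFKA_ROOT/config/client.properties" \
    --replication-factor 3 \
    --partitions 1 \
    --topic "$topic"
}
```

Run create_tutorial_topic with no arguments to create MSKTutorialTopic, or pass a different topic name as the first argument.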
(Optional) Verify that the topic was created successfully.
If the command succeeds, you should see the following message:
Created topic MSKTutorialTopic.
List all topics to confirm your topic exists.
$KAFKA_ROOT/bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties
If the command is unsuccessful or you run into an error, see Troubleshoot your Amazon MSK cluster for troubleshooting information.
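To check for your topic in a script rather than by reading the list, you can filter the --list output for an exact line match, so that a topic whose name merely contains MSKTutorialTopic doesn't count. The following is a minimal sketch, assuming a Bash shell; topic_in_list is a hypothetical helper name that reads the topic list from standard input.

```shell
# Hypothetical helper (not part of Kafka or Amazon MSK).
# Reads topic names (one per line, as printed by kafka-topics.sh --list)
# from standard input and succeeds only if $1 matches a whole line exactly.
topic_in_list() {
  local topic="${1:-}"
  if [ -z "$topic" ]; then
    echo "Usage: topic_in_list TOPIC_NAME < topic-list" >&2
    return 2
  fi
  # -F: literal match, -x: whole line, -q: no output, only the exit status.
  grep -Fxq -- "$topic"
}
```

For example:
$KAFKA_ROOT/bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties | topic_in_list MSKTutorialTopic && echo "MSKTutorialTopic exists"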
(Optional) Delete the environment variables you used in this tutorial.
If you want to keep your environment variables for the next steps in this tutorial, skip this step. Otherwise, you can unset these variables, as shown in the following example.
unset KAFKA_VERSION KAFKA_ROOT BOOTSTRAP_SERVER CLASSPATH KAFKA_HEAP_OPTS
Next Step
Step 5: Produce and consume data