Operations
The REST API includes the following operations:
- Associates a list of SCRAM secrets with a cluster. SCRAM secrets are stored in AWS Secrets Manager and are used to authenticate clients with sign-in credentials. You can associate up to 10 secrets with a cluster at a time.
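As a sketch, this operation maps to boto3's `batch_associate_scram_secret` method. The cluster and secret ARNs below are hypothetical placeholders, and the helper splits the list into batches to stay under the 10-secret limit per call:

```python
def associate_scram_secrets(client, cluster_arn, secret_arns):
    # Associate SCRAM secrets with a cluster in batches of at most 10,
    # since the API accepts up to 10 secret ARNs per call.
    unprocessed = []
    for i in range(0, len(secret_arns), 10):
        response = client.batch_associate_scram_secret(
            ClusterArn=cluster_arn,
            SecretArnList=secret_arns[i:i + 10]
        )
        # Secrets that could not be associated are reported per batch.
        unprocessed.extend(response.get('UnprocessedScramSecrets', []))
    return unprocessed

if __name__ == '__main__':
    import boto3  # imported here so the helper is testable without AWS access

    client = boto3.client('kafka')
    failed = associate_scram_secrets(
        client,
        'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4',
        ['arn:aws:secretsmanager:us-east-1:0123456789019:secret:AmazonMSK_sales']
    )
    print(failed)
```

An empty result means every secret was associated successfully.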
- Disassociates a list of SCRAM secrets from a cluster. SCRAM secrets are stored in AWS Secrets Manager and are used to authenticate clients with sign-in credentials. You can disassociate up to 10 secrets from a cluster at a time.
- Creates a new MSK cluster. The following Python 3.6 example shows how you can create a cluster that's distributed over two Availability Zones. Before you run this Python script, replace the example subnet and security-group IDs with the IDs of your subnets and security group. When you create an MSK cluster, its brokers are evenly distributed over a number of Availability Zones equal to the number of subnets that you specify in the `BrokerNodeGroupInfo` parameter. In this example, you can add a third subnet to get a cluster that's distributed over three Availability Zones.

```python
import boto3

client = boto3.client('kafka')

response = client.create_cluster(
    BrokerNodeGroupInfo={
        'BrokerAZDistribution': 'DEFAULT',
        'ClientSubnets': [
            'subnet-012345678901fedcba',
            'subnet-9876543210abcdef01'
        ],
        'InstanceType': 'kafka.m5.large',
        'SecurityGroups': [
            'sg-012345abcdef789789'
        ]
    },
    ClusterName='SalesCluster',
    EncryptionInfo={
        'EncryptionInTransit': {
            'ClientBroker': 'TLS_PLAINTEXT',
            'InCluster': True
        }
    },
    EnhancedMonitoring='PER_TOPIC_PER_BROKER',
    KafkaVersion='2.2.1',
    NumberOfBrokerNodes=2
)

print(response)
```
- Creates a new MSK configuration. To see an example of how to use this operation, first save the following text to a file and name the file `config-file.txt`.

```
auto.create.topics.enable = true
zookeeper.connection.timeout.ms = 1000
log.roll.ms = 604800000
```

Now run the following Python 3.6 script in the folder where you saved `config-file.txt`. This script uses the properties specified in `config-file.txt` to create a configuration named `SalesClusterConfiguration`. This configuration can work with Apache Kafka versions 1.1.1 and 2.1.0.

```python
import boto3

client = boto3.client('kafka')

with open('config-file.txt', 'r') as config_file:
    server_properties = config_file.read()

response = client.create_configuration(
    Name='SalesClusterConfiguration',
    Description='The configuration to use on all sales clusters.',
    KafkaVersions=['1.1.1', '2.1.0'],
    ServerProperties=server_properties
)

print(response)
```
- Creates a remote VPC connection.
- Deletes the MSK cluster specified by the Amazon Resource Name (ARN) in the request.
- Deletes the cluster policy.
- Deletes a cluster configuration and all its revisions.
- Deletes a remote VPC connection.
- Returns a description of the MSK cluster whose Amazon Resource Name (ARN) is specified in the request. The following is a Python 3.6 example of how to use this operation. Before you run this Python script, replace the example cluster ARN with the ARN of the cluster that you want to describe. If you don't know the ARN of the cluster, you can use the `ListClusters` operation to list all the clusters and see their ARNs and full descriptions.

```python
import boto3

client = boto3.client('kafka')

response = client.describe_cluster(
    ClusterArn='arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4'
)

print(response)
```

Note that the response to this operation includes the `ZookeeperConnectStringTls` node only for clusters created with Apache Kafka version 2.5.1 and later.
- Returns a description of the cluster operation specified by the Amazon Resource Name (ARN).
- Returns a description of this MSK configuration.
- Returns a description of this revision of the configuration.
- Describes a remote VPC connection.
- Returns a list of brokers that a client application can use to bootstrap. This list doesn't necessarily include all of the brokers in the cluster. The following Python 3.6 example shows how you can use the Amazon Resource Name (ARN) of a cluster to get its bootstrap brokers. If you don't know the ARN of your cluster, you can use the `ListClusters` operation to get the ARNs of all the clusters in this account and Region.

```python
import boto3

client = boto3.client('kafka')

response = client.get_bootstrap_brokers(
    ClusterArn='arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4'
)

print(response['BootstrapBrokerString'])
```
- Gets the cluster policy.
- Returns a list of the Apache Kafka versions to which you can update this cluster.
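As a sketch, assuming boto3's `get_compatible_kafka_versions` method, the following helper flattens the response into a simple list of target versions; the cluster ARN in the usage section is a placeholder:

```python
def compatible_target_versions(client, cluster_arn):
    # GetCompatibleKafkaVersions returns, for the cluster's current version,
    # the Apache Kafka versions the cluster can be updated to.
    response = client.get_compatible_kafka_versions(ClusterArn=cluster_arn)
    targets = []
    for entry in response.get('CompatibleKafkaVersions', []):
        targets.extend(entry.get('TargetVersions', []))
    return targets

if __name__ == '__main__':
    import boto3  # imported here so the helper is testable without AWS access

    client = boto3.client('kafka')
    print(compatible_target_versions(
        client,
        'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4'
    ))
```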
- Lists the client VPC connections.
- Returns a list of all the operations that have been performed on the specified MSK cluster.
- Returns a list of all the MSK clusters.
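Because this operation is paginated, a sketch using boto3's built-in paginator (which follows `NextToken` automatically) can collect every cluster ARN in the account and Region:

```python
def all_cluster_arns(client):
    # ListClusters is paginated; the paginator retrieves every page
    # by following NextToken behind the scenes.
    arns = []
    paginator = client.get_paginator('list_clusters')
    for page in paginator.paginate():
        for info in page.get('ClusterInfoList', []):
            arns.append(info['ClusterArn'])
    return arns

if __name__ == '__main__':
    import boto3  # imported here so the helper is testable without AWS access

    client = boto3.client('kafka')
    print(all_cluster_arns(client))
```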
- Returns a list of all the revisions of an MSK configuration.
- Returns a list of all the MSK configurations.
- Returns a list of Apache Kafka version objects.
- Returns a list of the broker nodes in the cluster. The following Python 3.6 example first lists one node of a cluster. Because the cluster has more nodes, the response contains a token that the script then uses to list the remaining nodes.

```python
import boto3

client = boto3.client('kafka')

list_nodes_response = client.list_nodes(
    ClusterArn='arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4',
    MaxResults=1
)

print('\n')
print('Here is the first node in the list:')
print('\n')
print(list_nodes_response['NodeInfoList'])

next_token = list_nodes_response['NextToken']

list_nodes_response = client.list_nodes(
    ClusterArn='arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4',
    NextToken=next_token
)

print('\n')
print('Here are the remaining nodes in the list:')
print('\n')
print(list_nodes_response['NodeInfoList'])
```
- Returns a list of SCRAM secrets associated with the cluster. SCRAM secrets are stored in AWS Secrets Manager and are used to authenticate clients with sign-in credentials.
- Returns a list of the tags associated with the specified resource.
- Lists all VPC connections.
- Creates or updates the cluster policy.
- Reboots a broker. In a given cluster, you can reboot only one broker at a time. To reboot a broker, wait for the cluster status to be ACTIVE. This operation returns an error if you invoke it while the cluster status is HEALING. You must wait for the status to change from HEALING to ACTIVE before you reboot the broker.
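A minimal sketch of this operation, assuming boto3's `reboot_broker` method (the cluster ARN and broker ID are placeholders): note that `BrokerIds` takes a list of string IDs, even though only one broker can be rebooted per call.

```python
def reboot_one_broker(client, cluster_arn, broker_id):
    # BrokerIds must contain string IDs; only one broker per call.
    response = client.reboot_broker(
        ClusterArn=cluster_arn,
        BrokerIds=[str(broker_id)]
    )
    # The returned ARN identifies the reboot operation, which you can
    # poll with DescribeClusterOperation.
    return response['ClusterOperationArn']

if __name__ == '__main__':
    import boto3  # imported here so the helper is testable without AWS access

    client = boto3.client('kafka')
    operation_arn = reboot_one_broker(
        client,
        'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4',
        0
    )
    print(operation_arn)
```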
- Rejects a client VPC connection.
- Adds tags to the specified MSK resource.
- Removes the tags associated with the keys that are provided in the query.
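The two tagging operations can be sketched together, assuming boto3's `tag_resource` and `untag_resource` methods; the tag names and resource ARN here are illustrative only:

```python
def tag_then_untag(client, resource_arn):
    # Add two tags to the resource, then remove one of them by key.
    client.tag_resource(
        ResourceArn=resource_arn,
        Tags={'Team': 'Sales', 'Environment': 'Test'}
    )
    client.untag_resource(
        ResourceArn=resource_arn,
        TagKeys=['Environment']
    )

if __name__ == '__main__':
    import boto3  # imported here so the helper is testable without AWS access

    client = boto3.client('kafka')
    tag_then_untag(
        client,
        'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4'
    )
```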
- Updates the number of broker nodes in the cluster. You can use this operation to increase or decrease the number of brokers in an existing cluster. The following Python 3.6 example shows how you can increase the number of brokers in a cluster to 6. The update operation returns immediately, with a response that includes the Amazon Resource Name (ARN) that Amazon MSK assigns to this cluster operation. You can use that ARN to check the state of the operation. When the state changes from `PENDING` to `UPDATE_COMPLETE`, the operation is complete.

```python
import boto3
import time

client = boto3.client('kafka')

update_broker_count_response = client.update_broker_count(
    ClusterArn='arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4',
    CurrentVersion='K12V3IB1VIZHHY',
    TargetNumberOfBrokerNodes=6
)

operation_arn = update_broker_count_response['ClusterOperationArn']
print(operation_arn)

describe_cluster_operation_response = client.describe_cluster_operation(ClusterOperationArn=operation_arn)
operation_state = describe_cluster_operation_response['ClusterOperationInfo']['OperationState']
print(operation_state)

expanded = False

while not expanded:
    print('Sleeping for 15 seconds before checking to see if the cluster update is done...')
    time.sleep(15)
    describe_cluster_operation_response = client.describe_cluster_operation(ClusterOperationArn=operation_arn)
    operation_state = describe_cluster_operation_response['ClusterOperationInfo']['OperationState']
    if 'UPDATE_COMPLETE' == operation_state:
        expanded = True
        print('The cluster has 6 brokers now.')
```
- Updates the EBS storage associated with Amazon MSK brokers. You can increase the amount of EBS storage per broker. You can't decrease the storage. To increase storage, wait for the cluster to be in the ACTIVE state. Storage volumes remain available during this scaling-up operation.
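A sketch of this operation, assuming boto3's `update_broker_storage` method; the `'All'` broker node ID applies the new volume size to every broker, and the cluster ARN and current version are placeholders:

```python
def increase_broker_storage(client, cluster_arn, current_version, size_gb):
    # 'All' targets every broker; the new size must be larger than the
    # current size, since storage can only be increased.
    response = client.update_broker_storage(
        ClusterArn=cluster_arn,
        CurrentVersion=current_version,
        TargetBrokerEBSVolumeInfo=[
            {'KafkaBrokerNodeId': 'All', 'VolumeSizeGB': size_gb}
        ]
    )
    # The returned ARN identifies the storage-update operation.
    return response['ClusterOperationArn']

if __name__ == '__main__':
    import boto3  # imported here so the helper is testable without AWS access

    client = boto3.client('kafka')
    print(increase_broker_storage(
        client,
        'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4',
        'K12V3IB1VIZHHY',
        1100
    ))
```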
- Updates the broker type. For information about this operation, see Updating the broker type in the developer guide.
- Updates the cluster with the configuration that is specified in the request body. Before you invoke this operation, ensure that the number of partitions per broker on your MSK cluster is under the limits described in Number of partitions per broker. You can't update the configuration of an MSK cluster that exceeds these limits.
- Updates the cluster to the specified Apache Kafka version. Before you invoke this operation, ensure that the number of partitions per broker on your MSK cluster is under the limits described in Number of partitions per broker. You can't update the Apache Kafka version for an MSK cluster that exceeds these limits.
- Creates a new revision of the cluster configuration. The configuration must be in the `ACTIVE` state.
- Updates the connectivity settings for the cluster.
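Creating a configuration revision can be sketched with boto3's `update_configuration` method, reusing a server-properties file such as the `config-file.txt` example above; the configuration ARN and description here are placeholders:

```python
def add_configuration_revision(client, configuration_arn, properties_path):
    # Each successful call creates a new revision of the configuration
    # from the contents of a server-properties file.
    with open(properties_path, 'r') as f:
        server_properties = f.read()
    response = client.update_configuration(
        Arn=configuration_arn,
        Description='New revision of the sales cluster configuration.',
        ServerProperties=server_properties
    )
    # The response reports the revision number that was just created.
    return response['LatestRevision']['Revision']

if __name__ == '__main__':
    import boto3  # imported here so the helper is testable without AWS access

    client = boto3.client('kafka')
    revision = add_configuration_revision(
        client,
        'arn:aws:kafka:us-east-1:0123456789019:configuration/SalesClusterConfiguration/abcd1234-abcd-cafe-abab-9876543210ab-4',
        'config-file.txt'
    )
    print(revision)
```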
- Updates the monitoring settings for the cluster. You can use this operation to specify which Apache Kafka metrics you want Amazon MSK to send to Amazon CloudWatch. You can also specify settings for open monitoring with Prometheus. The following Python 3.6 example enables open monitoring with the Node Exporter and sets enhanced monitoring to `PER_BROKER`. For more information about monitoring, see Monitoring.

```python
import boto3
import time

client = boto3.client('kafka')

update_monitoring_response = client.update_monitoring(
    ClusterArn='arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4',
    CurrentVersion='K12V3IB1VIZHHY',
    EnhancedMonitoring='PER_BROKER',
    OpenMonitoring={
        'Prometheus': {
            'JmxExporter': {'EnabledInBroker': False},
            'NodeExporter': {'EnabledInBroker': True}
        }
    }
)

operation_arn = update_monitoring_response['ClusterOperationArn']
print('The ARN of the update operation is ' + operation_arn)

describe_cluster_operation_response = client.describe_cluster_operation(ClusterOperationArn=operation_arn)
operation_state = describe_cluster_operation_response['ClusterOperationInfo']['OperationState']
print('The status of the update operation is ' + operation_state)

updated = False

while not updated:
    print('Sleeping for 15 seconds before checking to see if the monitoring update is done...')
    time.sleep(15)
    describe_cluster_operation_response = client.describe_cluster_operation(ClusterOperationArn=operation_arn)
    operation_state = describe_cluster_operation_response['ClusterOperationInfo']['OperationState']
    if 'UPDATE_COMPLETE' == operation_state:
        updated = True
        print('You have successfully updated the monitoring settings.')
```