IAM access control - Amazon Managed Streaming for Apache Kafka

IAM access control

IAM access control for Amazon MSK enables you to handle both authentication and authorization for your MSK cluster. This eliminates the need to use one mechanism for authentication and another for authorization. For example, when a client tries to write to your cluster, Amazon MSK uses IAM to check whether that client is an authenticated identity and also whether it is authorized to produce to your cluster. IAM access control works for Java and non-Java clients, including Kafka clients written in Python, Go, JavaScript, and .NET.

Amazon MSK logs access events so you can audit them. For more information, see Logging API calls with Amazon CloudTrail.

To make IAM access control possible, Amazon MSK makes minor modifications to Apache Kafka source code. These modifications won't cause a noticeable difference in your Apache Kafka experience.

Important

IAM access control doesn't apply to Apache ZooKeeper nodes. For information about how you can control access to those nodes, see Controlling access to Apache ZooKeeper.

Important

The allow.everyone.if.no.acl.found Apache Kafka setting has no effect if your cluster uses IAM access control.

Important

You can invoke Apache Kafka ACL APIs for an MSK cluster that uses IAM access control. However, Apache Kafka ACLs stored in Apache ZooKeeper have no effect on authorization for IAM roles. You must use IAM policies to control access for IAM roles.

How IAM access control for Amazon MSK works

To use IAM access control for Amazon MSK, perform the following steps, which are described in detail in the rest of this section.

Create a cluster that uses IAM access control

This section explains how you can use the Amazon Web Services Management Console, the API, or the Amazon CLI to create a cluster that uses IAM access control. For information about how to turn on IAM access control for an existing cluster, see Updating security settings of a cluster.

Use the Amazon Web Services Management Console to create a cluster that uses IAM access control
  1. Open the Amazon MSK console at https://console.aws.amazon.com/msk/.

  2. Choose Create cluster.

  3. Choose Create cluster with custom settings.

  4. In the Authentication section, choose IAM access control.

  5. Complete the rest of the workflow for creating a cluster.

Use the API or the Amazon CLI to create a cluster that uses IAM access control
  • To create a cluster with IAM access control enabled, use the CreateCluster API or the create-cluster CLI command, and pass the following JSON for the ClientAuthentication parameter: "ClientAuthentication": { "Sasl": { "Iam": { "Enabled": true } } }.
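
As a rough illustration of the ClientAuthentication payload described above, the following Python sketch builds the structure with the standard library and prints it as JSON. The helper name iam_client_authentication is hypothetical, and the other settings that CreateCluster or create-cluster needs are not shown.

```python
import json


def iam_client_authentication(enabled: bool = True) -> dict:
    """Return the ClientAuthentication structure that turns on IAM access control."""
    return {"Sasl": {"Iam": {"Enabled": enabled}}}


if __name__ == "__main__":
    # Print the fragment to embed in a CreateCluster request body.
    payload = {"ClientAuthentication": iam_client_authentication()}
    print(json.dumps(payload, indent=2))
```

Generating the fragment from code rather than typing it by hand is one way to avoid unbalanced braces in the nested structure.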

Configure clients for IAM access control

To enable clients to communicate with an MSK cluster that uses IAM access control, you can use either of these mechanisms:

  • Non-Java client configuration using SASL_OAUTHBEARER mechanism

  • Java client configuration using SASL_OAUTHBEARER mechanism or AWS_MSK_IAM mechanism

Using the SASL_OAUTHBEARER mechanism to configure IAM

  1. Edit your client configuration, using the SASL-related settings (security_protocol, sasl_mechanism, and sasl_oauth_token_provider) in the example Python Kafka client below as a guide. Configuration changes are similar in other languages.

    #!/usr/bin/python3
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    import socket
    import time
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

    # Supplies a freshly signed IAM auth token each time the client asks for one.
    class MSKTokenProvider():
        def token(self):
            token, _ = MSKAuthTokenProvider.generate_auth_token('<my aws region>')
            return token

    tp = MSKTokenProvider()

    producer = KafkaProducer(
        bootstrap_servers='<my bootstrap string>',
        security_protocol='SASL_SSL',
        sasl_mechanism='OAUTHBEARER',
        sasl_oauth_token_provider=tp,
        client_id=socket.gethostname(),
    )

    topic = "<my-topic>"
    while True:
        try:
            inp = input(">")
            producer.send(topic, inp.encode())
            producer.flush()
            print("Produced!")
        except Exception as e:
            print("Failed to send message:", e)

    producer.close()
  2. Download the helper library for your chosen configuration language and follow the instructions in the Getting started section on that language library’s homepage.

Using the MSK custom AWS_MSK_IAM mechanism to configure IAM

  1. Add the following to the client.properties file. Replace <PATH_TO_TRUST_STORE_FILE> with the fully-qualified path to the trust store file on the client.

    Note

    If you don't want to use a specific certificate, you can remove ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE> from your client.properties file. When you don't specify a value for ssl.truststore.location, the Java process uses the default certificate.

    ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

    To use a named profile that you created for Amazon credentials, include awsProfileName="your profile name"; in your client configuration file. For information about named profiles, see Named profiles in the Amazon CLI documentation.

  2. Download the latest stable aws-msk-iam-auth JAR file, and place it in the class path. If you use Maven, add the following dependency, adjusting the version number as needed:

    <dependency>
      <groupId>software.amazon.msk</groupId>
      <artifactId>aws-msk-iam-auth</artifactId>
      <version>1.0.0</version>
    </dependency>

The Amazon MSK client plugin is open-sourced under the Apache 2.0 license.
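
The client.properties settings from step 1 can also be generated from a script. The following Python sketch is one way to do that; the function name render_client_properties and its parameters are hypothetical, and it assumes that the awsProfileName option is placed inside the sasl.jaas.config value, before the closing semicolon.

```python
from typing import Optional


def render_client_properties(truststore_path: Optional[str] = None,
                             profile_name: Optional[str] = None) -> str:
    """Render client.properties text for the AWS_MSK_IAM mechanism."""
    lines = []
    if truststore_path:
        # Leave this line out to let the Java process use its default trust store.
        lines.append(f"ssl.truststore.location={truststore_path}")
    jaas = "software.amazon.msk.auth.iam.IAMLoginModule required"
    if profile_name:
        # Assumption: the named-profile option belongs inside sasl.jaas.config.
        jaas += f' awsProfileName="{profile_name}"'
    lines += [
        "security.protocol=SASL_SSL",
        "sasl.mechanism=AWS_MSK_IAM",
        f"sasl.jaas.config={jaas};",
        "sasl.client.callback.handler.class="
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_client_properties(profile_name="your profile name"), end="")
```

Calling the function with no arguments yields the configuration without a trust store line or a named profile.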

Create authorization policies

Attach an authorization policy to the IAM role that corresponds to the client. In an authorization policy, you specify which actions to allow or deny for the role. If your client is on an Amazon EC2 instance, associate the authorization policy with the IAM role for that Amazon EC2 instance. Alternatively, you can configure your client to use a named profile, and then you associate the authorization policy with the role for that named profile. Configure clients for IAM access control describes how to configure a client to use a named profile.

For information about how to create an IAM policy, see Creating IAM policies.

The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the Action and Resource elements, see Semantics of actions and resources.

Important

Changes that you make to an IAM policy are reflected in the IAM APIs and the Amazon CLI immediately. However, it can take noticeable time for the policy change to take effect. In most cases, policy changes take effect in less than a minute. Network conditions may sometimes increase the delay.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "kafka-cluster:Connect",
            "kafka-cluster:AlterCluster",
            "kafka-cluster:DescribeCluster"
          ],
          "Resource": [
            "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
          ]
        },
        {
          "Effect": "Allow",
          "Action": [
            "kafka-cluster:*Topic*",
            "kafka-cluster:WriteData",
            "kafka-cluster:ReadData"
          ],
          "Resource": [
            "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*"
          ]
        },
        {
          "Effect": "Allow",
          "Action": [
            "kafka-cluster:AlterGroup",
            "kafka-cluster:DescribeGroup"
          ],
          "Resource": [
            "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*"
          ]
        }
      ]
    }

To learn how to create a policy with action elements that correspond to common Apache Kafka use cases, like producing and consuming data, see Common use cases.

For Kafka versions 2.8.0 and above, the WriteDataIdempotently permission is deprecated (KIP-679), and enable.idempotence = true is set by default. As a result, for Kafka versions 2.8.0 and above, IAM does not offer the same functionality as Kafka ACLs: granting only WriteData on a specific topic does not allow idempotent writes (WriteDataIdempotently) to that topic. The exception is when WriteData is granted on all topics, in which case WriteDataIdempotently is also allowed. This difference comes from how the IAM logic is implemented compared with Kafka ACLs.

To work around this, we recommend using a policy similar to the sample below:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "kafka-cluster:Connect",
            "kafka-cluster:AlterCluster",
            "kafka-cluster:DescribeCluster",
            "kafka-cluster:WriteDataIdempotently"
          ],
          "Resource": [
            "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
          ]
        },
        {
          "Effect": "Allow",
          "Action": [
            "kafka-cluster:*Topic*",
            "kafka-cluster:WriteData",
            "kafka-cluster:ReadData"
          ],
          "Resource": [
            "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/TestTopic"
          ]
        }
      ]
    }

In this case, WriteData allows writes to TestTopic, while WriteDataIdempotently allows idempotent writes to the cluster. WriteDataIdempotently is a cluster-level permission and cannot be used at the topic level. If WriteDataIdempotently is restricted to the topic level, this policy will not work.
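
Because WriteDataIdempotently only works at the cluster level, it can be useful to check a policy document for this mistake before attaching it. The following Python sketch is one possible check, not an official validation tool. The function name misplaced_idempotent_write is hypothetical, and the check only looks for the literal action name; it does not expand wildcards such as kafka-cluster:*.

```python
def misplaced_idempotent_write(policy: dict) -> list:
    """Return resource ARNs where WriteDataIdempotently is granted below cluster level."""
    target = "kafka-cluster:WriteDataIdempotently"
    misplaced = []
    for statement in policy.get("Statement", []):
        actions = statement.get("Action", [])
        resources = statement.get("Resource", [])
        # IAM accepts a bare string in place of a one-element list.
        if isinstance(actions, str):
            actions = [actions]
        if isinstance(resources, str):
            resources = [resources]
        if target not in actions:
            continue
        for arn in resources:
            # ARN layout: arn:partition:kafka:region:account-id:type/...
            resource_part = arn.split(":", 5)[-1]
            if not resource_part.startswith("cluster/"):
                misplaced.append(arn)
    return misplaced
```

An empty list means that every statement granting the literal action is scoped to a cluster ARN.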

Get the bootstrap brokers for IAM access control

See Getting the bootstrap brokers for an Amazon MSK cluster.

Semantics of actions and resources

This section explains the semantics of the action and resource elements that you can use in an IAM authorization policy. For an example policy, see Create authorization policies.

Actions

The following table lists the actions that you can include in an authorization policy when you use IAM access control for Amazon MSK. When you include in your authorization policy an action from the Action column of the table, you must also include the corresponding actions from the Required actions column.

| Action | Description | Required actions | Required resources | Applicable to serverless clusters |
| --- | --- | --- | --- | --- |
| kafka-cluster:Connect | Grants permission to connect and authenticate to the cluster. | None | cluster | Yes |
| kafka-cluster:DescribeCluster | Grants permission to describe various aspects of the cluster, equivalent to Apache Kafka's DESCRIBE CLUSTER ACL. | kafka-cluster:Connect | cluster | Yes |
| kafka-cluster:AlterCluster | Grants permission to alter various aspects of the cluster, equivalent to Apache Kafka's ALTER CLUSTER ACL. | kafka-cluster:Connect, kafka-cluster:DescribeCluster | cluster | No |
| kafka-cluster:DescribeClusterDynamicConfiguration | Grants permission to describe the dynamic configuration of a cluster, equivalent to Apache Kafka's DESCRIBE_CONFIGS CLUSTER ACL. | kafka-cluster:Connect | cluster | No |
| kafka-cluster:AlterClusterDynamicConfiguration | Grants permission to alter the dynamic configuration of a cluster, equivalent to Apache Kafka's ALTER_CONFIGS CLUSTER ACL. | kafka-cluster:Connect, kafka-cluster:DescribeClusterDynamicConfiguration | cluster | No |
| kafka-cluster:WriteDataIdempotently | Grants permission to write data idempotently on a cluster, equivalent to Apache Kafka's IDEMPOTENT_WRITE CLUSTER ACL. | kafka-cluster:Connect, kafka-cluster:WriteData | cluster | Yes |
| kafka-cluster:CreateTopic | Grants permission to create topics on a cluster, equivalent to Apache Kafka's CREATE CLUSTER/TOPIC ACL. | kafka-cluster:Connect | topic | Yes |
| kafka-cluster:DescribeTopic | Grants permission to describe topics on a cluster, equivalent to Apache Kafka's DESCRIBE TOPIC ACL. | kafka-cluster:Connect | topic | Yes |
| kafka-cluster:AlterTopic | Grants permission to alter topics on a cluster, equivalent to Apache Kafka's ALTER TOPIC ACL. | kafka-cluster:Connect, kafka-cluster:DescribeTopic | topic | Yes |
| kafka-cluster:DeleteTopic | Grants permission to delete topics on a cluster, equivalent to Apache Kafka's DELETE TOPIC ACL. | kafka-cluster:Connect, kafka-cluster:DescribeTopic | topic | Yes |
| kafka-cluster:DescribeTopicDynamicConfiguration | Grants permission to describe the dynamic configuration of topics on a cluster, equivalent to Apache Kafka's DESCRIBE_CONFIGS TOPIC ACL. | kafka-cluster:Connect | topic | Yes |
| kafka-cluster:AlterTopicDynamicConfiguration | Grants permission to alter the dynamic configuration of topics on a cluster, equivalent to Apache Kafka's ALTER_CONFIGS TOPIC ACL. | kafka-cluster:Connect, kafka-cluster:DescribeTopicDynamicConfiguration | topic | Yes |
| kafka-cluster:ReadData | Grants permission to read data from topics on a cluster, equivalent to Apache Kafka's READ TOPIC ACL. | kafka-cluster:Connect, kafka-cluster:DescribeTopic, kafka-cluster:AlterGroup | topic | Yes |
| kafka-cluster:WriteData | Grants permission to write data to topics on a cluster, equivalent to Apache Kafka's WRITE TOPIC ACL. | kafka-cluster:Connect, kafka-cluster:DescribeTopic | topic | Yes |
| kafka-cluster:DescribeGroup | Grants permission to describe groups on a cluster, equivalent to Apache Kafka's DESCRIBE GROUP ACL. | kafka-cluster:Connect | group | Yes |
| kafka-cluster:AlterGroup | Grants permission to join groups on a cluster, equivalent to Apache Kafka's READ GROUP ACL. | kafka-cluster:Connect, kafka-cluster:DescribeGroup | group | Yes |
| kafka-cluster:DeleteGroup | Grants permission to delete groups on a cluster, equivalent to Apache Kafka's DELETE GROUP ACL. | kafka-cluster:Connect, kafka-cluster:DescribeGroup | group | Yes |
| kafka-cluster:DescribeTransactionalId | Grants permission to describe transactional IDs on a cluster, equivalent to Apache Kafka's DESCRIBE TRANSACTIONAL_ID ACL. | kafka-cluster:Connect | transactional-id | Yes |
| kafka-cluster:AlterTransactionalId | Grants permission to alter transactional IDs on a cluster, equivalent to Apache Kafka's WRITE TRANSACTIONAL_ID ACL. | kafka-cluster:Connect, kafka-cluster:DescribeTransactionalId, kafka-cluster:WriteData | transactional-id | Yes |
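
The required-actions rule can be applied mechanically. The following Python sketch encodes the Required actions column of the table as a dictionary and expands a set of actions with everything they require. The names REQUIRED and with_required are hypothetical, and following the requirements transitively (for example, ReadData requires AlterGroup, which in turn requires DescribeGroup) is an assumption of this sketch.

```python
# Direct requirements from the actions table, without the "kafka-cluster:" prefix.
REQUIRED = {
    "Connect": [],
    "DescribeCluster": ["Connect"],
    "AlterCluster": ["Connect", "DescribeCluster"],
    "DescribeClusterDynamicConfiguration": ["Connect"],
    "AlterClusterDynamicConfiguration": ["Connect", "DescribeClusterDynamicConfiguration"],
    "WriteDataIdempotently": ["Connect", "WriteData"],
    "CreateTopic": ["Connect"],
    "DescribeTopic": ["Connect"],
    "AlterTopic": ["Connect", "DescribeTopic"],
    "DeleteTopic": ["Connect", "DescribeTopic"],
    "DescribeTopicDynamicConfiguration": ["Connect"],
    "AlterTopicDynamicConfiguration": ["Connect", "DescribeTopicDynamicConfiguration"],
    "ReadData": ["Connect", "DescribeTopic", "AlterGroup"],
    "WriteData": ["Connect", "DescribeTopic"],
    "DescribeGroup": ["Connect"],
    "AlterGroup": ["Connect", "DescribeGroup"],
    "DeleteGroup": ["Connect", "DescribeGroup"],
    "DescribeTransactionalId": ["Connect"],
    "AlterTransactionalId": ["Connect", "DescribeTransactionalId", "WriteData"],
}


def with_required(actions):
    """Return the given actions plus everything they transitively require."""
    result = set()
    pending = list(actions)
    while pending:
        action = pending.pop()
        if action in result:
            continue
        result.add(action)
        pending.extend(REQUIRED[action])
    return sorted(f"kafka-cluster:{name}" for name in result)


if __name__ == "__main__":
    for action in with_required(["ReadData"]):
        print(action)
```

For ReadData, the expansion produces the same five actions that the Consume data use case lists.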

You can use the asterisk (*) wildcard any number of times in an action after the colon. The following are examples.

  • kafka-cluster:*Topic stands for kafka-cluster:CreateTopic, kafka-cluster:DescribeTopic, kafka-cluster:AlterTopic, and kafka-cluster:DeleteTopic. It doesn't include kafka-cluster:DescribeTopicDynamicConfiguration or kafka-cluster:AlterTopicDynamicConfiguration.

  • kafka-cluster:* stands for all permissions.
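
To see which actions a wildcard pattern covers, the matching can be approximated locally. The following Python sketch uses fnmatchcase from the standard library as a stand-in for IAM's own matching, which is an assumption: IAM evaluates patterns itself and may differ in details such as case sensitivity. The ACTIONS list is a subset of the actions table chosen for illustration, and the function name expand is hypothetical.

```python
from fnmatch import fnmatchcase

# A subset of the actions table, enough to show how the wildcard behaves.
ACTIONS = [
    "kafka-cluster:Connect",
    "kafka-cluster:CreateTopic",
    "kafka-cluster:DescribeTopic",
    "kafka-cluster:AlterTopic",
    "kafka-cluster:DeleteTopic",
    "kafka-cluster:DescribeTopicDynamicConfiguration",
    "kafka-cluster:AlterTopicDynamicConfiguration",
]


def expand(pattern: str) -> list:
    """Return the known actions that the wildcard pattern matches in full."""
    return [action for action in ACTIONS if fnmatchcase(action, pattern)]


if __name__ == "__main__":
    for pattern in ("kafka-cluster:*Topic", "kafka-cluster:*Topic*"):
        print(pattern, "->", expand(pattern))
```

The pattern kafka-cluster:*Topic must match the whole action name, which is why the two DynamicConfiguration actions are excluded, while kafka-cluster:*Topic* includes them.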

Resources

The following table shows the four types of resources that you can use in an authorization policy when you use IAM access control for Amazon MSK. You can get the cluster Amazon Resource Name (ARN) from the Amazon Web Services Management Console or by using the DescribeCluster API or the describe-cluster Amazon CLI command. You can then use the cluster ARN to construct topic, group, and transactional ID ARNs. To specify a resource in an authorization policy, use that resource's ARN.

| Resource | ARN format |
| --- | --- |
| Cluster | arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid |
| Topic | arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name |
| Group | arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/group-name |
| Transactional ID | arn:aws:kafka:region:account-id:transactional-id/cluster-name/cluster-uuid/transactional-id |

You can use the asterisk (*) wildcard any number of times anywhere in the part of the ARN that comes after :cluster/, :topic/, :group/, and :transactional-id/. The following are some examples of how you can use the asterisk (*) wildcard to refer to multiple resources:

  • arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*: all the topics in any cluster named MyTestCluster, regardless of the cluster's UUID.

  • arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/*_test: all topics whose name ends with "_test" in the cluster whose name is MyTestCluster and whose UUID is abcd1234-0123-abcd-5678-1234abcd-1.

  • arn:aws:kafka:us-east-1:0123456789012:transactional-id/MyTestCluster/*/5555abcd-1111-abcd-1234-abcd1234-1: all transactions whose transactional ID is 5555abcd-1111-abcd-1234-abcd1234-1, across all incarnations of a cluster named MyTestCluster in your account. This means that if you create a cluster named MyTestCluster, then delete it, and then create another cluster by the same name, you can use this resource ARN to represent the same transactional ID on both clusters. However, the deleted cluster isn't accessible.
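
Constructing topic, group, and transactional ID ARNs from the cluster ARN is a string transformation. The following Python sketch shows one way to do it, following the ARN formats in the table above. The function name child_arn is hypothetical, and the sketch assumes that the cluster ARN has exactly the cluster/cluster-name/cluster-uuid shape.

```python
def child_arn(cluster_arn: str, resource_type: str, name: str) -> str:
    """Build a topic, group, or transactional-id ARN from a cluster ARN."""
    if resource_type not in ("topic", "group", "transactional-id"):
        raise ValueError(f"unsupported resource type: {resource_type}")
    # Everything before the last colon is shared by all four resource types.
    prefix, _, resource = cluster_arn.rpartition(":")
    parts = resource.split("/")
    if len(parts) != 3 or parts[0] != "cluster":
        raise ValueError(f"not a cluster ARN: {cluster_arn}")
    _, cluster_name, cluster_uuid = parts
    # The name may contain the asterisk wildcard, for example "*_test".
    return f"{prefix}:{resource_type}/{cluster_name}/{cluster_uuid}/{name}"


if __name__ == "__main__":
    arn = ("arn:aws:kafka:us-east-1:0123456789012:"
           "cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1")
    print(child_arn(arn, "topic", "*_test"))
```

Passing "*" as the name gives an ARN that covers every topic, group, or transactional ID in that one cluster incarnation.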

Common use cases

The first column in the following table shows some common use cases. To authorize a client to carry out a given use case, include the required actions for that use case in the client's authorization policy, and set Effect to Allow.

For information about all the actions that are part of IAM access control for Amazon MSK, see Semantics of actions and resources.

Note

Actions are denied by default. You must explicitly allow every action that you want to authorize the client to perform.

| Use case | Required actions |
| --- | --- |
| Admin | kafka-cluster:* |
| Create a topic | kafka-cluster:Connect, kafka-cluster:CreateTopic |
| Produce data | kafka-cluster:Connect, kafka-cluster:DescribeTopic, kafka-cluster:WriteData |
| Consume data | kafka-cluster:Connect, kafka-cluster:DescribeTopic, kafka-cluster:DescribeGroup, kafka-cluster:AlterGroup, kafka-cluster:ReadData |
| Produce data idempotently | kafka-cluster:Connect, kafka-cluster:DescribeTopic, kafka-cluster:WriteData, kafka-cluster:WriteDataIdempotently |
| Produce data transactionally | kafka-cluster:Connect, kafka-cluster:DescribeTopic, kafka-cluster:WriteData, kafka-cluster:DescribeTransactionalId, kafka-cluster:AlterTransactionalId |
| Describe the configuration of a cluster | kafka-cluster:Connect, kafka-cluster:DescribeClusterDynamicConfiguration |
| Update the configuration of a cluster | kafka-cluster:Connect, kafka-cluster:DescribeClusterDynamicConfiguration, kafka-cluster:AlterClusterDynamicConfiguration |
| Describe the configuration of a topic | kafka-cluster:Connect, kafka-cluster:DescribeTopicDynamicConfiguration |
| Update the configuration of a topic | kafka-cluster:Connect, kafka-cluster:DescribeTopicDynamicConfiguration, kafka-cluster:AlterTopicDynamicConfiguration |
| Alter a topic | kafka-cluster:Connect, kafka-cluster:DescribeTopic, kafka-cluster:AlterTopic |
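
A client often needs more than one of these use cases at once. The following Python sketch merges the required actions of several use cases into a single list without duplicates. The names USE_CASES and actions_for are hypothetical, and only a few rows of the table are included for brevity.

```python
# A few rows of the use case table, without the "kafka-cluster:" prefix.
USE_CASES = {
    "Create a topic": ["Connect", "CreateTopic"],
    "Produce data": ["Connect", "DescribeTopic", "WriteData"],
    "Consume data": ["Connect", "DescribeTopic", "DescribeGroup",
                     "AlterGroup", "ReadData"],
    "Produce data idempotently": ["Connect", "DescribeTopic", "WriteData",
                                  "WriteDataIdempotently"],
}


def actions_for(*use_cases: str) -> list:
    """Merge the required actions of the named use cases, keeping first-seen order."""
    merged = {}
    for use_case in use_cases:
        for name in USE_CASES[use_case]:
            # Dictionary keys keep insertion order and drop duplicates.
            merged[f"kafka-cluster:{name}"] = None
    return list(merged)


if __name__ == "__main__":
    for action in actions_for("Produce data", "Consume data"):
        print(action)
```

The resulting list still has to be split across statements by resource type (cluster, topic, group) when it is placed in an authorization policy with Effect set to Allow.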