本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
IAM 访问控制
Amazon MSK 的 IAM 访问控制让您能够处理 MSK 集群的身份验证和授权。这样就不需要使用一种身份验证机制和另一种授权机制。例如,当客户端尝试写入您的集群时,Amazon MSK 使用 IAM 来检查该客户端是否是经过身份验证的身份,以及是否有权向您的集群生成数据。IAM 访问控制适用于 Java 和非 Java 客户端,包括用 Python、 JavaScript Go 和.NET 编写的 Kafka 客户端。
Amazon MSK 会记录访问事件,以方便您进行审计。有关更多信息,请参阅使用 记录 Amazon CloudTrailAPI 调用。
为了能够进行 IAM 访问控制,Amazon MSK 对 Apache Kafka 源代码进行了少许修改。这些修改不会给您的 Apache Kafka 体验造成明显的影响。
重要
IAM 访问控制不适用于 Apache ZooKeeper 节点。有关如何控制对这些节点的访问权限的信息,请参阅 控制对 Apache 的访问权限 ZooKeeper。
重要
如果您的集群使用 IAM 访问控制,则 allow.everyone.if.no.acl.found
Apache Kafka 设置无效。
重要
您可以为使用 IAM 访问控制的 MSK 集群调用 Apache Kafka ACL API。但是,存储在 Apache 中的 Apache Kafka ACL 对 IAM 角色的授权 ZooKeeper 没有影响。您必须将 IAM 策略用于 IAM 角色的访问控制。
Amazon MSK 的 IAM 访问控制的工作原理
要使用 Amazon MSK 的 IAM 访问控制,请执行以下步骤,本节的其余部分将详细介绍这些步骤。
创建使用 IAM 访问控制的集群
本节介绍如何使用 Amazon Web Services Management Console、API 或创建使用 IAM 访问控制的集群。 Amazon CLI 有关如何为现有集群开启 IAM 访问控制的信息,请参阅 更新集群的安全设置。
使用创建 Amazon Web Services Management Console 使用 IAM 访问控制的集群
在 https://console.amazonaws.cn/msk/
打开 Amazon MSK 控制台。 -
选择创建集群。
-
选择使用自定义设置创建集群。
-
在身份验证部分中,选择 IAM 访问控制。
-
完成创建集群的其余工作流程。
使用 API 或创建 Amazon CLI 使用 IAM 访问控制的集群
要创建启用了 IAM 访问控制的集群,请使用 CreateClusterAPI 或 create-cluster CLI
命令,并传递以下 JSON 作为 ClientAuthentication
参数:。"ClientAuthentication": { "Sasl": { "Iam": { "Enabled": true } }
配置客户端以进行 IAM 访问控制
要让客户端能够与使用 IAM 访问控制的 MSK 集群通信,您可以使用以下任何一种机制:
-
使用 SASL_OAUTHBEARER 机制进行非 Java 客户端配置
-
使用 SASL_OAUTHBEARER 机制或 AWS_MSK_IAM 机制进行 Java 客户端配置
使用 SASL_OAUTHBEARER 机制配置 IAM
使用以下 Python Kafka 客户端示例中突出显示的语法作为指南,编辑 client.properties 配置文件。其他语言的配置更改与之类似。
#!/usr/bin/python3from kafka import KafkaProducer from kafka.errors import KafkaError import socket import time from aws_msk_iam_sasl_signer import MSKAuthTokenProvider class MSKTokenProvider(): def token(self): token, _ = MSKAuthTokenProvider.generate_auth_token('<my aws region>') return token tp = MSKTokenProvider() producer = KafkaProducer( bootstrap_servers='<my bootstrap string>', security_protocol='SASL_SSL', sasl_mechanism='OAUTHBEARER', sasl_oauth_token_provider=tp, client_id=socket.gethostname(), ) topic = "<my-topic>" while True: try: inp=input(">") producer.send(topic, inp.encode()) producer.flush() print("Produced!") except Exception: print("Failed to send message:", e) producer.close()
下载所选配置语言的帮助程序库,然后按照该语言库主页上的 “开始使用” 部分中的说明进行操作。
JavaScript: https://github.com/aws/ aws-msk-iam-sasl-signer-js #getting-st
arted Python:https://github.com/aws/ aws-msk-iam-sasl-signer-python #get-
已开始 Go:https://github.com/aws/ aws-msk-iam-sasl-signer-go #getting-st
arted .NET:https://github.com/aws/ aws-msk-iam-sasl-signer-
net #getting-已开始 JAVA:Java 通过
aws-msk-iam-auth
jar 文件提供 SASL_OAUTHBEARER 支持
使用 MSK 自定义 AWS_MSK_IAM 机制配置 IAM
将以下内容添加到
client.properties
文件中。将<PATH_TO_TRUST_STORE_FILE>
替换为客户端上信任存储文件的完全限定路径。注意
如果您不想使用特定证书,可以从
client.properties
文件中删除ssl.truststore.location=
。如果您不指定<PATH_TO_TRUST_STORE_FILE>
ssl.truststore.location
的值,Java 进程将使用默认证书。ssl.truststore.location=
<PATH_TO_TRUST_STORE_FILE>
security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler要使用您为 Amazon 凭证创建的命名配置文件,请将其包含
awsProfileName="
在您的客户端配置文件中。有关命名配置文件的信息,请参阅文档中的命名配置 Amazon CLI 文件。your profile name
";下载最新的稳定版 aws-msk-iam-auth
JAR 文件,并将其放在类路径中。如果您使用 Maven,请添加以下依赖项,并根据需要调整版本号: <dependency> <groupId>software.amazon.msk</groupId> <artifactId>aws-msk-iam-auth</artifactId> <version>1.0.0</version> </dependency>
Amazon MSK 客户端插件在 Apache 2.0 许可证下是开源的。
创建授权策略
将授权策略附加到与客户端对应的 IAM 角色。在授权策略中,您可以指定角色允许或拒绝哪些操作。如果您的客户端位于 Amazon EC2 实例上,请将授权策略与该 Amazon EC2 实例的 IAM 角色关联。或者,您也可以将客户端配置为使用命名配置文件,然后将授权策略与该命名配置文件的角色关联。配置客户端以进行 IAM 访问控制 介绍如何将客户端配置为使用命名配置文件。
有关如何创建 IAM policy 的信息,请参阅创建 IAM policy。
以下是名为的集群的授权策略示例 MyTestCluster。要了解 Action
和 Resource
元素的语义,请参阅 操作和资源的语义。
重要
您对 IAM policy 所做的更改会立即反映在 IAM API 和 Amazon CLI 中。但是,策略更改可能需要很长时间才能生效。在大多数情况下,策略更改会在不到一分钟的时间内生效。网络状况有时可能会增加延迟时间。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster" ], "Resource": [ "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*" ] } ] }
要了解如何创建包含与常见 Apache Kafka 用例(例如创建和使用数据)对应的操作元素的策略,请参阅 常见使用案例。
对于 Kafka 版本 2.8.0 及更高版本,该WriteDataIdempotently权限已被弃用 (KIP-679)。enable.idempotence = true
。因此,对于 Kafka 版本 2.8.0 及更高版本,IAM 不提供与 Kafka ACL 相同的功能。仅提供 WriteData
对某个主题的访问权限,无法 WriteDataIdempotently
到该主题。这不会影响将 WriteData
提供给所有主题的情况。在这种情况下,允许 WriteDataIdempotently
。这是由于 IAM 逻辑的实现与 Kafka ACL 的实现方式存在差异。
要解决这个问题,建议使用类似于以下示例的策略:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster", "kafka-cluster:WriteDataIdempotently" ], "Resource": [ "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/TestTopic" ] } ] }
在这种情况下,WriteData
允许写入 TestTopic
,而 WriteDataIdempotently
允许对集群进行幂等性写入。请务必注意,WriteDataIdempotently
是集群级别的权限。它不能在主题级别使用。如果 WriteDataIdempotently
仅限于主题级别,则此策略将不起作用。
获取用于 IAM 访问控制的引导代理
操作和资源的语义
本部分解释了可以在 IAM 授权策略中使用的操作和资源元素的语义。有关策略示例,请参阅创建授权策略。
操作
下表列出了在使用 Amazon MSK 的 IAM 访问控制时可以在授权策略中包含的操作。当您在授权策略中包含表操作列中的操作时,还必须包含所需操作列中的相应操作。
操作 | 描述 | 所需的操作 | 所需的 资源 | 适用于无服务器集群 |
---|---|---|---|---|
kafka-cluster:Connect |
授予连接和验证集群的权限。 | 无 | cluster | 是 |
kafka-cluster:DescribeCluster |
授予描述集群各个方面的权限,相当于 Apache Kafka 的 DESCRIBE CLUSTER ACL。 |
|
cluster | 是 |
kafka-cluster:AlterCluster |
授予更改集群各个方面的权限,相当于 Apache Kafka 的 ALTER CLUSTER ACL。 |
|
cluster | 否 |
kafka-cluster:DescribeClusterDynamicConfiguration |
授予描述集群动态配置的权限,相当于 Apache Kafka 的 DESCRIBE_CONFIGS CLUSTER ACL。 |
|
cluster | 否 |
kafka-cluster:AlterClusterDynamicConfiguration |
授予更改集群动态配置的权限,相当于 Apache Kafka 的 ALTER_CONFIGS CLUSTER ACL。 |
|
cluster | 否 |
kafka-cluster:WriteDataIdempotently |
授予在集群上以幂等方式写入数据的权限,相当于 Apache Kafka 的 IDEMPOTENT_WRITE CLUSTER ACL。 |
|
cluster | 是 |
kafka-cluster:CreateTopic |
授予在集群上创建主题的权限,相当于 Apache Kafka 的 CREATE CLUSTER/TOPIC ACL。 |
|
topic | 是 |
kafka-cluster:DescribeTopic |
授予描述集群上主题的权限,相当于 Apache Kafka 的 DESCRIBE TOPIC ACL。 |
|
topic | 是 |
kafka-cluster:AlterTopic |
授予更改集群上主题的权限,相当于 Apache Kafka 的 ALTER TOPIC ACL。 |
|
topic | 是 |
kafka-cluster:DeleteTopic |
授予删除集群上主题的权限,相当于 Apache Kafka 的 DELETE TOPIC ACL。 |
|
topic | 是 |
kafka-cluster:DescribeTopicDynamicConfiguration |
授予描述集群上主题动态配置的权限,相当于 Apache Kafka 的 DESCRIBE_CONFIGS TOPIC ACL。 |
|
topic | 是 |
kafka-cluster:AlterTopicDynamicConfiguration |
授予更改集群上主题动态配置的权限,相当于 Apache Kafka 的 ALTER_CONFIGS TOPIC ACL。 |
|
topic | 是 |
kafka-cluster:ReadData |
授予从集群上主题中读取数据的权限,相当于 Apache Kafka 的 READ TOPIC ACL。 |
|
topic | 是 |
kafka-cluster:WriteData |
授予向集群上的主题写入数据的权限,相当于 Apache Kafka 的 WRITE TOPIC ACL |
|
topic | 是 |
kafka-cluster:DescribeGroup |
授予描述集群上群组的权限,相当于 Apache Kafka 的 DESCRIBE GROUP ACL。 |
|
组 | 是 |
kafka-cluster:AlterGroup |
授予加入集群上群组的权限,相当于 Apache Kafka 的 READ GROUP ACL。 |
|
组 | 是 |
kafka-cluster:DeleteGroup |
授予删除集群上群组的权限,相当于 Apache Kafka 的 DELETE GROUP ACL。 |
|
组 | 是 |
kafka-cluster:DescribeTransactionalId |
授予描述集群上事务 ID 的权限,相当于 Apache Kafka 的 DESCRIBE TRANSACTIONAL_ID ACL。 |
|
transactional-id | 是 |
kafka-cluster:AlterTransactionalId |
授予更改集群上事务 ID 的权限,相当于 Apache Kafka 的 WRITE TRANSACTIONAL_ID ACL。 |
|
transactional-id | 是 |
在冒号之后的操作中,您可以任意次数地使用星号 (*) 通配符。示例如下。
kafka-cluster:*Topic
代表kafka-cluster:CreateTopic
、kafka-cluster:DescribeTopic
、kafka-cluster:AlterTopic
和kafka-cluster:DeleteTopic
。它不包括kafka-cluster:DescribeTopicDynamicConfiguration
或kafka-cluster:AlterTopicDynamicConfiguration
。-
kafka-cluster:*
代表所有权限。
资源
下表显示了在使用 Amazon MSK 的 IAM 访问控制时可在授权策略中使用的四种资源。您可以使用 DescribeClusterAPI 或 d es
资源 | ARN 格式 |
---|---|
集群 | arn:aws:kafka:region :account-id :cluster/cluster-name /cluster-uuid |
主题 | arn:aws:kafka:region :account-id :topic/cluster-name /cluster-uuid /topic-name |
组 | arn:aws:kafka:region :account-id :group/cluster-name /cluster-uuid /group-name |
事务 ID | arn:aws:kafka:region :account-id :transactional-id/cluster-name /cluster-uuid /transactional-id |
您可以在 ARN 中 :cluster/
、:topic/
、:group/
和 :transactional-id/
之后的任意位置,任意次数地使用星号 (*) 通配符。以下是如何使用星号 (*) 通配符引用多个资源的部分示例:
-
arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*
:任何名为的集群中的所有主题 MyTestCluster,无论集群的 UUID 如何。 -
arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1/*_test
:集群中名称以 “_test” 结尾的所有主题,其名称为,UUID 为 abcd1234-0123-abcd-56 MyTestCluster 78-1234abcd-1。 arn:aws:kafka:us-east-1:0123456789012:transactional-id/MyTestCluster/*/5555abcd-1111-abcd-1234-abcd1234-1
:所有交易 ID 为 5555abcd-1111-abcd-1234-abcd-1234-1 的交易,涉及你账户中命名的集群的所有化身。 MyTestCluster 这意味着,如果您创建了一个名为 MyTestCluster的集群,然后将其删除,然后创建另一个同名集群,则可以使用此资源 ARN 在两个集群上表示相同的交易 ID。但是,无法访问已删除的集群。
常见使用案例
下表中的第一列显示了一些常见用例。要授权客户端执行给定用例,请在客户端的授权策略中包含该用例所需的操作,并将 Effect
设置为 Allow
。
有关 Amazon MSK 的 IAM 访问控制之所有操作的信息,请参阅 操作和资源的语义。
注意
默认情况下,操作将被拒绝。您必须明确允许要授权客户端执行的每个操作。
应用场景 | 所需的操作 |
---|---|
Admin |
|
创建主题 |
|
生成数据 |
|
使用数据 |
|
以幂等方式生成数据 |
|
以事务方式生成数据 |
|
描述集群的配置 |
|
更新集群的配置 |
|
描述主题的配置 |
|
更新主题的配置 |
|
更改主题 |
|