本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Apache Kafka
Apache Kafka(Kafka)操作将消息直接发送到您的 Amazon Managed Streaming for Apache Kafka(Amazon MSK)、Confluent Cloud
注意
本主题假设您熟悉 Apache Kafka 平台及相关概念。有关 Apache Kafka 的更多信息,请参阅 Apache Kafka
要求
此规则操作具有以下要求:
-
一个 IAM 角色,Amazon IoT可以代入执行操作
ec2:CreateNetworkInterfaceec2:DescribeNetworkInterfaces、ec2:CreateNetworkInterfacePermission、ec2:DeleteNetworkInterface、ec2:DescribeSubnets、ec2:DescribeVpcs、ec2:DescribeVpcAttribute、和ec2:DescribeSecurityGroups操作。此角色创建并管理您的 Amazon Virtual Private Cloud 弹性网络接口,以便联系您的 Kafka 代理。有关更多信息,请参阅 授予Amazon IoT规则所需的访问权限。在Amazon IoT控制台中,您可以选择或创建Amazon IoT Core允许执行此规则操作的角色。
有关网络接口的更多信息,请参阅 Amazon EC2 用户指南中的弹性网络接口。
附加到您指定的角色的策略应如以下示例所示:
-
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
-
如果您使用Amazon Secrets Manager存储连接到 Kafka 代理所需的证书,则必须创建一个Amazon IoT Core可以代入执行
secretsmanager:GetSecretValue和secretsmanager:DescribeSecret操作的 IAM 角色。附加到您指定的角色的策略应如以下示例所示:
-
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_keytab-*" ] } ] }
-
您可以在 Amazon Virtual Private Cloud(Amazon VPC)中运行您的 Apache Kafka 集群。您必须创建 Amazon VPC 目标并在子网中使用 NAT 网关将消息从Amazon IoT转发到公有 Kafka 集群。Amazon IoT 规则引擎会在 VPC 目标中列出的每个子网中创建一个网络接口,以将流量直接路由到 VPC。当您创建 VPC 目标时,Amazon IoT规则引擎会自动创建 VPC 规则操作。有关 VPC 规则操作的更多信息,请参阅 虚拟私有云(VPC)目标。
-
如果您使用客户托管Amazon KMS key(KMS 密钥)加密静态数据,则服务必须有权代表呼叫者使用 KMS 密钥。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的 Amazon MSK 加密。
Parameters
使用此操作创建Amazon IoT规则时,必须指定以下信息:
- destinationArn
-
VPC 目标的 Amazon 资源名称(ARN)。有关如何创建 VPC 目标的更多信息,请参阅 虚拟私有云(VPC)目标。
- topic
-
要发送到 Kafka 代理的消息的 Kafka 主题。
您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板。
- 键(可选)
-
Kafka 消息键。
您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板。
- 标头(可选)
-
您指定的 Kafka 标头的列表。每个标头都是一个键/值对,您可以在创建 Kafka 操作时指定该键值对。您可以使用这些标头将数据从物联网客户端路由到下游 Kafka 集群,而无需修改消息有效载荷。
您可以使用替代模板替换此字段。要了解如何在 Kafka 操作的标头中将内联规则的函数作为替换模板传递,请参阅示例。有关更多信息,请参阅 替换模板。
注意
不支持二进制格式的标头。
- 分区(可选)
-
Kafka 消息分区。
您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板。
- clientProperties
-
定义 Apache Kafka 生成器客户端属性的对象。
- acks(可选)
-
生成器在考虑请求完成之前要求服务器收到的确认数。
如果指定 0 作为值,则生产者将不会等待来自服务器的任何确认。如果服务器没有收到该消息,则生成器将不会重试发送该消息。
有效值:
-1、0、1、all。默认值为1。 - bootstrap.servers
-
主机和端口对列表(例如
host1:port1、host2:port2)用于建立到 Kafka 集群的初始连接。 - compression.type(可选)
-
生成器生成的所有数据的压缩类型。
有效值:
none、gzip、snappy、lz4、zstd。默认值为none。 - security.protocol
-
用于连接到您的 Kafka 代理的安全协议。
有效值:
SSL、SASL_SSL。默认值为SSL。 - key.serializer
-
指定如何将您使用
ProducerRecord提供的键对象转换为字节。有效值:
StringSerializer。 - value.serializer
-
指定如何将您使用
ProducerRecord提供的值对象转换为字节。有效值:
ByteBufferSerializer。 - ssl.truststore
-
base64 格式的信任库文件或位于 Amazon Secrets Manager 的信任库文件位置。如果您的信任存储受到 Amazon 证书授权机构(CA)的认证,则不需要此值。
此字段支持替换模板。如果使用 Secrets Manager 存储连接到 Kafka 代理所需的凭证,则可以使用
get_secretSQL 函数检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关get_secretSQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。如果信任库采用文件的形式,请使用SecretBinary参数。如果信任库采用字符串的形式,请使用SecretString参数。此值最大为 65 KB。
- ssl.truststore.password
-
信任库存储的密码。仅当您为信任库创建了密码时,才需要此值。
- ssl.keystore
-
密钥库文件。当您指定
SSL作为security.protocol的值时,才需要此值。此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secretSQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关get_secretSQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretBinary参数。 - ssl.keystore.password
-
密钥库文件存储的密码。如果为
ssl.keystore指定了值,则需要此值。此字段的值可以是纯文本。此字段还支持替代模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secretSQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关get_secretSQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretString参数。 - ssl.key.password
-
密钥库文件中私有密钥的密码。
此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secretSQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关get_secretSQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretString参数。 - sasl.mechanism
-
用于连接到您的 Kafka 代理的安全机制。当您为
security.protocol指定SASL_SSL时,则需要此值。有效值:
PLAIN、SCRAM-SHA-512、GSSAPI。注意
SCRAM-SHA-512是 cn-north-1、cn-northwest-1、-1 和 -1 区域中唯一支持的安全机制。 us-gov-east us-gov-west - sasl.plain.username
-
用于从 Secrets Manager 中检索密钥字符串的用户名。当您为
security.protocol指定SASL_SSL、为sasl.mechanism指定PLAIN时,则需要此值。 - sasl.plain.password
-
用于从 Secrets Manager 中检索密钥字符串的密码。当您为
security.protocol指定SASL_SSL、为sasl.mechanism指定PLAIN时,则需要此值。 - sasl.scram.username
-
用于从 Secrets Manager 中检索密钥字符串的用户名。当您为
security.protocol指定SASL_SSL、为sasl.mechanism指定SCRAM-SHA-512时,则需要此值。 - sasl.scram.password
-
用于从 Secrets Manager 中检索密钥字符串的密码。当您为
security.protocol指定SASL_SSL、为sasl.mechanism指定SCRAM-SHA-512时,则需要此值。 - sasl.kerberos.keytab
-
Secrets Manager 中用于 Kerberos 身份验证的 keytab 文件。当您为
security.protocol指定SASL_SSL、为sasl.mechanism指定GSSAPI时,则需要此值。此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secretSQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关get_secretSQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretBinary参数。 - sasl.kerberos.service.name
-
Apache Kafka 运行的 Kerberos 主要名称。当您为
security.protocol指定SASL_SSL、为sasl.mechanism指定GSSAPI时,则需要此值。 - sasl.kerberos.krb5.kdc
-
您的 Apache Kafka 生成器客户端连接到的密钥分配中心(KDC)的主机名。当您为
security.protocol指定SASL_SSL、为sasl.mechanism指定GSSAPI时,则需要此值。 - sasl.kerberos.krb5.realm
-
您的 Apache Kafka 生成器客户端连接到的领域。当您为
security.protocol指定SASL_SSL、为sasl.mechanism指定GSSAPI时,则需要此值。 - sasl.kerberos.principal
-
Kerberos 可以为其分配票证以访问 Kerberos 感知服务的唯一 Kerberos 身份。当您为
security.protocol指定SASL_SSL、为sasl.mechanism指定GSSAPI时,则需要此值。
示例
以下 JSON 示例在规则中定义了 Apache Kafka 操作。Amazon IoT以下示例将 sourceIp() 内联函数作为替换模板传递到 Kafka 操作标头中。
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
有关 Kerberos 设置的重要注意事项
-
您的密钥分配中心(KDC)必须通过目标 VPC 内的私有域名系统(DNS)进行解析。一种可能的方法是将 KDC DNS 条目添加到私有托管区域。有关此方法的更多信息,请参阅如何使用私有托管区域。
-
每个 VPC 都必须启用 DNS 解析。有关更多信息,请参阅将 DNS 与您的 VPC 一起使用。
-
VPC 目标中的网络接口安全组和实例级安全组必须允许来自 VPC 内部以下端口的流量。
-
引导代理侦听器端口上的 TCP 流量(通常为 9092,但必须在 9000–9100 范围内)
-
KDC 端口 88 上的 TCP 和 UDP 流量
-
-
SCRAM-SHA-512是 cn-north-1、cn-northwest-1、-1 和 -1 区域中唯一支持的安全机制。 us-gov-east us-gov-west