本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Apache Kafka
Apache Kafka (Kafka) 操作将消息直接发送到你的 Apache Kafka Managed Streaming(亚马逊)、由第三方提供商(例如
注意
本主题假设您熟悉 Apache Kafka 平台及相关概念。有关 Apache Kafka 的更多信息,请参阅 Apache Kafka。
要求
此规则操作具有以下要求:
-
Amazon IoT 可以承担执行
ec2:CreateNetworkInterface
、、、、ec2:DescribeNetworkInterfaces
、ec2:CreateNetworkInterfacePermission
ec2:DeleteNetworkInterface
ec2:DescribeSubnets
ec2:DescribeVpcs
ec2:DescribeVpcAttribute
、和ec2:DescribeSecurityGroups
操作的IAM角色。此角色创建并管理您的 Amazon Virtual Private Cloud 弹性网络接口,以便联系您的 Kafka 代理。有关更多信息,请参阅 授予 Amazon IoT 规则所需的访问权限。在 Amazon IoT 控制台中,您可以选择或创建 Amazon IoT Core 允许执行此规则操作的角色。
有关网络接口的更多信息,请参阅 Amazon EC2 用户指南中的弹性网络接口。
附加到您指定的角色的策略应如以下示例所示:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
-
如果您使用 Amazon Secrets Manager 存储连接到 Kafka 代理所需的凭证,则必须创建一个 Amazon IoT Core 可以代入执行
secretsmanager:GetSecretValue
和secretsmanager:DescribeSecret
操作的IAM角色。附加到您指定的角色的策略应如以下示例所示:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:
region
:123456789012
:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region
:123456789012
:secret:kafka_keytab-*" ] } ] } -
你可以在亚马逊虚拟私有云(亚马逊)中运行你的 Apache Kafka 集群。VPC您必须创建 Amazon VPC 目标并使用子NAT网中的网关将消息从 Amazon IoT 转发到公有 Kafka 集群。 Amazon IoT 规则引擎在VPC目标中列出的每个子网中创建一个网络接口,用于将流量直接路由到VPC。创建VPC目标时, Amazon IoT 规则引擎会自动创建VPC规则操作。有关VPC规则操作的更多信息,请参阅虚拟私有云 (VPC) 目的地。
-
如果您使用客户托管 Amazon KMS key (KMS密钥)对静态数据进行加密,则服务必须有权代表呼叫者使用该KMS密钥。有关更多信息,请参阅 Apache Managed Streaming for Apache Kafka 开发者指南中的亚马逊MSK加密。
参数
使用此操作创建 Amazon IoT 规则时,必须指定以下信息:
- destinationArn
-
VPC目的地的 Amazon 资源名称 (ARN)。有关创建VPC目的地的信息,请参阅虚拟私有云 (VPC) 目的地。
- topic
-
要发送到 Kafka 代理的消息的 Kafka 主题。
您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板。
- 键(可选)
-
Kafka 消息键。
您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板。
- 标头(可选)
-
您指定的 Kafka 标头的列表。每个标头都是一个键/值对,您可以在创建 Kafka 操作时指定该键值对。您可以使用这些标头将数据从 IoT 客户端路由到下游 Kafka 集群,而无需修改消息有效负载。
您可以使用替代模板替换此字段。要了解如何在 Kafka 操作的标头中将内联规则的函数作为替换模板传递,请参阅示例。有关更多信息,请参阅 替换模板。
注意
不支持二进制格式的标头。
- 分区(可选)
-
Kafka 消息分区。
您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板。
- clientProperties
-
定义 Apache Kafka 生成器客户端属性的对象。
- acks(可选)
-
生成器在考虑请求完成之前要求服务器收到的确认数。
如果指定 0 作为值,则生产者将不会等待来自服务器的任何确认。如果服务器没有收到该消息,则生成器将不会重试发送该消息。
有效值:
-1
、0
、1
、all
。默认值为1
。 - bootstrap.servers
-
主机和端口对列表(例如
host1:port1
、host2:port2
)用于建立到 Kafka 集群的初始连接。 - compression.type(可选)
-
生成器生成的所有数据的压缩类型。
有效值:
none
、gzip
、snappy
、lz4
、zstd
。默认值为none
。 - security.protocol
-
用于连接到您的 Kafka 代理的安全协议。
有效值:
SSL
、SASL_SSL
。默认值为SSL
。 - key.serializer
-
指定如何将您使用
ProducerRecord
提供的键对象转换为字节。有效值:
StringSerializer
。 - value.serializer
-
指定如何将您使用
ProducerRecord
提供的值对象转换为字节。有效值:
ByteBufferSerializer
。 - ssl.truststore
-
base64 格式的信任库文件或位于 Amazon Secrets Manager 的信任库文件位置。如果您的信任存储受到 Amazon 证书授权机构(CA)的认证,则不需要此值。
此字段支持替换模板。如果您使用 Secrets Manager 存储连接到 Kafka 代理所需的凭据,则可以使用该
get_secret
SQL函数来检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关该get_secret
SQL函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。如果信任库采用文件的形式,请使用SecretBinary
参数。如果信任库采用字符串的形式,请使用SecretString
参数。此值最大为 65 KB。
- ssl.truststore.password
-
信任库存储的密码。仅当您为信任库创建了密码时,才需要此值。
- ssl.keystore
-
密钥库文件。当您指定
SSL
作为security.protocol
的值时,才需要此值。此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secret
SQL函数。有关替换模板的更多信息,请参阅 替换模板。有关该get_secret
SQL函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretBinary
参数。 - ssl.keystore.password
-
密钥库文件存储的密码。如果为
ssl.keystore
指定了值,则需要此值。此字段的值可以是纯文本。此字段还支持替代模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secret
SQL函数。有关替换模板的更多信息,请参阅 替换模板。有关该get_secret
SQL函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretString
参数。 - ssl.key.password
-
密钥库文件中私有密钥的密码。
此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secret
SQL函数。有关替换模板的更多信息,请参阅 替换模板。有关该get_secret
SQL函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretString
参数。 - sasl.mechanism
-
用于连接到您的 Kafka 代理的安全机制。当您为
security.protocol
指定SASL_SSL
时,则需要此值。有效值:
PLAIN
、SCRAM-SHA-512
、GSSAPI
。注意
SCRAM-SHA-512
是 cn-north-1、cn-northwest-1、-1 和 -1 区域中唯一支持的安全机制。 us-gov-east us-gov-west - sasl.plain.username
-
用于从 Secrets Manager 中检索密钥字符串的用户名。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定PLAIN
时,则需要此值。 - sasl.plain.password
-
用于从 Secrets Manager 中检索密钥字符串的密码。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定PLAIN
时,则需要此值。 - sasl.scram.username
-
用于从 Secrets Manager 中检索密钥字符串的用户名。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定SCRAM-SHA-512
时,则需要此值。 - sasl.scram.password
-
用于从 Secrets Manager 中检索密钥字符串的密码。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定SCRAM-SHA-512
时,则需要此值。 - sasl.kerberos.keytab
-
Secrets Manager 中用于 Kerberos 身份验证的 keytab 文件。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定GSSAPI
时,则需要此值。此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用
get_secret
SQL函数。有关替换模板的更多信息,请参阅 替换模板。有关该get_secret
SQL函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用SecretBinary
参数。 - sasl.kerberos.service.name
-
Apache Kafka 运行的 Kerberos 主要名称。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定GSSAPI
时,则需要此值。 - sasl.kerberos.krb5.kdc
-
你的 Apache Kafka 生产者客户端所连接的密钥分发中心 (KDC) 的主机名。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定GSSAPI
时,则需要此值。 - sasl.kerberos.krb5.realm
-
您的 Apache Kafka 生成器客户端连接到的领域。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定GSSAPI
时,则需要此值。 - sasl.kerberos.principal
-
Kerberos 可以为其分配票证以访问 Kerberos 感知服务的唯一 Kerberos 身份。当您为
security.protocol
指定SASL_SSL
、为sasl.mechanism
指定GSSAPI
时,则需要此值。
示例
以下JSON示例在规则中定义了 Apache Kafka 操作。 Amazon IoT 以下示例将 sourceIp() 内联函数作为替换模板传递到 Kafka Action 标头中。
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
有关 Kerberos 设置的重要注意事项
-
您的密钥分发中心 (KDC) 必须可通过目标VPC内的私有域名系统 (DNS) 进行解析。一种可能的方法是将KDCDNS条目添加到私有托管区域。有关此方法的更多信息,请参阅如何使用私有托管区域。
-
每个都VPC必须启用DNS分辨率。有关更多信息,请参阅DNS与您一起使用VPC。
-
VPC目标中的网络接口安全组和实例级安全组必须允许来自您内部的流量通过以下VPC端口。
-
TCP引导代理侦听器端口(通常为 9092,但必须在 9000—9100 范围内)上的流量
-
TCP还有 88 号港口的UDP流量 KDC
-
-
SCRAM-SHA-512
是 cn-north-1、cn-northwest-1、-1 和 -1 区域中唯一支持的安全机制。 us-gov-east us-gov-west