Apache Kafka - Amazon IoT Core
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Apache Kafka

Apache Kafka (Kafka) 操作将消息直接发送到您的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 或自行托管的 Apache Kafka 集群以进行数据分析和可视化。

注意

本主题假设您熟悉 Apache Kafka 平台及相关概念。有关 Apache Kafka 的更多信息,请参阅 Apache Kafka。不@@ 支持 MSK 无服务器。MSK 无服务器集群只能通过 IAM 身份验证完成,而 Apache Kafka 规则操作目前不支持该身份验证。

要求

此规则操作具有以下要求:

  • 一个 IAM 角色, Amazon IoT 可以代入执行操作ec2:CreateNetworkInterfaceec2:DescribeNetworkInterfacesec2:CreateNetworkInterfacePermissionec2:DeleteNetworkInterfaceec2:DescribeSubnetsec2:DescribeVpcsec2:DescribeVpcAttribute、和ec2:DescribeSecurityGroups操作。此角色创建并管理您的 Amazon Virtual Private Cloud 弹性网络接口,以便联系您的 Kafka 代理。有关更多信息,请参阅授予 Amazon IoT 规则所需的访问权限

    在 Amazon IoT 控制台中,您可以选择或创建 Amazon IoT Core 允许执行此规则操作的角色。

    有关网络接口的更多信息,请参阅《Amazon EC2 用户指南》中的弹性网络接口

    附加到您指定的角色的策略应如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • 如果您使用 Amazon Secrets Manager 存储连接到 Kafka 代理所需的证书,则必须创建一个 Amazon IoT Core 可以代入执行secretsmanager:GetSecretValuesecretsmanager:DescribeSecret操作的 IAM 角色。

    附加到您指定的角色的策略应如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • 您可以在 Amazon Virtual Private Cloud (Amazon VPC) 中运行您的 Apache Kafka 集群。您必须创建 Amazon VPC 目标并在子网中使用 NAT 网关将消息从 Amazon IoT 转发到公有 Kafka 集群。 Amazon IoT 规则引擎会在 VPC 目标中列出的每个子网中创建一个网络接口,以将流量直接路由到 VPC。当您创建 VPC 目标时, Amazon IoT 规则引擎会自动创建 VPC 规则操作。有关 VPC 规则操作的更多信息,请参阅 Virtual Private Cloud (VPC) 目标

  • 如果您使用客户托管 Amazon KMS key (KMS 密钥)加密静态数据,则服务必须有权代表呼叫者使用 KMS 密钥。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的 Amazon MSK 加密

参数

使用此操作创建 Amazon IoT 规则时,必须指定以下信息:

destinationArn

VPC 目标的 Amazon Resource Name (ARN)。有关如何创建 VPC 目标的更多信息,请参阅 Virtual Private Cloud (VPC) 目标

topic

要发送到 Kafka 代理的消息的 Kafka 主题。

您可以使用替代模板替换此字段。有关更多信息,请参阅替换模板

键(可选)

Kafka 消息键。

您可以使用替代模板替换此字段。有关更多信息,请参阅替换模板

标头(可选)

您指定的 Kafka 标头的列表。每个标头都是一个键/值对,您可以在创建 Kafka 操作时指定该键值对。您可以使用这些标头将数据从 IoT 客户端路由到下游 Kafka 集群,而无需修改消息有效负载。

您可以使用替代模板替换此字段。要了解如何在 Kafka 操作的标头中将内联规则的函数作为替换模板传递,请参阅示例。有关更多信息,请参阅替换模板

注意

不支持二进制格式的标头。

分区(可选)

Kafka 消息分区。

您可以使用替代模板替换此字段。有关更多信息,请参阅替换模板

clientProperties

定义 Apache Kafka 生成器客户端属性的对象。

acks(可选)

生成器在考虑请求完成之前要求服务器收到的确认数。

如果指定 0 作为值,则生产者将不会等待来自服务器的任何确认。如果服务器没有收到该消息,则生成器将不会重试发送该消息。

有效值:-101all。默认值为 1

bootstrap.servers

主机和端口对列表(例如 host1:port1host2:port2)用于建立到 Kafka 集群的初始连接。

compression.type(可选)

生成器生成的所有数据的压缩类型。

有效值:nonegzipsnappylz4zstd。默认值为 none

security.protocol

用于连接到您的 Kafka 代理的安全协议。

有效值:SSLSASL_SSL。默认值为 SSL

key.serializer

指定如何将您使用 ProducerRecord 提供的键对象转换为字节。

有效值:StringSerializer

value.serializer

指定如何将您使用 ProducerRecord 提供的值对象转换为字节。

有效值:ByteBufferSerializer

ssl.truststore

base64 格式的信任库文件或位于 Amazon Secrets Manager 的信任库文件位置。如果您的信任存储受到 Amazon 证书授权机构(CA)的认证,则不需要此值。

此字段支持替换模板。如果使用 Secrets Manager 存储连接到 Kafka 代理所需的凭证,则可以使用 get_secret SQL 函数检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。如果信任库采用文件的形式,请使用 SecretBinary 参数。如果信任库采用字符串的形式,请使用 SecretString 参数。

此值最大为 65 KB。

ssl.truststore.password

信任库存储的密码。仅当您为信任库创建了密码时,才需要此值。

ssl.keystore

密钥库文件。当您指定 SSL 作为 security.protocol 的值时,才需要此值。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretBinary 参数。

ssl.keystore.password

密钥库文件存储的密码。如果为 ssl.keystore 指定了值,则需要此值。

此字段的值可以是纯文本。此字段还支持替代模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretString 参数。

ssl.key.password

密钥库文件中私有密钥的密码。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretString 参数。

sasl.mechanism

用于连接到您的 Kafka 代理的安全机制。当您为 security.protocol 指定 SASL_SSL 时,则需要此值。

有效值:PLAINSCRAM-SHA-512GSSAPI

注意

SCRAM-SHA-512是 cn-north-1、cn-northwest-1、-1 和 -1 区域中唯一支持的安全机制。 us-gov-east us-gov-west

sasl.plain.username

用于从 Secrets Manager 中检索密钥字符串的用户名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 PLAIN 时,则需要此值。

sasl.plain.password

用于从 Secrets Manager 中检索密钥字符串的密码。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 PLAIN 时,则需要此值。

sasl.scram.username

用于从 Secrets Manager 中检索密钥字符串的用户名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 SCRAM-SHA-512 时,则需要此值。

sasl.scram.password

用于从 Secrets Manager 中检索密钥字符串的密码。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 SCRAM-SHA-512 时,则需要此值。

sasl.kerberos.keytab

Secrets Manager 中用于 Kerberos 身份验证的 keytab 文件。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretBinary 参数。

sasl.kerberos.service.name

Apache Kafka 运行的 Kerberos 主要名称。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.krb5.kdc

您的 Apache Kafka 生成器客户端连接到的密钥分配中心 (KDC) 的主机名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.krb5.realm

您的 Apache Kafka 生成器客户端连接到的领域。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.principal

Kerberos 可以为其分配票证以访问 Kerberos 感知服务的唯一 Kerberos 身份。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

示例

以下 JSON 示例在规则中定义了 Apache Kafka 操作。 Amazon IoT 以下示例将 sourceIp() 内联函数作为替换模板传递到 Kafka 操作标头中。

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

有关 Kerberos 设置的重要注意事项

  • 您的密钥分配中心 (KDC) 必须通过目标 VPC 内的私有域名系统 (DNS) 进行解析。一种可能的方法是将 KDC DNS 条目添加到私有托管区域。有关此方法的更多信息,请参阅如何使用私有托管区域

  • 每个 VPC 都必须启用 DNS 解析。有关更多信息,请参阅将 DNS 与您的 VPC 一起使用

  • VPC 目标中的网络接口安全组和实例级安全组必须允许来自 VPC 内部以下端口的流量。

    • 引导代理侦听器端口上的 TCP 流量(通常为 9092,但必须在 9000–9100 范围内)

    • KDC 端口 88 上的 TCP 和 UDP 流量

  • SCRAM-SHA-512是 cn-north-1、cn-northwest-1、-1 和 -1 区域中唯一支持的安全机制。 us-gov-east us-gov-west