Apache Kafka - Amazon IoT Core
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

Apache Kafka

Apache Kafka (Kafka) 操作将消息直接发送到您的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 或自行托管的 Apache Kafka 集群以进行数据分析和可视化。

注意

本主题假设您熟悉 Apache Kafka 平台及相关概念。有关 Apache Kafka 的更多信息,请参阅 Apache Kafka

Requirements

此规则操作具有以下要求:

  • Amazon IoT 可以承担的 IAM 角色,以执行 ec2:CreateNetworkInterfaceec2:DescribeNetworkInterfacesec2:CreateNetworkInterfacePermissionec2:DeleteNetworkInterfaceec2:DescribeSubnetsec2:DescribeVpcsec2:DescribeVpcAttribute 操作。此角色创建并管理您的 Amazon Virtual Private Cloud 弹性网络接口,以便联系您的 Kafka 代理。有关更多信息,请参阅 授予 Amazon IoT 所需的访问权限

    在 Amazon IoT 控制台中,您可以选择或创建一个角色以允许 Amazon IoT Core 执行此规则操作。

    有关网络接口的更多信息,请参阅 Amazon EC2 用户指南中的弹性网络接口

    附加到您指定角色的策略应如下所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute" ], "Resource": "*" } ] }
  • 如果您使用 Amazon Secrets Manager 来存储所需的凭证以连接到 Kafka 代理,则必须创建一个 IAM 角色,以便 Amazon IoT Core 可以承担该角色来执行 secretsmanager:GetSecretValuesecretsmanager:DescribeSecret 操作。

    附加到您指定角色的策略应如下所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • 您必须创建 Virtual Private Cloud (VPC) 目标。(您可以在 Amazon Virtual Private Cloud 中运行您的 Apache Kafka 集群。) Amazon IoT 规则引擎会在 VPC 目标中列出的每个子网中创建一个网络接口。这将让规则引擎将流量直接路由到 VPC。当您创建 VPC 目标时,Amazon IoT 规则引擎会自动创建 VPC 规则操作。有关 VPC 规则操作的更多信息,请参阅 VPC

  • 如果您使用客户托管的 Amazon Key Management Service (Amazon KMS) 客户主密钥 (CMK) 对数据进行静态加密,则服务必须具有代表调用方使用 CMK 的权限。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的 Amazon MSK 加密

Parameters

使用此操作创建 Amazon IoT 规则时,您必须指定以下信息:

destinationArn

VPC 目标的 Amazon Resource Name (ARN)。有关如何创建 VPC 目标的更多信息,请参阅 VPC

topic

要发送到 Kafka 代理的消息的 Kafka 主题。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

键(可选)

Kafka 消息键。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

分区(可选)

Kafka 消息分区。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

clientProperties

定义 Apache Kafka 生成器客户端属性的对象。

acks(可选)

生成器在考虑请求完成之前要求服务器收到的确认数。

如果指定 0 作为值,则生成器将不会等待来自服务器的任何确认。如果服务器没有收到该消息,则生成器将不会重试发送该消息。

有效值:0、1 默认值是 1。

bootstrap.servers

主机和端口对列表(host1:port1host2:port2 等)用于建立到您的 Kafka 集群的初始连接。

compression.type(可选)

生成器生成的所有数据的压缩类型。

有效值:nonegzipsnappylz4zstd。默认值为 none

security.protocol

用于连接到您的 Kafka 代理的安全协议。

有效值:SSLSASL_SSL。默认值为 SSL

key.serializer

指定如何将您使用 ProducerRecord 提供的键对象转换为字节。

有效值:StringSerializer

value.serializer

指定如何将您使用 ProducerRecord 提供的值对象转换为字节。

有效值:ByteBufferSerializer

ssl.truststore

base64 格式的信任库文件或位于 Amazon Secrets Manager 的信任库文件位置。如果您的信任库受到 Amazon 证书授权机构 (CA) 的认证,则不需要此值。

此字段支持替换模板。如果使用 Secrets Manager 存储连接到 Kafka 代理所需的凭据,则可以使用 get_secret SQL 函数检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅 get_secret(secretId, secretType, key, roleArn) 。如果信任库采用文件的形式,请使用 SecretBinary 参数。如果信任库采用字符串的形式,请使用 SecretString 参数。

此值最大为 65 KB。

ssl.truststore.password

信任库存储的密码。仅当您为信任库创建了密码时,才需要此值。

ssl.keystore

密钥库文件。当您指定 SSL 作为 security.protocol 的值时,才需要此值。

此字段支持替换模板。您必须使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭据。使用 get_secret SQL 函数检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅 get_secret(secretId, secretType, key, roleArn) 。使用 SecretBinary 参数。

ssl.keystore.password

密钥库文件存储的密码。如果为 ssl.keystore 指定了值,则需要此值。

此字段的值可以是纯文本。此字段还支持替代模板。您必须使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭据。使用 get_secret SQL 函数检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn) 。使用 SecretString 参数。

ssl.key.password

密钥库文件中私有密钥的密码。

此字段支持替换模板。您必须使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭据。使用 get_secret SQL 函数检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅 get_secret(secretId, secretType, key, roleArn) 。使用 SecretString 参数。

sasl.mechanism

用于连接到您的 Kafka 代理的安全机制。当您为 security.protocol 指定 SASL_SSL 时,则需要此值。

有效值:PLAINSCRAM-SHA-512GSSAPI

注意

SCRAM-SHA-512 是 cn-north-1、cn-north-1、cn-northwest-1、us-gov-east-1 和 us-gov-west-1 区域中唯一支持的安全机制。

sasl.plain.username

用于从 Secrets Manager 中检索密钥字符串的用户名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 PLAIN 时,则需要此值。

sasl.plain.password

用于从 Secrets Manager 中检索密钥字符串的密码。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 PLAIN 时,则需要此值。

sasl.scram.username

用于从 Secrets Manager 中检索密钥字符串的用户名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 SCRAM-SHA-512 时,则需要此值。

sasl.scram.password

用于从 Secrets Manager 中检索密钥字符串的密码。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 SCRAM-SHA-512 时,则需要此值。

sasl.kerberos.keytab

Secrets Manager 中用于 Kerberos 身份验证的 keytab 文件。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

此字段支持替换模板。您必须使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭据。使用 get_secret SQL 函数检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn) 。使用 SecretBinary 参数。

sasl.kerberos.service.name

Apache Kafka 运行的 Kerberos 主要名称。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.krb5.kdc

您的 Apache Kafka 生成器客户端连接到的密钥分配中心 (KDC) 的主机名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.krb5.realm

您的 Apache Kafka 生成器客户端连接到的领域。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.principal

Kerberos 可以为其分配票证以访问 Kerberos 感知服务的唯一 Kerberos 身份。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

Examples

下面的 JSON 示例介绍了如何在 Amazon IoT 规则中定义 Apache Kafka 操作:

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab', 'SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" } } } ] }

有关 Kerberos 设置的重要注意事项

  • 您的密钥分配中心 (KDC) 必须通过目标 VPC 内的私有域名系统 (DNS) 进行解析。一种可能的方法是将 KDC DNS 条目添加到私有托管区域。有关此方法的更多信息,请参阅如何使用私有托管区域

  • 每个 VPC 都必须启用 DNS 解析。有关更多信息,请参阅将 DNS 与您的 VPC 一起使用

  • VPC 目标中的网络接口安全组和实例级安全组必须允许来自 VPC 内部以下端口的流量。

    • 引导代理侦听器端口上的 TCP 流量(通常为 9092,但必须在 9000-9100 范围内)

    • KDC 端口 88 上的 TCP 和 UDP 流量

  • SCRAM-SHA-512 是 cn-north-1、cn-north-1、cn-northwest-1、us-gov-east-1 和 us-gov-west-1 区域中唯一支持的安全机制。