Apache Kafka - Amazon IoT Core
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Apache Kafka

Apache Kafka (Kafka) 操作将消息直接发送到你的 Apache Kafka Managed Streaming(亚马逊)、由第三方提供商(例如 Confluent Cloud)管理的 MSK Apache Kafka 集群或自我管理的 Apache Kafka 集群,或者自我管理的 Apache Kafka 集群。通过 Kafka 规则操作,您可以将您的 IoT 数据路由到 Kafka 集群。这使您能够出于各种目的构建高性能数据管道,例如流分析、数据集成、可视化和任务关键型业务应用程序等。

注意

本主题假设您熟悉 Apache Kafka 平台及相关概念。有关 Apache Kafka 的更多信息,请参阅 Apache Kafka。 MSK不支持@@ 无服务器。 MSK无服务器集群只能通过IAM身份验证来完成,Apache Kafka 规则操作目前不支持身份验证。有关如何使用 Confluent Amazon IoT Core 进行配置的更多信息,请参阅利用 Confluent 和 Amazon 解决物联网设备和数据管理挑战。

要求

此规则操作具有以下要求:

  • Amazon IoT 可以承担执行ec2:CreateNetworkInterface、、、、ec2:DescribeNetworkInterfacesec2:CreateNetworkInterfacePermissionec2:DeleteNetworkInterfaceec2:DescribeSubnetsec2:DescribeVpcsec2:DescribeVpcAttribute、和ec2:DescribeSecurityGroups操作的IAM角色。此角色创建并管理您的 Amazon Virtual Private Cloud 弹性网络接口,以便联系您的 Kafka 代理。有关更多信息,请参阅 授予 Amazon IoT 规则所需的访问权限

    在 Amazon IoT 控制台中,您可以选择或创建 Amazon IoT Core 允许执行此规则操作的角色。

    有关网络接口的更多信息,请参阅 Amazon EC2 用户指南中的弹性网络接口

    附加到您指定的角色的策略应如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • 如果您使用 Amazon Secrets Manager 存储连接到 Kafka 代理所需的凭证,则必须创建一个 Amazon IoT Core 可以代入执行secretsmanager:GetSecretValuesecretsmanager:DescribeSecret操作的IAM角色。

    附加到您指定的角色的策略应如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • 你可以在亚马逊虚拟私有云(亚马逊)中运行你的 Apache Kafka 集群。VPC您必须创建 Amazon VPC 目标并使用子NAT网中的网关将消息从 Amazon IoT 转发到公有 Kafka 集群。 Amazon IoT 规则引擎在VPC目标中列出的每个子网中创建一个网络接口,用于将流量直接路由到VPC。创建VPC目标时, Amazon IoT 规则引擎会自动创建VPC规则操作。有关VPC规则操作的更多信息,请参阅虚拟私有云 (VPC) 目的地

  • 如果您使用客户托管 Amazon KMS key (KMS密钥)对静态数据进行加密,则服务必须有权代表呼叫者使用该KMS密钥。有关更多信息,请参阅 Apache Managed Streaming for Apache Kafka 开发者指南中的亚马逊MSK加密

参数

使用此操作创建 Amazon IoT 规则时,必须指定以下信息:

destinationArn

VPC目的地的 Amazon 资源名称 (ARN)。有关创建VPC目的地的信息,请参阅虚拟私有云 (VPC) 目的地

topic

要发送到 Kafka 代理的消息的 Kafka 主题。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

键(可选)

Kafka 消息键。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

标头(可选)

您指定的 Kafka 标头的列表。每个标头都是一个键/值对,您可以在创建 Kafka 操作时指定该键值对。您可以使用这些标头将数据从 IoT 客户端路由到下游 Kafka 集群,而无需修改消息有效负载。

您可以使用替代模板替换此字段。要了解如何在 Kafka 操作的标头中将内联规则的函数作为替换模板传递,请参阅示例。有关更多信息,请参阅 替换模板

注意

不支持二进制格式的标头。

分区(可选)

Kafka 消息分区。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

clientProperties

定义 Apache Kafka 生成器客户端属性的对象。

acks(可选)

生成器在考虑请求完成之前要求服务器收到的确认数。

如果指定 0 作为值,则生产者将不会等待来自服务器的任何确认。如果服务器没有收到该消息,则生成器将不会重试发送该消息。

有效值:-101all。默认值为 1

bootstrap.servers

主机和端口对列表(例如 host1:port1host2:port2)用于建立到 Kafka 集群的初始连接。

compression.type(可选)

生成器生成的所有数据的压缩类型。

有效值:nonegzipsnappylz4zstd。默认值为 none

security.protocol

用于连接到您的 Kafka 代理的安全协议。

有效值:SSLSASL_SSL。默认值为 SSL

key.serializer

指定如何将您使用 ProducerRecord 提供的键对象转换为字节。

有效值:StringSerializer

value.serializer

指定如何将您使用 ProducerRecord 提供的值对象转换为字节。

有效值:ByteBufferSerializer

ssl.truststore

base64 格式的信任库文件或位于 Amazon Secrets Manager 的信任库文件位置。如果您的信任存储受到 Amazon 证书授权机构(CA)的认证,则不需要此值。

此字段支持替换模板。如果您使用 Secrets Manager 存储连接到 Kafka 代理所需的凭据,则可以使用该get_secretSQL函数来检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关该get_secretSQL函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。如果信任库采用文件的形式,请使用 SecretBinary 参数。如果信任库采用字符串的形式,请使用 SecretString 参数。

此值最大为 65 KB。

ssl.truststore.password

信任库存储的密码。仅当您为信任库创建了密码时,才需要此值。

ssl.keystore

密钥库文件。当您指定 SSL 作为 security.protocol 的值时,才需要此值。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用get_secretSQL函数。有关替换模板的更多信息,请参阅 替换模板。有关该get_secretSQL函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretBinary 参数。

ssl.keystore.password

密钥库文件存储的密码。如果为 ssl.keystore 指定了值,则需要此值。

此字段的值可以是纯文本。此字段还支持替代模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用get_secretSQL函数。有关替换模板的更多信息,请参阅 替换模板。有关该get_secretSQL函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretString 参数。

ssl.key.password

密钥库文件中私有密钥的密码。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用get_secretSQL函数。有关替换模板的更多信息,请参阅 替换模板。有关该get_secretSQL函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretString 参数。

sasl.mechanism

用于连接到您的 Kafka 代理的安全机制。当您为 security.protocol 指定 SASL_SSL 时,则需要此值。

有效值:PLAINSCRAM-SHA-512GSSAPI

注意

SCRAM-SHA-512是 cn-north-1、cn-northwest-1、-1 和 -1 区域中唯一支持的安全机制。 us-gov-east us-gov-west

sasl.plain.username

用于从 Secrets Manager 中检索密钥字符串的用户名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 PLAIN 时,则需要此值。

sasl.plain.password

用于从 Secrets Manager 中检索密钥字符串的密码。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 PLAIN 时,则需要此值。

sasl.scram.username

用于从 Secrets Manager 中检索密钥字符串的用户名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 SCRAM-SHA-512 时,则需要此值。

sasl.scram.password

用于从 Secrets Manager 中检索密钥字符串的密码。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 SCRAM-SHA-512 时,则需要此值。

sasl.kerberos.keytab

Secrets Manager 中用于 Kerberos 身份验证的 keytab 文件。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用get_secretSQL函数。有关替换模板的更多信息,请参阅 替换模板。有关该get_secretSQL函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretBinary 参数。

sasl.kerberos.service.name

Apache Kafka 运行的 Kerberos 主要名称。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.krb5.kdc

你的 Apache Kafka 生产者客户端所连接的密钥分发中心 (KDC) 的主机名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.krb5.realm

您的 Apache Kafka 生成器客户端连接到的领域。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.principal

Kerberos 可以为其分配票证以访问 Kerberos 感知服务的唯一 Kerberos 身份。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

示例

以下JSON示例在规则中定义了 Apache Kafka 操作。 Amazon IoT 以下示例将 sourceIp() 内联函数作为替换模板传递到 Kafka Action 标头中。

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

有关 Kerberos 设置的重要注意事项

  • 您的密钥分发中心 (KDC) 必须可通过目标VPC内的私有域名系统 (DNS) 进行解析。一种可能的方法是将KDCDNS条目添加到私有托管区域。有关此方法的更多信息,请参阅如何使用私有托管区域

  • 每个都VPC必须启用DNS分辨率。有关更多信息,请参阅DNS与您一起使用VPC

  • VPC目标中的网络接口安全组和实例级安全组必须允许来自您内部的流量通过以下VPC端口。

    • TCP引导代理侦听器端口(通常为 9092,但必须在 9000—9100 范围内)上的流量

    • TCP还有 88 号港口的UDP流量 KDC

  • SCRAM-SHA-512是 cn-north-1、cn-northwest-1、-1 和 -1 区域中唯一支持的安全机制。 us-gov-east us-gov-west