Kinesis Data Firehose - AWS IoT Greengrass
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Kinesis Data Firehose

Kinesis Data Firehose 组件 (aws.greengrass.KinesisFirehose) 通过 Amazon Kinesis Data Firehose 传输流将数据发布到目标,例如 Amazon S3 Redshift 和 Amazon Elasticsearch Service 。有关更多信息,请参阅 Amazon Kinesis Data Firehose 开发人员指南》中的“什么是 Amazon Kinesis Data Firehose

要使用此组件发布到 Kinesis 传输流,请将消息发布到此组件订阅的主题。默认情况下,此组件订阅 kinesisfirehose/messagekinesisfirehose/message/binary/#本地发布/订阅主题。您可以在部署此组件时指定其他主题,包括 AWS AWS IoT Core MQTT 主题。

注意

此组件提供了与 AWS AWS IoT Greengrass V1 中的 Kinesis Data Firehose 连接器类似的功能。有关更多信息,请参阅AWS IoT IoT Greengrass V1 开发人员指南》中的“Kinesis Data Firehose 连接器”。

此组件具有以下版本:

  • 2.0.x

Requirements

此组件具有以下要求:

  • 您的核心设备必须满足 要求才能运行 Lambda 函数。如果您希望核心设备运行容器化 Lambda 函数,则设备必须满足执行此操作的要求。有关更多信息,请参阅运行 Lambda 函数的要求

  • 安装在核心设备上并添加到 PATH 环境变量中的 Python 版本 3.7。

  • Greengrass 设备角色必须允许 firehose:PutRecordfirehose:PutRecordBatch 操作,如以下示例 IAM 策略所示。

    { "Version": "2012-10-17", "Statement": [ { "Action": [ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect": "Allow", "Resource": [ "aws:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    您可以动态覆盖此组件的输入消息负载中的默认传输流。如果您的应用程序使用此功能,则 IAM 策略必须包含所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符 * 命名方案)。

  • 要从此组件接收输出数据,您必须在部署此组件时合并传统订阅路由器组件的以下配置更新。传统订阅路由器组件 (aws.greengrass.LegacySubscriptionRouter) 是此组件的依赖项。此配置指定此组件在其中发布响应的主题。

    • Replace region 替换为您使用的 AWS 区域。

    • Replace version 替换为此组件运行的 Lambda 函数的版本。要查找 Lambda 函数版本,您必须查看要部署的此组件的版本的配方。在 AWS AWS IoT Greengrass 控制台中打开此组件的详细信息页面,然后查找 Lambda 函数键值对。此键/值对包含 Lambda 函数的名称和版本。

    { "subscriptions": { "aws-greengrass-kinesisfirehose": { "id": "aws-greengrass-kinesisfirehose", "source": "aws:aws:lambda:region:aws-greengrass-kinesisfirehose:version", "subject": "kinesisfirehose/message/status", "target": "cloud" } } }
    重要

    每次部署此组件时,您必须在旧订阅路由器上更新 Lambda 函数版本。这可确保对部署的组件版本使用正确的 Lambda 函数版本。

    有关更多信息,请参阅创建部署

Configuration

此组件具有以下版本:

注意

此组件的默认配置包括 Lambda 函数参数。我们建议您仅编辑以下参数以在设备上配置此组件。

lambdaParams

包含此组件的 Lambda 函数的参数的对象。此对象包含以下信息:

EnvironmentVariables

包含 Lambda 函数参数的对象。此对象包含以下信息:

DEFAULT_DELIVERY_STREAM_ARN

组件在其中发送数据的默认 Kinesis Data Firehose 传输流的 ARN。您可以使用输入消息负载中的 delivery_stream_arn 属性覆盖目标流。

注意

核心设备角色必须允许对所有目标传输流执行所需的操作。有关更多信息,请参阅Requirements

PUBLISH_INTERVAL

(可选)组件将批处理数据发布到 Kinesis Data Firehose 之前等待的最大秒数。要将组件配置为一收到指标便发布,这意味着不进行批处理,请指定 0

此值最多为 900 秒。

默认值:10 秒

DELIVERY_STREAM_QUEUE_SIZE

(可选)在组件拒绝同一传输流的新记录之前,在内存中保留的最大记录数。

此值必须至少为 2000 条记录。

默认值:5000 条记录

containerMode

(可选)此组件的容器化模式。从以下选项中进行选择:

  • NoContainer – 组件未在隔离的运行时环境中运行。

  • GreengrassContainer – 组件在 AWS AWS IoT Greengrass 容器内的隔离运行时环境中运行。

默认值: GreengrassContainer

containerParams

(可选)包含此组件的容器参数的对象。如果您GreengrassContainer为 指定 ,则 组件将使用这些参数containerMode

此对象包含以下信息:

memorySize

(可选) 要分配给组件的内存量(以 KB 为单位)。

默认值为 64 MB (65,535 KB)。

pubsubTopics

(可选)一个 对象,其中包含组件在其中订阅接收消息的主题。您可以指定每个主题以及组件是订阅 AWS AWS IoT Core 中的 MQTT 主题还是本地发布/订阅主题。

此对象包含以下信息:

0 – 这是字符串形式的数组索引。

包含以下信息的对象:

type

(可选)此组件用于订阅消息的发布/订阅消息的类型。从以下选项中进行选择:

  • Pubsub – 订阅本地发布/订阅消息。如果选择此选项,则主题不能包含 MQTT 通配符。有关如何在指定此选项时从自定义组件发送消息的更多信息,请参阅发布/订阅本地消息

  • IotCore – 订阅 AWS IoT Core MQTT 消息。如果选择此选项,则主题可以包含 MQTT 通配符。有关如何在指定此选项时从自定义组件发送消息的更多信息,请参阅发布/订阅 AWS IoT Core MQTT 消息

默认值: Pubsub

topic

(可选) 组件订阅以接收消息的主题。如果您IotCore为 指定 type,则可以在本主题+中使用 MQTT 通配符(# 和 )。

例 示例:配置合并更新(容器模式)

{ "lambdaExecutionParameters": { "EnvironmentVariables": { "DEFAULT_DELIVERY_STREAM_ARN": "arn:aws:firehose:us-west-2:123456789012:deliverystream/mystream" } }, "containerMode": "GreengrassContainer" }

例 示例:配置合并更新(无容器模式)

{ "lambdaExecutionParameters": { "EnvironmentVariables": { "DEFAULT_DELIVERY_STREAM_ARN": "arn:aws:firehose:us-west-2:123456789012:deliverystream/mystream" } }, "containerMode": "NoContainer" }

输入数据

此组件接受有关以下主题的流内容,并将该内容发送到目标传输流。组件接受两种类型的输入数据:

  • JSON 数据,位于 kinesisfirehose/message 主题中。

  • 二进制数据,位于 kinesisfirehose/message/binary/# 主题中。

JSON 数据的默认主题(本地发布/订阅): kinesisfirehose/message

消息接受以下属性。输入消息必须采用 JSON 格式。

request

要发送到传输流和目标传输流(如果不同于默认流)的数据。

类型object:包含以下信息的 :

data

要发送到传输流的数据。

类型: string

delivery_stream_arn

(可选)目标 Kinesis Data Firehose 传输流的 ARN。指定该属性可覆盖默认传输流。

类型: string

id

请求的任意 ID。使用此属性可将输入请求映射到输出响应。当您指定该属性时, 组件会将响应对象中的 id 属性设置为此值。

类型: string

例 示例输入

{ "request": { "delivery_stream_arn": "aws:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }
二进制数据(本地发布/订阅)的默认主题: kinesisfirehose/message/binary/#

使用此主题发送包含二进制数据的消息。组件不解析二进制数据。组件按原样流式传输数据。

要将输入请求映射到输出响应,请将消息主题中的 # 通配符替换为任意请求 ID。例如,如果您将消息发布到 kinesisfirehose/message/binary/request123,响应对象中的 id 属性将设置为 request123

如果您不希望将请求映射到响应,可以将消息发布到 kinesisfirehose/message/binary/。请务必包含尾部斜杠 (/)。

输出数据

默认情况下,此组件将响应作为输出数据发布到以下 MQTT 主题上。您必须将此主题指定为subject传统订阅路由器组件的配置中的 。有关如何在自定义组件中订阅本主题上的消息的更多信息,请参阅发布/订阅 AWS IoT Core MQTT 消息

默认主题 ( AWS IoT Core MQTT): kinesisfirehose/message/status

例 输出示例

响应包含批处理中发送的每个数据记录的状态。

{ "response": [ { "ErrorCode": "error", "ErrorMessage": "test error", "id": "request123", "status": "fail" }, { "firehose_record_id": "xyz2", "id": "request456", "status": "success" }, { "firehose_record_id": "xyz3", "id": "request890", "status": "success" } ] }
注意

如果组件检测到可重试的错误(例如连接错误),它将重试下一个批次中的发布。

Licenses

此组件包括以下第三方软件/许可:

此组件是根据 Greengrass 核心软件许可协议发布的。

另请参阅