Kinesis Firehose - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon IoT Greengrass Version 1在 2023 年 6 月 30 日之前,将不再接收功能更新,并且将仅收到安全补丁和错误修复。有关更多信息,请参阅Amazon IoT Greengrass V1维护时段。我们强烈建议您迁移到Amazon IoT Greengrass Version 2,这增加了重要的新功能支持其他平台

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Kinesis Firehose

Kinesis Firehose连接器通过 Amazon Kinesis Data Firehose 传输流将数据发布到 Amazon S3、Amazon Redshift 或Amazon 等目标 OpenSearch 服务 。

此连接器是 Kinesis 传输流的数据创建器。它接收 MQTT 主题上的输入数据,并将这些数据发送到指定的传输流。然后,该传输流将数据记录发送到已配置的目标(例如,S3 存储桶)。

此连接器具有以下版本。

版本

ARN

5

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5

4

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/4

3

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/3

2

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/2

1

arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/1

有关版本更改的信息,请参阅更改日志

要求

此连接器具有以下要求:

Version 4 - 5
  • Amazon IoT Greengrass Core 软件 v1.9.3 版或更高版本。

  • Python3.7 或 3.8 版安装在核心设备上并添加到 PATH 环境变量中。

    注意

    要使用 Python 3.8,请运行以下命令以创建从默认 Python 3.7 安装文件夹到已安装的 Python 3.8 二进制文件的符号链接。

    sudo ln -s path-to-python-3.8/python3.8 /usr/bin/python3.7

    这会将设备配置为满足 Amazon IoT Greengrass 的 Python 要求。

  • 一个已Kinesis 传输流。有关更多信息,请参阅 。创建 Amazon Kinesis Data Firehose 传输流中的Amazon Kinesis Firehose 开发.

  • 这些区域有:Greengrass 组角色配置为允许firehose:PutRecordfirehose:PutRecordBatch对目标传输流执行操作,如以下示例 IAM 策略中所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM 策略应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅 管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

Versions 2 - 3
  • Amazon IoT Greengrass核心软件版本 v1.7 或更高版本。

  • Python安装在核心设备上的 2.7 版并添加到 PATH 环境变量中。

  • 一个已Kinesis 传输流。有关更多信息,请参阅 。创建 Amazon Kinesis Data Firehose 传输流中的Amazon Kinesis Firehose 开发.

  • 这些区域有:Greengrass 组角色配置为允许firehose:PutRecordfirehose:PutRecordBatch对目标传输流执行操作,如以下示例 IAM 策略中所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM 策略应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅 管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

Version 1
  • Amazon IoT Greengrass核心软件版本 v1.7 或更高版本。

  • Python安装在核心设备上的 2.7 版并添加到 PATH 环境变量中。

  • 一个已Kinesis 传输流。有关更多信息,请参阅 。创建 Amazon Kinesis Data Firehose 传输流中的Amazon Kinesis Firehose 开发.

  • 这些区域有:Greengrass 组角色配置为允许firehose:PutRecord操作对目标传输流执行操作,如以下示例 IAM 策略中所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord" ], "Effect":"Allow", "Resource":[ "arn:aws:firehose:region:account-id:deliverystream/stream-name" ] } ] }

    利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM 策略应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅 管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

连接器连接器径

该连接器提供以下参数:

Versions 5
DefaultDeliveryStreamArn

要将数据发送到的默认 Kinesis Data Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。

注意

组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅 要求

显示名称Amazon IoT控制台:默认传输流 ARN

必需:true

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

在同一传输流的新记录被拒绝之前要保留在内存中的记录的最大数目。最小值为 2000。

显示名称Amazon IoT控制台:Maximum of records to bufer (per

必需:true

类型:string

有效模式:^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

要分配给此连接器的内存量(以 KB 为单位)。

显示名称Amazon IoT控制台:内存大小

必需:true

类型:string

有效模式:^[0-9]+$

PublishInterval

将记录发布到 Kinesis Data Firehose 的时间间隔(以秒为单位)。要禁用批处理,请将此值设置为 0。

显示名称Amazon IoT控制台:发布时间间隔

必需:true

类型:string

有效值:0 - 900

有效模式:[0-9]|[1-9]\\d|[1-9]\\d\\d|900

IsolationMode

此连接器的容器化模式。默认值为 GreengrassContainer,这意味着连接器在 Amazon IoT Greengrass 容器内的隔离运行时环境中运行。

注意

组的默认容器化设置不适用于连接器。

显示名称Amazon IoT控制台:容器容器容器容

必需:false

类型:string

有效值:GreengrassContainerNoContainer

有效模式:^NoContainer$|^GreengrassContainer$

Versions 2 - 4
DefaultDeliveryStreamArn

要将数据发送到的默认 Kinesis Data Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。

注意

组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅 要求

显示名称Amazon IoT控制台:默认传输流 ARN

必需:true

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

在同一传输流的新记录被拒绝之前要保留在内存中的记录的最大数目。最小值为 2000。

显示名称Amazon IoT控制台:Maximum of records to bufer (per

必需:true

类型:string

有效模式:^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

要分配给此连接器的内存量(以 KB 为单位)。

显示名称Amazon IoT控制台:内存大小

必需:true

类型:string

有效模式:^[0-9]+$

PublishInterval

将记录发布到 Kinesis Data Firehose 的时间间隔(以秒为单位)。要禁用批处理,请将此值设置为 0。

显示名称Amazon IoT控制台:发布时间间隔

必需:true

类型:string

有效值:0 - 900

有效模式:[0-9]|[1-9]\\d|[1-9]\\d\\d|900

Version 1
DefaultDeliveryStreamArn

要将数据发送到的默认 Kinesis Data Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。

注意

组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅 要求

显示名称Amazon IoT控制台:默认传输流 ARN

必需:true

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

创建连接器示例 (Amazon CLI)

以下 CLI 命令将创建一个ConnectorDefinition使用包含连接器的初始版本。

aws greengrass create-connector-definition --name MyGreengrassConnectors --initial-version '{ "Connectors": [ { "Id": "MyKinesisFirehoseConnector", "ConnectorArn": "arn:aws:greengrass:region::/connectors/KinesisFirehose/versions/5", "Parameters": { "DefaultDeliveryStreamArn": "arn:aws:firehose:region:account-id:deliverystream/stream-name", "DeliveryStreamQueueSize": "5000", "MemorySize": "65535", "PublishInterval": "10", "IsolationMode" : "GreengrassContainer" } } ] }'

在Amazon IoT Greengrassconsole,则可以从该组的连接器页. 有关更多信息,请参阅 Greengrass 连接器入门(控制台)

输入数据

此连接器接受 MQTT 主题上的流内容,然后将这些内容发送到目标传输流。它接受两种类型的输入数据:

  • JSON 数据,位于 kinesisfirehose/message 主题中。

  • 二进制数据,位于 kinesisfirehose/message/binary/# 主题中。

Versions 2 - 5
主题筛选条件:kinesisfirehose/message

使用此主题发送包含 JSON 数据的消息。

消息属性
request

要发送到传输流和目标传输流(如果不同于默认流)的数据。

必需:true

类型:object包含以下属性的:

data

要发送到传输流的数据。

必需:true

类型:string

delivery_stream_arn

目标 Kinesis 传输流的 ARN。包含此属性是为了覆盖默认传输流。

必需:false

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

请求的任意 ID。此属性用于将输入请求映射到输出响应。如果指定,响应对象中的 id 属性将设置为该值。如果您不使用此功能,可以忽略此属性或指定空字符串。

必需:false

类型:string

有效模式:.*

示例输入
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

主题筛选条件:kinesisfirehose/message/binary/#

使用此主题发送包含二进制数据的消息。连接器不会解析二进制数据。该数据将按原样进行流式传输。

要将输入请求映射到输出响应,请将消息主题中的 # 通配符替换为任意请求 ID。例如,如果您将消息发布到 kinesisfirehose/message/binary/request123,响应对象中的 id 属性将设置为 request123

如果您不希望将请求映射到响应,可以将消息发布到 kinesisfirehose/message/binary/。请务必包含尾斜杠。

Version 1
主题筛选条件:kinesisfirehose/message

使用此主题发送包含 JSON 数据的消息。

消息属性
request

要发送到传输流和目标传输流(如果不同于默认流)的数据。

必需:true

类型:object包含以下属性的:

data

要发送到传输流的数据。

必需:true

类型:string

delivery_stream_arn

目标 Kinesis 传输流的 ARN。包含此属性是为了覆盖默认传输流。

必需:false

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

请求的任意 ID。此属性用于将输入请求映射到输出响应。如果指定,响应对象中的 id 属性将设置为该值。如果您不使用此功能,可以忽略此属性或指定空字符串。

必需:false

类型:string

有效模式:.*

示例输入
{ "request": { "delivery_stream_arn": "arn:aws:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

主题筛选条件:kinesisfirehose/message/binary/#

使用此主题发送包含二进制数据的消息。连接器不会解析二进制数据。该数据将按原样进行流式传输。

要将输入请求映射到输出响应,请将消息主题中的 # 通配符替换为任意请求 ID。例如,如果您将消息发布到 kinesisfirehose/message/binary/request123,响应对象中的 id 属性将设置为 request123

如果您不希望将请求映射到响应,可以将消息发布到 kinesisfirehose/message/binary/。请务必包含尾斜杠。

输出数据

此连接器将状态信息发布为 MQTT 主题的输出数据。

Versions 2 - 5
订阅中的主题筛选条件

kinesisfirehose/message/status

输出示例

响应包含批处理中发送的每条数据记录的状态。

{ "response": [ { "ErrorCode": "error", "ErrorMessage": "test error", "id": "request123", "status": "fail" }, { "firehose_record_id": "xyz2", "id": "request456", "status": "success" }, { "firehose_record_id": "xyz3", "id": "request890", "status": "success" } ] }
注意

如果连接器检测到可重试错误(例如,连接错误),则会在下一个批次中重试发布。指数回退由Amazon开发工具包。失败并带可重试的错误的请求将添加回队列的结尾以供进一步发布。

Version 1
订阅中的主题筛选条件

kinesisfirehose/message/status

输出示例:成功
{ "response": { "firehose_record_id": "1lxfuuuFomkpJYzt/34ZU/r8JYPf8Wyf7AXqlXm", "status": "success" }, "id": "request123" }
输出示例:失败
{ "response" : { "error": "ResourceNotFoundException", "error_message": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation: Firehose test1 not found under account 123456789012.", "status": "fail" }, "id": "request123" }

使用示例

使用以下概括步骤设置可用于尝试连接器的示例 Python 3.7 Lambda 函数。

注意
  1. 确保满足连接器的要求

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅 管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

  2. 创建并发布将输入数据发送到连接器的 Lambda 函数。

    示例代码保存为 PY 文件。下载并解压缩Amazon IoT Greengrass适用于 Python 的核心包. 然后,创建一个 zip 包,其中在根级别包含 PY 文件和 greengrasssdk 文件夹。此 zip 包是您上传到的部署包Amazon Lambda.

    创建 Python 3.7 Lambda 函数后,发布函数版本并创建别名。

  3. 配置 Greengrass 组。

    1. 通过函数的别名添加 Lambda 函数(推荐)。将 Lambda 生命周期配置为长时间生存(或"Pinned": true在 CLI 中)。

    2. 添加连接器并配置其参数

    3. 添加允许连接器接收 JSON 输入数据并针对支持的主题筛选条件发送输出数据的订阅。

      • 将 Lambda 函数设置为源,将连接器设置为目标,并使用支持的输入主题筛选条件。

      • 将连接器设置为源,将 Amazon IoT Core 设置为目标,并使用支持的输出主题筛选条件。您可以使用此订阅查看中的状态消息Amazon IoT控制台。

  4. 部署组。

  5. 在Amazon IoT控制台,测试页面上,订阅输出数据主题以查看连接器中的状态消息。示例 Lambda 函数是长时间生存的,并且在部署组后立即开始发送消息。

    完成测试后,您可以将 Lambda 生命周期设置为按需(或"Pinned": falseCLI)并部署组。这会阻止函数发送消息。

示例

以下示例 Lambda 函数将输入消息发送到连接器。此消息包含 JSON 数据。

import greengrasssdk import time import json iot_client = greengrasssdk.client('iot-data') send_topic = 'kinesisfirehose/message' def create_request_with_all_fields(): return { "request": { "data": "Message from Firehose Connector Test" }, "id" : "req_123" } def publish_basic_message(): messageToPublish = create_request_with_all_fields() print("Message To Publish: ", messageToPublish) iot_client.publish(topic=send_topic, payload=json.dumps(messageToPublish)) publish_basic_message() def lambda_handler(event, context): return

许可证

Kinesis Firehose 连接器包含以下第三方软件/许可:

此连接器在Greengrass 核心软件许可协议.

更改日志

下表介绍每一版连接器中的更改。

版本

更改

5

增加了用于配置连接器容器化模式的 IsolationMode 参数。

4

已将 Lambda 运行时升级到 Python 3.7,这会更改运行时要求。

3

修复以减少过多的日志记录,并修复了其他较小的错误。

2

添加了对按指定间隔向 Kinesis Data Firehose 发送批处理数据记录的支持。

  • 还需要在组角色中执行 firehose:PutRecordBatch 操作。

  • 新的 MemorySizeDeliveryStreamQueueSizePublishInterval 参数。

  • 输入消息包含一组已发布的数据记录的状态响应。

1

首次发布。

Greengrass 组每次只能包含连接器的一个版本。有关升级连接器版本的信息,请参阅升级连接器版本

另请参阅