Kinesis Firehose - AWS IoT Greengrass
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Kinesis Firehose

Kinesis Firehose 连接器通过 Amazon Kinesis Data Firehose 传输流将数据发布到目标,如 Amazon S3、Amazon Redshift 或 Amazon Elasticsearch Service。

此连接器是 Kinesis 传输流的数据创建器。它接收 MQTT 主题上的输入数据,并将这些数据发送到指定的传输流。然后,该传输流将数据记录发送到已配置的目标(例如,S3 存储桶)。

此连接器具有以下版本。

版本

进行筛选

5

arn:aws:greengrass:区域::/connectors/KinesisFirehose/versions/5

4

arn:aws:greengrass:区域::/connectors/KinesisFirehose/versions/4

3

arn:aws:greengrass:区域::/connectors/KinesisFirehose/versions/3

2

arn:aws:greengrass:区域::/connectors/KinesisFirehose/versions/2

1

arn:aws:greengrass:区域::/connectors/KinesisFirehose/versions/1

有关版本更改的信息,请参阅更改日志

要求

此连接器具有以下要求:

Version 4 - 5
  • AWS IoT Greengrass Core 软件 v1.9.3 版或更高版本。

  • Python 版本 3.7 已安装在核心设备上并且已添加到 PATH 环境变量。

  • 一个已配置的 Kinesis 传输流。有关更多信息,请参阅 Firehose 开发人员指南Amazon Kinesis Data Firehose 中的创建 传输流Amazon Kinesis。

  • Greengrass 组角色,配置为允许对目标传输流执行 firehose:PutRecordfirehose:PutRecordBatch 操作,如以下示例 IAM 策略中所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws-cn:firehose:区域:account-id:deliverystream/stream-name" ] } ] }

    利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM 策略应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

Versions 2 - 3
  • AWS IoT Greengrass Core 软件 v1.7 or later。

  • Python 版本 2.7 已安装在核心设备上并且已添加到 PATH 环境变量。

  • 一个已配置的 Kinesis 传输流。有关更多信息,请参阅 Firehose 开发人员指南Amazon Kinesis Data Firehose 中的创建 传输流Amazon Kinesis。

  • Greengrass 组角色,配置为允许对目标传输流执行 firehose:PutRecordfirehose:PutRecordBatch 操作,如以下示例 IAM 策略中所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws-cn:firehose:区域:account-id:deliverystream/stream-name" ] } ] }

    利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM 策略应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

Version 1
  • AWS IoT Greengrass Core 软件 v1.7 or later。

  • Python 版本 2.7 已安装在核心设备上并且已添加到 PATH 环境变量。

  • 一个已配置的 Kinesis 传输流。有关更多信息,请参阅 Firehose 开发人员指南Amazon Kinesis Data Firehose 中的创建 传输流Amazon Kinesis。

  • Greengrass 组角色,配置为允许对目标传输流执行 firehose:PutRecord 操作,如以下示例 IAM 策略中所示。

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord" ], "Effect":"Allow", "Resource":[ "arn:aws-cn:firehose:区域:account-id:deliverystream/stream-name" ] } ] }

    利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM 策略应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

连接器参数

该连接器提供以下参数:

Versions 5
DefaultDeliveryStreamArn

要将数据发送到的默认 Kinesis Data Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。

注意

组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅要求

在 AWS IoT 控制台中显示名称:默认传输流 ARN

必需: true

类型: string

有效模式: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

在同一传输流的新记录被拒绝之前要保留在内存中的记录的最大数目。最小值为 2000。

在 AWS IoT 控制台中显示名称:Maximum number of records to buffer (per stream) (要缓存的记录的最大数目 (每个流))

必需: true

类型: string

有效模式: ^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

要分配给此连接器的内存量(以 KB 为单位)。

在 AWS IoT 控制台中显示名称:Memory size (内存大小)

必需: true

类型: string

有效模式: ^[0-9]+$

PublishInterval

将记录发布到 Kinesis Data Firehose 的时间间隔(以秒为单位)。要禁用批处理,请将此值设置为 0。

在 AWS IoT 控制台中显示名称:Publish interval (发布间隔)

必需: true

类型: string

有效值: 0 - 900

有效模式: [0-9]|[1-9]\\d|[1-9]\\d\\d|900

IsolationMode

此连接器的容器化模式。默认值为 GreengrassContainer,这意味着连接器在 AWS IoT Greengrass 容器内的隔离运行时环境中运行。

注意

组的默认容器化设置不适用于连接器。

在 AWS IoT 控制台中显示名称:容器隔离模式

必需: false

类型: string

有效值:GreengrassContainerNoContainer

有效模式: ^NoContainer$|^GreengrassContainer$

Versions 2 - 4
DefaultDeliveryStreamArn

要将数据发送到的默认 Kinesis Data Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。

注意

组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅要求

在 AWS IoT 控制台中显示名称:默认传输流 ARN

必需: true

类型: string

有效模式: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

在同一传输流的新记录被拒绝之前要保留在内存中的记录的最大数目。最小值为 2000。

在 AWS IoT 控制台中显示名称:Maximum number of records to buffer (per stream) (要缓存的记录的最大数目 (每个流))

必需: true

类型: string

有效模式: ^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

要分配给此连接器的内存量(以 KB 为单位)。

在 AWS IoT 控制台中显示名称:Memory size (内存大小)

必需: true

类型: string

有效模式: ^[0-9]+$

PublishInterval

将记录发布到 Kinesis Data Firehose 的时间间隔(以秒为单位)。要禁用批处理,请将此值设置为 0。

在 AWS IoT 控制台中显示名称:Publish interval (发布间隔)

必需: true

类型: string

有效值: 0 - 900

有效模式: [0-9]|[1-9]\\d|[1-9]\\d\\d|900

Version 1
DefaultDeliveryStreamArn

要将数据发送到的默认 Kinesis Data Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。

注意

组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅要求

在 AWS IoT 控制台中显示名称:默认传输流 ARN

必需: true

类型: string

有效模式: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

创建连接器示例 (AWS CLI)

以下 CLI 命令创建一个 ConnectorDefinition,其初始版本包含该连接器。

aws greengrass create-connector-definition --name MyGreengrassConnectors --initial-version '{ "Connectors": [ { "Id": "MyKinesisFirehoseConnector", "ConnectorArn": "arn:aws:greengrass:区域::/connectors/KinesisFirehose/versions/5", "Parameters": { "DefaultDeliveryStreamArn": "arn:aws-cn:firehose:区域:account-id:deliverystream/stream-name", "DeliveryStreamQueueSize": "5000", "MemorySize": "65535", "PublishInterval": "10", "IsolationMode" : "GreengrassContainer" } } ] }'

在 AWS IoT Greengrass 控制台 中,您可以从该组的 Connectors (连接器)页面添加一个连接器。有关更多信息,请参阅 Greengrass 连接器入门(控制台)

输入数据

此连接器接受 MQTT 主题上的流内容,然后将这些内容发送到目标传输流。它接受两种类型的输入数据:

  • JSON 数据,位于 kinesisfirehose/message 主题中。

  • 二进制数据,位于 kinesisfirehose/message/binary/# 主题中。

Versions 2 - 5
主题筛选条件:kinesisfirehose/message

使用此主题发送包含 JSON 数据的消息。

消息属性
request

要发送到传输流和目标传输流(如果不同于默认流)的数据。

必需: true

类型:包含以下属性的 object

data

要发送到传输流的数据。

必需: true

类型: string

delivery_stream_arn

目标 Kinesis 传输流的 ARN。包含此属性是为了覆盖默认传输流。

必需: false

类型: string

有效模式: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

请求的任意 ID。此属性用于将输入请求映射到输出响应。如果指定,响应对象中的 id 属性将设置为该值。如果您不使用此功能,可以忽略此属性或指定空字符串。

必需: false

类型: string

有效模式: .*

示例输入
{ "request": { "delivery_stream_arn": "arn:aws-cn:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

主题筛选条件:kinesisfirehose/message/binary/#

使用此主题发送包含二进制数据的消息。连接器不会解析二进制数据。该数据将按原样进行流式传输。

要将输入请求映射到输出响应,请将消息主题中的 # 通配符替换为任意请求 ID。例如,如果您将消息发布到 kinesisfirehose/message/binary/request123,响应对象中的 id 属性将设置为 request123

如果您不想将请求映射到响应,则可以将消息发布到 kinesisfirehose/message/binary/。 请务必包含尾部斜杠。

Version 1
主题筛选条件:kinesisfirehose/message

使用此主题发送包含 JSON 数据的消息。

消息属性
request

要发送到传输流和目标传输流(如果不同于默认流)的数据。

必需: true

类型:包含以下属性的 object

data

要发送到传输流的数据。

必需: true

类型: string

delivery_stream_arn

目标 Kinesis 传输流的 ARN。包含此属性是为了覆盖默认传输流。

必需: false

类型: string

有效模式: arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

请求的任意 ID。此属性用于将输入请求映射到输出响应。如果指定,响应对象中的 id 属性将设置为该值。如果您不使用此功能,可以忽略此属性或指定空字符串。

必需: false

类型: string

有效模式: .*

示例输入
{ "request": { "delivery_stream_arn": "arn:aws-cn:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

主题筛选条件:kinesisfirehose/message/binary/#

使用此主题发送包含二进制数据的消息。连接器不会解析二进制数据。该数据将按原样进行流式传输。

要将输入请求映射到输出响应,请将消息主题中的 # 通配符替换为任意请求 ID。例如,如果您将消息发布到 kinesisfirehose/message/binary/request123,响应对象中的 id 属性将设置为 request123

如果您不想将请求映射到响应,则可以将消息发布到 kinesisfirehose/message/binary/。 请务必包含尾部斜杠。

输出数据

此连接器将状态信息发布为 MQTT 主题的输出数据。

Versions 2 - 5
订阅中的主题筛选条件

kinesisfirehose/message/status

输出示例

响应包含批处理中发送的每个数据记录的状态。

{ "response": [ { "ErrorCode": "error", "ErrorMessage": "test error", "id": "request123", "status": "fail" }, { "firehose_record_id": "xyz2", "id": "request456", "status": "success" }, { "firehose_record_id": "xyz3", "id": "request890", "status": "success" } ] }
注意

如果连接器检测到可重试的错误(例如,连接错误),它将在下一个批处理中重试发布操作。 指数回退由 AWS 开发工具包进行处理。失败并带可重试的错误的请求将添加回队列的结尾以供进一步发布。

Version 1
订阅中的主题筛选条件

kinesisfirehose/message/status

示例输出: 成功
{ "response": { "firehose_record_id": "1lxfuuuFomkpJYzt/34ZU/r8JYPf8Wyf7AXqlXm", "status": "success" }, "id": "request123" }
示例输出: 失败
{ "response" : { "error": "ResourceNotFoundException", "error_message": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation: Firehose test1 not found under account 123456789012.", "status": "fail" }, "id": "request123" }

示例用法

使用以下概括步骤设置可用于尝试连接器的示例 Python 3.7 Lambda 函数。

注意
  1. 确保满足连接器的要求

    对于组角色要求,您必须将角色配置为授予所需权限,并确保角色已添加到组中。有关更多信息,请参阅管理 Greengrass 组角色(控制台)管理 Greengrass 组角色 (CLI)

  2. 创建并发布将输入数据发送到连接器的 Lambda 函数。

    示例代码保存为 PY 文件。下载并解压缩 适用于 Python 的 AWS IoT Greengrass Core 开发工具包。然后,创建一个 zip 包,其中在根级别包含 PY 文件和 greengrasssdk 文件夹。此 zip 包是您上传到 AWS Lambda 的部署包。

    创建 Python 3.7 Lambda 函数后,发布函数版本并创建别名。

  3. 配置 Greengrass 组。

    1. 通过 Lambda 函数的别名添加此函数(推荐)。将 Lambda 生命周期配置为长时间生存(或在 CLI 中设置为 "Pinned": true)。

    2. 添加连接器并配置其参数

    3. 添加允许连接器接收 JSON 输入数据并针对支持的主题筛选条件发送输出数据的订阅。

      • 将 Lambda 函数设置为源,将连接器设置为目标,并使用支持的输入主题筛选条件。

      • 将连接器设置为源,将 AWS IoT Core 设置为目标,并使用支持的输出主题筛选条件。您可以使用此订阅查看 AWS IoT 控制台 中的状态消息。

  4. 部署组。

  5. 在 AWS IoT 控制台 中,在 Test (测试) 页面上,订阅输出数据主题以查看连接器中的状态消息。示例 Lambda 函数是长时间生存的,并且在部署组后立即开始发送消息。

    完成测试后,您可以将 Lambda 生命周期设置为按需(或在 CLI 中设置为 "Pinned": false )并部署组。这会阻止函数发送消息。

Example

以下是 Lambda 功能将输入消息发送到连接器的示例。 此消息包含 JSON 数据。

import greengrasssdk import time import json iot_client = greengrasssdk.client('iot-data') send_topic = 'kinesisfirehose/message' def create_request_with_all_fields(): return { "request": { "data": "Message from Firehose Connector Test" }, "id" : "req_123" } def publish_basic_message(): messageToPublish = create_request_with_all_fields() print("Message To Publish: ", messageToPublish) iot_client.publish(topic=send_topic, payload=json.dumps(messageToPublish)) publish_basic_message() def lambda_handler(event, context): return

许可证

Kinesis Firehose 连接器包含以下第三方软件/许可:

此连接器按照 Greengrass 核心软件许可协议发布。

Changelog

下表描述了连接器每个版本中所做的更改。

版本

更改

5

增加了用于配置连接器容器化模式的 IsolationMode 参数。

4

已将 Lambda 运行时升级到 Python 3.7,这会更改运行时要求。

3

修复以减少过多的日志记录,并修复了其他较小的错误。

2

添加了对按指定间隔向 Kinesis Data Firehose 发送批处理数据记录的支持。

  • 还需要在组角色中执行 firehose:PutRecordBatch 操作。

  • 新的 MemorySizeDeliveryStreamQueueSizePublishInterval 参数。

  • 输入消息包含一组已发布的数据记录的状态响应。

1

首次发布。

Greengrass 组一次只能包含一个版本的连接器。有关升级连接器版本的信息,请参阅升级连接器版本

另请参阅