AWS IoT Greengrass
开发人员指南
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

Kinesis Firehose

Kinesis Firehose 连接器通过 Amazon Kinesis Data Firehose 传输流将数据发布到目标,如 Amazon S3、Amazon Redshift 或 Amazon Elasticsearch Service。

此连接器是 Kinesis 传输流的数据创建器。它接收 MQTT 主题上的输入数据,并将这些数据发送到指定的传输流。然后,该传输流将数据记录发送到已配置的目标(例如,S3 存储桶)。

此连接器具有以下版本。

版本

ARN

2

arn:aws:greengrass:区域::/connectors/KinesisFirehose/versions/2

1

arn:aws:greengrass:区域::/connectors/KinesisFirehose/versions/1

有关版本更改的信息,请参阅更改日志

要求

此连接器具有以下要求:

Version 2Version 1
Version 2
  • AWS IoT Greengrass 核心软件 v1.7 or later。

  • Python 版本 2.7 已安装在核心设备上并且已添加到 PATH 环境变量。

  • 一个已配置的 Kinesis 传输流。有关更多信息,请参阅Amazon Kinesis Firehose 开发人员指南中的创建 Amazon Kinesis Data Firehose 传输流

  • 一个添加到 Greengrass 组角色的 IAM 策略,该策略允许对目标传输流执行 firehose:PutRecordfirehose:PutRecordBatch 操作,如以下示例所示:

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Effect":"Allow", "Resource":[ "arn:aws-cn:firehose:区域:account-id:deliverystream/stream-name" ] } ] }

    利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM 策略应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。 有关更多信息,请参阅IAM 用户指南中的添加和删除 IAM 策略

Version 1
  • AWS IoT Greengrass 核心软件 v1.7 or later。

  • Python 版本 2.7 已安装在核心设备上并且已添加到 PATH 环境变量。

  • 一个已配置的 Kinesis 传输流。有关更多信息,请参阅Amazon Kinesis Firehose 开发人员指南中的创建 Amazon Kinesis Data Firehose 传输流

  • 一个添加到了 Greengrass 组角色的 IAM 策略,该策略允许对目标传输流执行 firehose:PutRecord 操作,如以下示例所示:

    { "Version":"2012-10-17", "Statement":[ { "Sid":"Stmt1528133056761", "Action":[ "firehose:PutRecord" ], "Effect":"Allow", "Resource":[ "arn:aws-cn:firehose:区域:account-id:deliverystream/stream-name" ] } ] }

    利用此连接器,您可以动态覆盖输入消息负载中的默认传输流。如果您的实施使用此功能,则 IAM 策略应包括所有目标流作为资源。您可以授予对资源的具体或条件访问权限(例如,通过使用通配符*命名方案)。 有关更多信息,请参阅IAM 用户指南中的添加和删除 IAM 策略

连接器参数

该连接器提供以下参数:

Version 2Version 1
Version 2
DefaultDeliveryStreamArn

要将数据发送到的默认 Kinesis Data Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。

注意

组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅 要求

在控制台中显示名称:默认传输流 ARN

必需:true

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

DeliveryStreamQueueSize

在同一传输流的新记录被拒绝之前要保留在内存中的记录的最大数目。最小值为 2000。

在控制台中显示名称:Maximum number of records to buffer (per stream) (要缓存的记录的最大数目 (每个流))

必需:true

类型:string

有效模式:^([2-9]\\d{3}|[1-9]\\d{4,})$

MemorySize

要分配给此连接器的内存量(以 KB 为单位)。

在控制台中显示名称:Memory size (内存大小)

必需:true

类型:string

有效模式:^[0-9]+$

PublishInterval

将记录发布到 Kinesis Data Firehose 的时间间隔(以秒为单位)。要禁用批处理,请将此值设置为 0。

在控制台中显示名称:Publish interval (发布间隔)

必需:true

类型:string

有效值:0 - 900

有效模式:[0-9]|[1-9]\\d|[1-9]\\d\\d|900

Version 1
DefaultDeliveryStreamArn

要将数据发送到的默认 Kinesis Data Firehose 传输流的 ARN。目标流可由输入消息负载中的 delivery_stream_arn 属性覆盖。

注意

组角色必须允许对所有目标传输流执行适当的操作。有关更多信息,请参阅 要求

在控制台中显示名称:默认传输流 ARN

必需:true

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

创建连接器示例 (CLI)

以下 CLI 命令创建一个 ConnectorDefinition,其初始版本包含该连接器。

aws greengrass create-connector-definition --name MyGreengrassConnectors --initial-version '{ "Connectors": [ { "Id": "MyKinesisFirehoseConnector", "ConnectorArn": "arn:aws:greengrass:区域::/connectors/KinesisFirehose/versions/2", "Parameters": { "DefaultDeliveryStreamArn": "arn:aws-cn:firehose:region:account-id:deliverystream/stream-name", "DeliveryStreamQueueSize": "5000", "MemorySize": "65535", "PublishInterval": "10" } } ] }'

在 AWS IoT Greengrass 控制台 中,您可以从该组的 Connectors (连接器)页面添加一个连接器。有关更多信息,请参阅 Greengrass 连接器入门(控制台)

输入数据

此连接器接受 MQTT 主题上的流内容,然后将这些内容发送到目标传输流。它接受两种类型的输入数据:

  • JSON 数据,位于 kinesisfirehose/message 主题中。

  • 二进制数据,位于 kinesisfirehose/message/binary/# 主题中。

Version 2Version 1
Version 2
主题筛选条件kinesisfirehose/message

使用此主题发送包含 JSON 数据的消息。

消息属性
request

要发送到传输流和目标传输流(如果不同于默认流)的数据。

必需:true

类型:包含以下属性的 object

data

要发送到传输流的数据。

必需:true

类型:bytes

delivery_stream_arn

目标 Kinesis 传输流的 ARN。包含此属性是为了覆盖默认传输流。

必需:false

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

请求的任意 ID。此属性用于将输入请求映射到输出响应。如果指定,响应对象中的 id 属性将设置为该值。如果您不使用此功能,可以忽略此属性或指定空字符串。

必需:false

类型:string

有效模式:.*

示例输入
{ "request": { "delivery_stream_arn": "arn:aws-cn:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

主题筛选条件kinesisfirehose/message/binary/#

使用此主题发送包含二进制数据的消息。连接器不会解析二进制数据。该数据将按原样进行流式传输。

要将输入请求映射到输出响应,请将消息主题中的 # 通配符替换为任意请求 ID。例如,如果您将消息发布到 kinesisfirehose/message/binary/request123,响应对象中的 id 属性将设置为 request123

如果您不希望将请求映射到响应,可以将消息发布到 kinesisfirehose/message/binary/。请务必包含尾斜杠。

Version 1
主题筛选条件kinesisfirehose/message

使用此主题发送包含 JSON 数据的消息。

消息属性
request

要发送到传输流和目标传输流(如果不同于默认流)的数据。

必需:true

类型:包含以下属性的 object

data

要发送到传输流的数据。

必需:true

类型:bytes

delivery_stream_arn

目标 Kinesis 传输流的 ARN。包含此属性是为了覆盖默认传输流。

必需:false

类型:string

有效模式:arn:aws:firehose:([a-z]{2}-[a-z]+-\d{1}):(\d{12}):deliverystream/([a-zA-Z0-9_\-.]+)$

id

请求的任意 ID。此属性用于将输入请求映射到输出响应。如果指定,响应对象中的 id 属性将设置为该值。如果您不使用此功能,可以忽略此属性或指定空字符串。

必需:false

类型:string

有效模式:.*

示例输入
{ "request": { "delivery_stream_arn": "arn:aws-cn:firehose:region:account-id:deliverystream/stream2-name", "data": "Data to send to the delivery stream." }, "id": "request123" }

 

主题筛选条件kinesisfirehose/message/binary/#

使用此主题发送包含二进制数据的消息。连接器不会解析二进制数据。该数据将按原样进行流式传输。

要将输入请求映射到输出响应,请将消息主题中的 # 通配符替换为任意请求 ID。例如,如果您将消息发布到 kinesisfirehose/message/binary/request123,响应对象中的 id 属性将设置为 request123

如果您不希望将请求映射到响应,可以将消息发布到 kinesisfirehose/message/binary/。请务必包含尾斜杠。

输出数据

此连接器将状态信息发布为输出数据。

Version 2Version 1
Version 2
主题筛选条件

kinesisfirehose/message/status

输出示例

该响应包含在批处理中发送的每个数据记录的状态。

{ "response": [ { "ErrorCode": "error", "ErrorMessage": "test error", "id": "request123", "status": "fail" }, { "firehose_record_id": "xyz2", "id": "request456", "status": "success" }, { "firehose_record_id": "xyz3", "id": "request890", "status": "success" } ] }

注意

如果连接器检测到可重试的错误(例如,连接错误),它将在下一个批处理中重试发布操作。 指数回退由 AWS 开发工具包进行处理。失败并带可重试的错误的请求将添加回队列的结尾以供进一步发布。

Version 1
主题筛选条件

kinesisfirehose/message/status

示例输出:成功
{ "response": { "firehose_record_id": "1lxfuuuFomkpJYzt/34ZU/r8JYPf8Wyf7AXqlXm", "status": "success" }, "id": "request123" }
示例输出:失败
{ "response" : { "error": "ResourceNotFoundException", "error_message": "An error occurred (ResourceNotFoundException) when calling the PutRecord operation: Firehose test1 not found under account 123456789012.", "status": "fail" }, "id": "request123" }

示例用法

以下是 Lambda 功能将输入消息发送到连接器的示例。 此消息包含 JSON 数据。

注意

该 Python 功能使用 AWS IoT Greengrass Core 开发工具包 来发布 MQTT 消息。您可以使用以下 pip 命令在核心设备上安装 SDK 的 Python 版本:

pip install greengrasssdk
import greengrasssdk import time import json iot_client = greengrasssdk.client('iot-data') send_topic = 'kinesisfirehose/message' def create_request_with_all_fields(): return { "request": { "data": "Message from Firehose Connector Test" }, "id" : "req_123" } def publish_basic_message(): messageToPublish = create_request_with_all_fields() print "Message To Publish: ", messageToPublish iot_client.publish(topic=send_topic, payload=json.dumps(messageToPublish)) publish_basic_message() def function_handler(event, context): return

许可证

Kinesis Firehose 连接器包含以下第三方软件/许可:

此连接器按照 Greengrass 核心软件许可协议发布。

Changelog

下表描述了连接器每个版本中所做的更改。

版本

更改

2

添加了对按指定间隔向 Kinesis Data Firehose 发送批处理数据记录的支持。

  • 还需要在组角色中执行 firehose:PutRecordBatch 操作。

  • 新的 MemorySizeDeliveryStreamQueueSizePublishInterval 参数。

  • 输入消息包含一组已发布的数据记录的状态响应。

1

首次发布。

Greengrass 组一次只能包含一个版本的连接器。

另请参阅