Amazon IoT Greengrass Version 1在 2023 年 6 月 30 日之前,将不再接收功能更新,并且将仅收到安全补丁和错误修复。有关更多信息,请参阅Amazon IoT Greengrass V1维护时段。我们强烈建议您迁移到Amazon IoT Greengrass Version 2,这增加了重要的新功能并支持其他平台。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将数据流导出到Amazon Web Services 云(CLI)
本教程向您展示了如何使用Amazon CLI配置和部署Amazon IoT Greengrass已启用流管理器的组。该组包含一个用户定义的 Lambda 函数,该函数可以在流管理器中写入流,然后将其自动导出到Amazon Web Services 云.
流管理器使得摄取、处理和导出大容量数据流更高效和更可靠。在本教程中,您将创建一个TransferStream
使用IoT 数据的 Lambda 函数。Lambda 函数使用Amazon IoT GreengrassCore SDK,用于在流管理器中创建流,然后对其进行读写。然后,流管理器将流导出到 Kinesis Data Streams。下图演示了此工作流程。

本教程的重点是展示用户定义的 Lambda 函数如何使用StreamManagerClient
对象中的Amazon IoT Greengrass核心 SDK for 流管理器交互。为简单起见,您为本教程创建的 Python Lambda 函数将生成模拟的设备数据。
当您使用Amazon IoT GreengrassAPI,其中包含 Greengrass 命令Amazon CLI,要创建组,默认情况下会禁用流管理器。要在你的核心上启用直播管理器,你创建函数定义版本包括系统GGStreamManager
Lambda 函数以及引用新函数定义版本的组版本。然后部署组。
先决条件
要完成此教程,需要:
-
Greengrass 组和 Greengrass Core(v1.10 或更高版本)。有关如何创建 Greengrass 组和核心的信息,请参阅Amazon IoT Greengrass 入门. “入门”教程还包含用于安装 Amazon IoT Greengrass Core 软件的步骤。
注意
不支持流管理器 OpenWrt 分布。
-
核心设备上安装的 Java 8 运行时 (JDK 8)。
-
对于基于 Debian 的发行版(包括 Raspbian)或基于 Ubuntui 的发行版,运行以下命令:
sudo apt install openjdk-8-jdk
-
对于基于 Red Hat 的发行版(包括 Amazon Linux),请运行以下命令:
sudo yum install java-1.8.0-openjdk
有关更多信息,请参阅 OpenJDK 文档中的如何下载并安装预先构建的 OpenJDK 程序包
。
-
-
Amazon IoT Greengrass适用于 Python 的核心开发工具包 v1.5.0 或更高版本。使用
StreamManagerClient
中的Amazon IoT Greengrass适用于 Python 的核心开发工具包,你必须:-
在核心设备上安装 Python 3.7 或更高版本。
-
在您的 Lambda 函数部署包中包含开发工具包及其依赖项。本教程中提供了说明。
提示
可以将
StreamManagerClient
与 Java 或 NodeJS 结合使用。有关示例代码,请参阅Amazon IoT Greengrass适用于 Java 的核心开发工具和Amazon IoT Greengrass适用于 Node.js 的核心开发工具包 上 GitHub. -
-
名为的目标流
MyKinesisStream
在同一 Amazon Kinesis Data Streams 中创建Amazon Web Services 区域作为您的 Greengrass 组。有关更多信息,请参阅 。创建流中的Amazon Kinesis 开发人员指南.注意
在本教程中,流管理器将数据导出到 Kinesis Data Streams,这将向您收取费用Amazon Web Services 账户. 有关定价的信息,请参阅Kinesis Data Streams
. 为避免产生费用,您可以在不创建 Kinesis 数据流的情况下运行本教程。在这种情况下,您检查日志以查看流管理器试图将流导出到 Kinesis Data Streams。
-
IAM 策略已添加到Greengrass 组角色这允许
kinesis:PutRecords
对目标数据流的操作,如以下示例所示:{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:
region
:account-id
:stream/MyKinesisStream" ] } ] }
-
您的计算机上安装和配置的 Amazon CLI。有关更多信息,请参阅 。安装Amazon Command Line Interface和配置Amazon CLI中的Amazon Command Line Interface用户指南.
本教程中的示例命令是针对 Linux 及其他基于 Unix 的系统编写的。如果您使用的是 Windows,请参阅为 指定参数值Amazon命令行界面了解有关语法差异的更多信息。
如果命令包含 JSON 字符串,本教程提供了在单行包含 JSON 的示例。在某些系统上,使用此格式可能会更有效地编辑和运行命令。
本教程包含以下概括步骤:
完成本教程大约需要 30 分钟。
第 1 步:创建 Lambda 函数部署软件包
在此步骤中,您将创建一个包含 Python 函数代码和依赖项的 Lambda 函数部署软件包。您稍后在创建中的 Lambda 函数Amazon Lambda. Lambda 函数使用Amazon IoT GreengrassCore SDK,用于创建本地流并与之交互。
注意
您用户定义的 Lambda 函数必须使用Amazon IoT Greengrass核心开发工具包与流管理器交互。有关 Greengrass 流管理器的要求的更多信息,请参阅 Greengrass 流管理器要求。
-
下载Amazon IoT Greengrass适用于 Python 的核心v1.5.0 或更高版本。
-
解压缩下载的程序包以获取软件开发工具包。软件开发工具包是
greengrasssdk
文件夹。 -
安装程序包依赖项以将其包含在 Lambda 函数部署程序包的开发工具包中。
-
导航到包含该
requirements.txt
文件的开发工具包目录。此文件列出了依赖项。 -
安装开发工具包依赖项。例如,运行以下
pip
命令将它们安装在当前目录中:pip install --target . -r requirements.txt
-
-
将以下 Python 代码函数保存在名为
transfer_stream.py
的本地文件中。提示
有关使用 Java 和 NodeJS 的示例代码,请参阅Amazon IoT Greengrass适用于 Java 的核心开发工具
和Amazon IoT Greengrass适用于 Node.js 的核心开发工具包 上 GitHub. import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
-
将以下项目压缩到名为
transfer_stream_python.zip
的文件中。此即您的 Lambda 函数部署软件包。-
transfer_stream.py。应用程序逻辑。
-
greengrasssdk。发布 MQTT 消息的 Python Greengrass Lambda 函数所需的库。
流管理器操作在 1.5.0 或更高版本的 1.5.0 版本或更高版本中提供Amazon IoT Greengrass适用于 Python 的核心开发工具包
-
您为... 安装的依赖项Amazon IoT Greengrass适用于 Python 的核心开发工具包(例如
cbor2
目录)。
创建
zip
文件时,仅包含这些项目,而不是包含文件夹。 -
第 2 步:创建 Lambda 函数
-
创建 IAM 角色,以便您可以在创建该函数时传入角色 ARN。
注意
Amazon IoT Greengrass不使用此角色,因为对您的 Greengrass Lambda 函数的权限在 Greengrass 组角色中指定。对于本教程,您将创建一个空角色。
-
从输出中复制
Arn
。 -
使用 Amazon Lambda API 创建
TransferStream
函数。以下命令假定该 zip 文件位于当前目录中。-
Replace
角色 ARN
用Arn
您复制的内容。
aws lambda create-function \ --function-name TransferStream \ --zip-file fileb://transfer_stream_python.zip \ --role
role-arn
\ --handler transfer_stream.function_handler \ --runtime python3.7 -
-
发布该函数的版本。
aws lambda publish-version --function-name TransferStream --description 'First version'
-
为发布的版本创建别名。
Greengrass 组可以按别名(推荐)或版本引用 Lambda 函数。使用别名,您可以更轻松地管理代码更新,因为您无需在函数代码更新时更改订阅表或组定义。相反,您只需将别名指向新的函数版本。
aws lambda create-alias --function-name TransferStream --name GG_TransferStream --function-version 1
注意
Amazon IoT Greengrass不支持 Lambda 别名$LATEST版本。
-
从输出中复制
AliasArn
。为 Amazon IoT Greengrass 配置函数时,可以使用此值。
现在,您已准备就绪,可以为 Amazon IoT Greengrass 配置函数。
第 3 步:创建函数定义和版本
此步骤创建一个引用系统的函数定义版本GGStreamManager
Lambda 函数和你用户定义的TransferStream
Lambda 函数。要在使用时启用直播管理器Amazon IoT GreengrassAPI,您的函数定义版本必须包含GGStreamManager
function.
-
使用包含系统和用户定义的 Lambda 函数的初始版本创建函数定义。
以下定义版本使用默认值启用流管理器parameter. 要配置自定义设置,必须为相应的流管理器参数定义环境变量。有关示例,请参阅 。启用、禁用或配置流管理器 (CLI).Amazon IoT Greengrass将为省略的参数使用默认设置。
MemorySize
应至少为128000
.Pinned
必须设置为true
.注意
一个长时间生存的(或固定) Lambda 函数在以下时间之后自动启动Amazon IoT Greengrass在它自己的容器中启动并保持运行。这与按需Lambda 函数,该函数在调用时启动,并在没有要运行的任务时停止。有关更多信息,请参阅 Greengrass Lambda 函数的生命周期配置。
-
Replace
arbitrary-function-id
使用函数的名称,如stream-manager
. -
Replace
alias-arn
用AliasArn
您在为创建别名时复制的TransferStream
Lambda 函数。
注意
Timeout
是函数定义版本所需的,但GGStreamManager
不使用它。有关 的更多信息Timeout
和其他组级别设置,请参阅使用组特定的配置控制 Greengrass Lambda 函数的执行. -
-
从输出中复制
LatestVersionArn
。您将使用此值向部署到核心的组版本添加函数定义版本。
第 4 步:创建记录器定义和版本
配置组的日志记录设置。在本教程中,您将配置Amazon IoT Greengrass系统组件、用户定义的 Lambda 函数以及用于将日志写入核心设备的文件系统的连接器。您可以使用日志对可能遇到的任何问题进行故障排除。有关更多信息,请参阅 利用 Amazon IoT Greengrass 日志进行监控。
第 5 步:获取核心定义版本的 ARN
获取要添加到新组版本的核心定义版本的 ARN。要部署组版本,该组版本必须引用包含确切一个核心的核心定义版本。
-
获取目标 Greengrass 组和组版本的 ID。此过程假定这是最新的组和组版本。以下查询返回最近创建的组。
aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"
或者,您也可以按名称查询。系统不要求组名称是唯一的,所以可能会返回多个组。
aws greengrass list-groups --query "Groups[?Name=='
MyGroup
']"注意
您还可以在中找到这些值Amazon IoT控制台。组 ID 显示在组的设置页面上。组版本 ID 显示在组的部署选项卡。
-
从输出中复制目标组的
Id
。您将使用此值获取核心定义版本及部署组的时间。 -
从输出中复制
LatestVersion
,这是添加到组的最后一个版本的 ID。您将使用此值获取核心定义版本。 -
获取核心定义版本的 ARN:
-
获取组版本。
-
使用为组复制的
替换
group-idId
。 -
Replace
group-version-id
用LatestVersion
您为组复制的内容。
aws greengrass get-group-version \ --group-id
group-id
\ --group-version-idgroup-version-id
-
-
从输出中复制
CoreDefinitionVersionArn
。您将使用此值向部署到核心的组版本添加核心定义版本。
-
第 6 步:创建组版本
现在,您已准备就绪,可以创建一个包含您要部署的实体的组版本。要实现此目的,需要创建一个引用每种组件类型的目标版本的组版本。在本教程中,您将包括核心定义版本、函数定义版本和记录器定义版本。
-
创建组版本。
-
使用为组复制的
替换
group-idId
。 -
Replace
core-definition-version-arn
用CoreDefinitionVersionArn
您为核心定义版本复制的。 -
Replace
function-definition-version-arn
用LatestVersionArn
您为新的函数定义版本复制的。 -
Replace
logger-definition-version-arn
用LatestVersionArn
您为新的记录器定义版本复制的。
aws greengrass create-group-version \ --group-id
group-id
\ --core-definition-version-arncore-definition-version-arn
\ --function-definition-version-arnfunction-definition-version-arn
\ --logger-definition-version-arnlogger-definition-version-arn
-
-
从输出中复制
Version
。这是新组版本的 ID。
步骤 7:创建部署
将组部署到核心设备。
-
确保Amazon IoT Greengrass核心正在运行。根据需要在您的 Raspberry Pi 终端中运行以下命令。
要检查守护程序是否正在运行,请执行以下操作:
ps aux | grep -E 'greengrass.*daemon'
如果输出包含
root
的/greengrass/ggc/packages/
条目,则表示守护程序正在运行。ggc-version
/bin/daemon注意
路径中的版本取决于您的核心设备上安装的 Amazon IoT Greengrass 核心软件版本。
启动守护程序:
cd /greengrass/ggc/core/ sudo ./greengrassd start
-
创建部署。
使用为组复制的
替换
group-idId
。Replace
group-version-id
用Version
您为新组版本复制的文件。
aws greengrass create-deployment \ --deployment-type NewDeployment \ --group-id
group-id
\ --group-version-idgroup-version-id
-
从输出中复制
DeploymentId
。 -
获取部署状态。
使用为组复制的
替换
group-idId
。Replace
部署 ID
用DeploymentId
你为部署复制的。
aws greengrass get-deployment-status \ --group-id
group-id
\ --deployment-iddeployment-id
如果状态为
Success
,则部署成功。有关问题排查帮助,请参阅排除 Amazon IoT Greengrass 的故障。
步骤 8:测试应用程序
这些区域有:TransferStream
Lambda 函数生成模拟的设备数据。它将数据写入流管理器导出到目标 Kinesis 数据流的流。
-
在Amazon Kinesis 控制台中,在Kinesis 数据流,选择MyKinesisStream.
注意
如果您在运行教程时没有目标 Kinesis 数据流, 请检查流管理器的日志文件 (
GGStreamManager
)。如果它在错误消息中包含export stream MyKinesisStream doesn't exist
,则测试成功。此错误意味着服务试图导出到流,但流不存在。 -
在存储库的MyKinesisStream页面上,选择监控. 如果测试成功,您应在 Put Records (放置记录) 图表中看到数据。根据您的连接,显示数据可能需要一分钟时间。
重要
测试完成后,删除 Kinesis 数据流以避免产生更多费用。
运行以下命令以停止 Greengrass 守护程序。这样可以防止核心发送消息,直到您准备好继续测试。
cd /greengrass/ggc/core/ sudo ./greengrassd stop
-
删除TransferStreamLambda 函数来自核心。
按照第 6 步:创建组版本创建新的组版本,但删除
create-group-version
命令中的--function-definition-version-arn
选项。或者,创建不包括TransferStreamLambda 函数。注意
通过省略系统
GGStreamManager
Lambda 函数从部署的组版本开始,您将禁用核心上的流管理。-
按照步骤 7:创建部署以部署新的组版本。
要查看日志记录信息或解决流的问题,请检查日志中的 TransferStream
和 GGStreamManager
函数。您必须具有 root
权限以读取文件系统上的 Amazon IoT Greengrass 日志。
TransferStream
将日志条目写入
。greengrass-root
/ggc/var/log/user/region
/account-id
/TransferStream.logGGStreamManager
将日志条目写入
。greengrass-root
/ggc/var/log/system/GGStreamManager.log
如果需要更多疑难解答信息,可以将 Lambda
日志记录级别设置为 DEBUG
,然后创建并部署新的组版本。