将数据流导出到Amazon Web Services 云(console) - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon IoT Greengrass Version 1在 2023 年 6 月 30 日之前,将不再接收功能更新,并且将仅收到安全补丁和错误修复。有关更多信息,请参阅Amazon IoT Greengrass V1维护时段。我们强烈建议您迁移到Amazon IoT Greengrass Version 2,这增加了重要的新功能支持其他平台

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将数据流导出到Amazon Web Services 云(console)

本教程向您展示了如何使用Amazon IoT控制台来配置和部署Amazon IoT Greengrass已启用流管理器的组。该组包含一个用户定义的 Lambda 函数,该函数可以在流管理器中写入流,然后将其自动导出到Amazon Web Services 云.

流管理器使得摄取、处理和导出大容量数据流更高效和更可靠。在本教程中,您将创建一个TransferStreamLambda 函数,它使用IoT 数据。Lambda 函数使用Amazon IoT GreengrassCore SDK,在流管理器中创建流,然后对其进行读写。然后,流管理器将流导出到 Kinesis Data Streams。下图演示了此工作流程。


      流管理工作流图。

本教程的重点是展示用户定义的 Lambda 函数如何使用StreamManagerClient对象在Amazon IoT GreengrassCore SDK,与流管理器交互。为简单起见,您为本教程创建的 Python Lambda 函数将生成模拟设备数据。

先决条件

要完成此教程,需要:

  • Greengrass 组和 Greengrass 核心(v1.10 或更高版本)。有关如何创建 Greengrass 组和核心的信息,请参阅Amazon IoT Greengrass 入门. “入门”教程还包含用于安装 Amazon IoT Greengrass Core 软件的步骤。

    注意

    不支持流管理器 OpenWrt 分配。

  • 核心设备上安装的 Java 8 运行时 (JDK 8)。

    • 对于基于 Debian 的发行版(包括 Raspbian)或基于 Ubuntui 的发行版,运行以下命令:

      sudo apt install openjdk-8-jdk
    • 对于基于 Red Hat 的发行版(包括 Amazon Linux),请运行以下命令:

      sudo yum install java-1.8.0-openjdk

      有关更多信息,请参阅 OpenJDK 文档中的如何下载并安装预先构建的 OpenJDK 程序包

  • Amazon IoT Greengrass适用于 Python v1.5.0 或更高版本的核心开发工具包。使用StreamManagerClient中的Amazon IoT Greengrass适用于 Python 的核心开发工具包,你必须:

    • 在核心设备上安装 Python 3.7 或更高版本。

    • 在您的 Lambda 函数部署包中包含开发工具包及其依赖项。本教程中提供了说明。

    提示

    可以将 StreamManagerClient 与 Java 或 NodeJS 结合使用。有关示例代码,请参阅Amazon IoT Greengrass适用于 SDK for JavaAmazon IoT Greengrass适用于 Node.js 的核心开发工具包上 GitHub.

  • 名为的目标流MyKinesisStream在相同的 Amazon Kinesis Data Streams 中创建Amazon Web Services 区域作为您的 Greengrass 组。有关更多信息,请参阅 。创建流中的Amazon Kinesis 开发人员指南.

    注意

    在本教程中,流管理器将数据导出到 Kinesis Data Streams,这将向您收取费用Amazon Web Services 账户. 有关定价的信息,请参阅Kinesis Data Streams.

    为避免产生费用,您可以在不创建 Kinesis 数据流的情况下运行本教程。在这种情况下,您检查日志以查看流管理器试图将流导出到 Kinesis Data Streams。

  • IAM 策略已添加到Greengrass 组角色这允许kinesis:PutRecords操作在目标数据流上,如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/MyKinesisStream" ] } ] }

本教程包含以下概括步骤:

完成本教程大约需要 20 分钟。

第 1 步:创建 Lambda 函数部署软件包

在此步骤中,您将创建一个包含 Python 函数代码和依赖项的 Lambda 函数部署程序包。您稍后在创建Lambda 函数Amazon Lambda. Lambda 函数使用Amazon IoT GreengrassCore SDK,用于创建本地流并与之交互。

注意

您用户定义的 Lambda 函数必须使用Amazon IoT Greengrass核心开发工具包与流管理器交互。有关 Greengrass 流管理器的要求的更多信息,请参阅 Greengrass 流管理器要求

  1. 下载Amazon IoT Greengrass适用于 Python 的酷睿v1.5.0 或更高版本。

  2. 解压缩下载的程序包以获取软件开发工具包。软件开发工具包是 greengrasssdk 文件夹。

  3. 安装程序包依赖项以将其包含在 Lambda 函数部署程序包的开发工具包中。

    1. 导航到包含该 requirements.txt 文件的开发工具包目录。此文件列出了依赖项。

    2. 安装开发工具包依赖项。例如,运行以下 pip 命令将它们安装在当前目录中:

      pip install --target . -r requirements.txt
  4. 将以下 Python 代码函数保存在名为 transfer_stream.py 的本地文件中。

    提示

    有关使用 Java 和 NodeJS 的示例代码,请参阅Amazon IoT Greengrass适用于 SDK for JavaAmazon IoT Greengrass适用于 Node.js 的核心开发工具包上 GitHub.

    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. 将以下项目压缩到名为 transfer_stream_python.zip 的文件中。此即您的 Lambda 函数部署软件包。

    • transfer_stream.py。应用程序逻辑。

    • greengrasssdk。发布 MQTT 消息的 Python Greengrass Lambda 函数所需的库。

      流管理器操作在 1.5.0 版本或更高版本的Amazon IoT Greengrass适用于 Python 的核心开发工具包

    • 您为Amazon IoT Greengrass适用于 Python 的核心开发工具包(例如cbor2目录)。

    创建 zip 文件时,仅包含这些项目,而不是包含文件夹。

第 2 步:创建 Lambda 函数

在此步骤中,您将使用Amazon Lambda控制台以创建 Lambda 函数并将其配置为使用您的部署程序包。接着,发布函数版本并创建别名。

  1. 首先,创建 Lambda 函数。

    1. 在 Amazon Web Services Management Console中,选择 Services (服务),然后打开 Amazon Lambda 控制台。

    2. 选择创建函数然后选择从头开始创作.

    3. Basic information (基本信息) 部分中,使用以下值:

      • 对于 Function name(函数名称),请输入 TransferStream

      • 对于 Runtime (运行时),选择 Python 3.7

      • 适用于Permissions (权限),请保留默认设置。这将创建一个授予基本 Lambda 权限的执行角色。此角色未由使用Amazon IoT Greengrass.

    4. 在页面底部,选择创建函数

  2. 接下来,注册处理程序并上传您的 Lambda 函数部署程序包。

    1. 在存储库的代码选项卡,位于源代码,选择从以下项上传. 从下拉列表中,选择.zip 格式文件.

      
                带有 .zip 文件的上传自下拉列表突出显示。
    2. 选择上传,然后选择您的transfer_stream_python.zip部署程序包。然后,选择 Save (保存)

    3. 在存储库的代码函数的选项卡,位于Runte 设置,选择编辑,然后输入以下值。

      • 对于 Runtime (运行时),选择 Python 3.7

      • 对于 Handler (处理程序),输入 transfer_stream.function_handler

    4. 选择Save(保存)。

      注意

      这些区域有:测试按钮上的Amazon Lambda控制台不使用此函数。这些区域有:Amazon IoT Greengrass核心开发工具包不包含在Amazon Lambda控制台。这些模块(例如greengrass_common) 将在函数部署到你的 Greengrass 核心后提供给这些函数。

  3. 现在,发布您的 Lambda 函数的第一个版本并创建版本的别名.

    注意

    Greengrass 组可以按别名(推荐)或版本引用 Lambda 函数。使用别名,您可以更轻松地管理代码更新,因为您无需在函数代码更新时更改订阅表或组定义。相反,您只需将别名指向新函数版本。

    1. Actions 菜单上,选择 Publish new version

    2. 对于 Version description (版本描述),输入 First version,然后选择 Publish (发布)

    3. 在存储库的TransferStream:1配置页面,来自操作菜单中,选择创建别名.

    4. 创建新别名页面上,使用以下值:

      • 对于 Name (名称),请输入 GG_TransferStream

      • 对于 Version (版本),选择 1

      注意

      Amazon IoT Greengrass不支持 Lambda 别名$LATEST版本。

    5. 选择Create(创建)。

现在,您已准备就绪,可以将 Lambda 函数添加到 Greengrass 组。

第 3 步:将Lambda 函数添加到 Greengrass 组

在此步骤中,您将将 Lambda 函数添加到该组,然后配置其生命周期和环境变量。有关更多信息,请参阅 使用组特定的配置控制 Greengrass Lambda 函数的执行

  1. 在Amazon IoT控制台导航窗格,在Manage,展开Greengrass 设备,然后选择组 (V1).

  2. 选择目标组。

  3. 在组配置页面上,选择Lambda 函数选项卡。

  4. UNDER我的 Lambda 函数,选择Add.

  5. 在存储库的添加 Lambda 函数页面上,选择Lambda 函数您的 Lambda 函数。

  6. 对于Lambda 版本,选择别名:gg_TransferStream.

    现在,配置用于确定 Greengrass 组中 Lambda 函数的行为的属性。

  7. Lambda 函数配置部分,进行以下更改:

    • Memory limit (内存限制) 设置为 32 MB。

    • 适用于Pinned,选择True.

    注意

    一个长时间生存的(或固定) Lambda 函数在以下时间之后自动启动Amazon IoT Greengrass在它自己的容器中启动并继续运行。这与按需Lambda 函数,该函数在调用时启动,并在没有要运行的任务时停止。有关更多信息,请参阅 Greengrass Lambda 函数的生命周期配置

  8. 选择添加 Lambda 函数.

第 4 步:启用流管理器

在此步骤中,您确保启用流管理器。

  1. 在组配置页面上,选择Lambda 函数选项卡。

  2. UNDER系统 Lambda 函数,选择流管理器,然后检查状态。如果禁用,请选择 Edit (编辑)。然后,选择 Enable (启用)Save (保存)。您可以对本教程使用默认参数设置。有关更多信息,请参阅 配置 Amazon IoT Greengrass 流管理器

注意

使用控制台启用流管理器并部署组时,流管理器的内存大小默认设置为 4194304 KB (4 GB)。建议您将内存大小设置为至少 128000 KB。

第 5 步:配置本地日志记录

在此步骤中,您将配置Amazon IoT Greengrass系统组件、用户定义的 Lambda 函数以及组中的连接器,以将日志写入核心设备的文件系统。您可以使用日志对可能遇到的任何问题进行故障排除。有关更多信息,请参阅 利用 Amazon IoT Greengrass 日志进行监控

  1. Local logs configuration (本地日志配置) 下,检查是否配置了本地日志记录。

  2. 如果未为 Greengrass 系统组件或用户定义的 Lambda 函数配置日志,请选择编辑.

  3. 选择用户 Lambda 函数日志级别Greengrass Systm.

  4. 保留日志记录级别和磁盘空间限制的默认值,然后选择 Save (保存)

第 6 步:部署 Greengrass 组

将组部署到核心设备。

  1. 确保Amazon IoT Greengrass核心正在运行。根据需要在您的 Raspberry Pi 终端中运行以下命令。

    1. 要检查守护程序是否正在运行,请执行以下操作:

      ps aux | grep -E 'greengrass.*daemon'

      如果输出包含 root/greengrass/ggc/packages/ggc-version/bin/daemon 条目,则表示守护程序正在运行。

      注意

      路径中的版本取决于您的核心设备上安装的 Amazon IoT Greengrass 核心软件版本。

    2. 启动守护程序:

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. 在组配置页面上,选择部署.

    1. Lambda 函数选项卡,位于系统 Lambda 函数部分,选择IP 探测器然后选择编辑.

    2. 编辑 IP 探测器设置对话框中,选择自动检测和覆盖 MQTT 代理端点.

    3. 选择Save(保存)。

      这使得设备可以自动获取核心的连接信息,例如 IP 地址、DNS 和端口号。建议使用自动检测,不过 Amazon IoT Greengrass 也支持手动指定的终端节点。只有在首次部署组时,系统才会提示您选择发现方法。

      注意

      如果出现提示,请授予创建Greengrass 服务角色并将其与你的Amazon Web Services 账户在最新的Amazon Web Services 区域. 此角色允许Amazon IoT Greengrass要访问您的资源Amazon服务。

      Deployments (部署) 页面显示了部署时间戳、版本 ID 和状态。完成后,部署的状态应显示为已完成.

      有关问题排查帮助,请参阅排除 Amazon IoT Greengrass 的故障

步骤 7:测试应用程序

这些区域有:TransferStreamLambda 函数生成模拟的设备数据。它将数据写入流管理器导出到目标 Kinesis 数据流的流。

  1. 在Amazon Kinesis 控制台中,在Kinesis Data 流,选择MyKinesisStream.

    注意

    如果您在运行教程时没有目标 Kinesis 数据流, 请检查流管理器的日志文件 (GGStreamManager)。如果它在错误消息中包含 export stream MyKinesisStream doesn't exist,则测试成功。此错误意味着服务试图导出到流,但流不存在。

  2. 在存储库的MyKinesisStream页面上,选择监控. 如果测试成功,您应在 Put Records (放置记录) 图表中看到数据。根据您的连接,显示数据可能需要一分钟时间。

    重要

    测试完成后,删除 Kinesis 数据流以避免产生更多费用。

    运行以下命令以停止 Greengrass 守护程序。这样可以防止核心发送消息,直到您准备好继续测试。

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. 删除TransferStreamLambda 函数来自核心。

    1. 在Amazon IoT控制台导航窗格,在Manage,展开Greengrass 设备,然后选择组 (V1).

    2. UNDERGreengrass 组,选择您的组。

    3. 在存储库的Lambdas页面上,选择省略号 (...) 用于TransferStream函数,然后选择移除函数.

    4. 操作中,选择部署

要查看日志记录信息或解决流的问题,请检查日志中的 TransferStreamGGStreamManager 函数。您必须具有 root 权限以读取文件系统上的 Amazon IoT Greengrass 日志。

  • TransferStream 将日志条目写入 greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log

  • GGStreamManager 将日志条目写入 greengrass-root/ggc/var/log/system/GGStreamManager.log

如果您需要更多疑难解答信息,您可以设置日志记录级别为了用户 Lambda 日志调日志然后再次部署该组。

另请参阅