将数据流导出到 Amazon Web Services 云(控制台) - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon IoT Greengrass Version 1 2023 年 6 月 30 日进入延长寿命阶段。有关更多信息,请参阅 Amazon IoT Greengrass V1 维护策略。在此日期之后,将 Amazon IoT Greengrass V1 不会发布提供功能、增强功能、错误修复或安全补丁的更新。在上面运行的设备 Amazon IoT Greengrass V1 不会中断,将继续运行并连接到云端。我们强烈建议您迁移到 Amazon IoT Greengrass Version 2,这样可以添加重要的新功能支持其他平台

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将数据流导出到 Amazon Web Services 云(控制台)

本教程介绍如何使用 Amazon IoT 控制台配置和部署启用了流管理器的 Amazon IoT Greengrass 组。该组包含一个用户定义的 Lambda 函数,该函数可以在流管理器中写入流,然后将其自动导出到 Amazon Web Services 云 中。

流管理器使得摄取、处理和导出大容量数据流更高效也更可靠。在本教程中,您将创建一个使用 IoT 数据的 TransferStream Lambda 函数。Lambda 函数使用 Amazon IoT Greengrass 核心开发工具包在流管理器中创建流,然后对其进行读写。然后,流管理器将流导出到 Kinesis Data Streams。下图演示了此工作流程。


      流管理工作流图。

本教程的重点是展示用户定义的 Lambda 函数如何使用 Amazon IoT Greengrass 核心开发工具包中的 StreamManagerClient 对象与流管理器进行交互。为简单起见,您为本教程创建的 Python Lambda 函数将生成模拟设备数据。

先决条件

要完成此教程,需要:

  • Greengrass 组和 Greengrass Core(v1.10 或更高版本)。有关如何创建 Greengrass 组和核心的信息,请参阅 开始使用 Amazon IoT Greengrass。“入门”教程还包含用于安装 Amazon IoT Greengrass Core 软件的步骤。

    注意

    OpenWRT 发行版不支持流管理器。

  • 核心设备上安装的 Java 8 运行时 (JDK 8)。

    • 对于基于 Debian 的发行版(包括 Raspbian)或基于 Ubuntui 的发行版,运行以下命令:

      sudo apt install openjdk-8-jdk
    • 对于基于 Red Hat 的发行版(包括 Amazon Linux),请运行以下命令:

      sudo yum install java-1.8.0-openjdk

      有关更多信息,请参阅 OpenJDK 文档中的如何下载并安装预先构建的 OpenJDK 程序包

  • 适用于 Python 的 Amazon IoT Greengrass Core 软件开发工具包 v1.5.0 或更高版本。要在适用于 Python 的 Amazon IoT Greengrass Core 软件开发工具包中使用 StreamManagerClient,您必须:

    • 在核心设备上安装 Python 3.7 或更高版本。

    • 将开发工具包和其依赖项包含在 Lambda 函数部署程序包中。本教程中提供了说明。

    提示

    可以将 StreamManagerClient 与 Java 或 NodeJS 结合使用。有关示例代码,请参阅 GitHub 上的适用于 Java 的 Amazon IoT Greengrass适用于 Node.js 的 Amazon IoT Greengrass 核心开发工具包

  • 在与您的 Greengrass 组相同的 Amazon Web Services 区域 中,在 Amazon Kinesis Data Streams 中创建的名为 MyKinesisStream 的目标流。有关更多信息,请参阅 Amazon Kinesis 开发人员指南中的创建流

    注意

    在本教程中,流管理器将数据导出到 Kinesis Data Streams,这将向您的 Amazon Web Services 账户 账户收取费用。有关定价的信息,请参阅 Kinesis Data Streams 定价

    为避免产生费用,您可以在不创建 Kinesis 数据流的情况下运行本教程。在这种情况下,您检查日志以查看流管理器试图将流导出到 Kinesis Data Streams。

  • 一个添加到了 kinesis:PutRecords 的 IAM 策略,该策略允许对目标数据流执行 Greengrass 组角色 操作,如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/MyKinesisStream" ] } ] }

本教程包含以下概括步骤:

完成本教程大约需要 20 分钟。

步骤 1:创建 Lambda 函数部署程序包

在此步骤中,您创建包含 Python 函数代码和依赖项的 Lambda 函数部署包。您稍后在 Amazon Lambda 中创建 Lambda 函数时上传此程序包。Lambda 函数使用 Amazon IoT Greengrass 核心开发工具包创建本地流并与之交互。

注意

用户定义的 Lambda 函数必须使用 Amazon IoT Greengrass 核心开发工具包与流管理器交互。有关 Greengrass 流管理器的要求的更多信息,请参阅 Greengrass 流管理器要求

  1. 下载适用于 Python 的 Amazon IoT Greengrass Core 软件开发工具包 v1.5.0 或更高版本。

  2. 解压缩下载的程序包以获取软件开发工具包。软件开发工具包是 greengrasssdk 文件夹。

  3. 安装程序包依赖项以将其包含在 Lambda 函数部署程序包的开发工具包中。

    1. 导航到包含该 requirements.txt 文件的开发工具包目录。此文件列出了依赖项。

    2. 安装开发工具包依赖项。例如,运行以下 pip 命令将它们安装在当前目录中:

      pip install --target . -r requirements.txt
  4. 将以下 Python 代码函数保存在名为 transfer_stream.py 的本地文件中。

    提示
    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. 将以下项目压缩到名为 transfer_stream_python.zip 的文件中。此即 Lambda 函数部署程序包。

    • transfer_stream.py。应用程序逻辑。

    • greengrasssdk。发布 MQTT 消息的 Python Greengrass Lambda 函数所需的库。

      流管理器操作在适用于 Python 的 Amazon IoT Greengrass Core 软件开发工具包 v1.5.0 或更高版本中可用。

    • 您为适用于 Python 的 Amazon IoT Greengrass 核心开发工具包(例如,cbor2 目录)安装的依赖项。

    创建 zip 文件时,仅包含这些项目,而不是包含文件夹。

步骤 2:创建 Lambda 函数

在此步骤中,您使用 Amazon Lambda 控制台创建 Lambda 函数,然后将其配置为使用您的部署包。接着,发布函数版本并创建别名。

  1. 首先,创建 Lambda 函数。

    1. 在Amazon Web Services Management Console中,选择服务,然后打开 Amazon Lambda 控制台。

    2. 选择创建函数,然后选择从头开始创作

    3. 基本信息部分中,使用以下值:

      • 对于函数名称,请输入 TransferStream

      • 对于运行时系统,选择 Python 3.7

      • 对于权限,请保留默认设置。这将创建一个授予基本 Lambda 权限的执行角色。此角色不由 Amazon IoT Greengrass 使用。

    4. 在页面底部,选择创建函数

  2. 接下来,注册处理程序并上传您的 Lambda 函数部署包。

    1. 代码选项卡上的代码源下,选择上传自。从下拉列表中选择 .zip 文件

      
                “上传自”下拉列表中突出显示了.zip 文件。
    2. 选择上传,然后选择您的 transfer_stream_python.zip 部署包。然后,选择保存

    3. 在函数的代码选项卡中,在运行时设置下选择编辑,然后输入以下值。

      • 对于运行时系统,选择 Python 3.7

      • 对于处理程序,输入 transfer_stream.function_handler

    4. 选择 Save(保存)。

      注意

      Amazon Lambda 控制台上的测试按钮不适用于此功能。Amazon IoT Greengrass Core 软件开发工具包不包含在 Amazon Lambda 控制台中独立运行 Greengrass Lambda 函数所需的模块。这些模块(例如 greengrass_common)是在函数部署到您的 Greengrass 核心之后提供给它们的。

  3. 现在,发布 Lambda 函数的第一个版本并创建此版本的别名

    注意

    Greengrass 组可以按别名(推荐)或版本引用 Lambda 函数。使用别名,您可以更轻松地管理代码更新,因为您在更新函数代码时,不必更改订阅表或组定义。相反,您只需将别名指向新的函数版本。

    1. 操作菜单上,选择发布新版本

    2. 对于版本描述,输入 First version,然后选择发布

    3. TransferStream: 1 (TransferStream:1) 配置页面上,从 Actions (操作) 菜单中选择 Create alias (创建别名)

    4. 创建新别名页面上,使用以下值:

      • 对于名称,输入 GG_TransferStream

      • 对于版本,选择 1

      注意

      Amazon IoT Greengrass 不支持 $LATEST 版本的 Lambda 别名。

    5. 选择 Create(创建)。

现在,您已准备就绪,可以将 Lambda 函数添加到 Greengrass 组。

步骤 3:将 Lambda 函数添加到 Greengrass 组

在该步骤中,您将 Lambda 函数添加到该组,然后配置其生命周期和环境变量。有关更多信息,请参阅使用组特定的配置控制 Greengrass Lambda 函数的执行

  1. 在 Amazon IoT 控制台导航窗格的管理下,展开 Greengrass 设备,然后选择组 (V1)

  2. 选择目标组。

  3. 在组配置页面上,选择Lambda 函数选项卡。

  4. 我的 Lambda 函数部分下,选择添加

  5. 添加 Lambda 函数页面上,为您的 Lambda 函数选择 Lambda 函数

  6. Lambda 版本中,选择 Alias:GG_TransferStream

    现在,配置用于确定 Greengrass 组中 Lambda 函数的行为的属性。

  7. Lambda 函数配置部分中,进行以下更改。

    • Memory limit (内存限制) 设置为 32 MB。

    • 对于已固定,选择 True

    注意

    长时间生存(或固定)的 Lambda 函数在 Amazon IoT Greengrass 启动后自动启动并在自己的容器中保持运行。这与按需 Lambda 函数相反,后者在调用时启动,并在没有要运行的任务时停止。有关更多信息,请参阅Greengrass Lambda 函数的生命周期配置

  8. 选择添加 Lambda 函数

步骤 4:启用流管理器

在此步骤中,您确保启用流管理器。

  1. 在组配置页面上,选择Lambda 函数选项卡。

  2. 系统 Lambda 函数下,选择流管理器,然后检查状态。如果禁用,请选择 Edit (编辑)。然后,选择 Enable (启用)Save (保存)。您可以对本教程使用默认参数设置。有关更多信息,请参阅配置 Amazon IoT Greengrass 流管理器

注意

使用控制台启用流管理器并部署组时,流管理器的内存大小默认设置为 4194304 KB (4 GB)。建议您将内存大小设置为至少 128000 KB。

步骤 5:配置本地日志记录

在此步骤中,您将配置 Amazon IoT Greengrass 系统组件、用户定义的 Lambda 函数以及组中的连接器,以将日志写入核心设备的文件系统。您可以使用日志对可能遇到的任何问题进行故障排除。有关更多信息,请参阅利用 Amazon IoT Greengrass 日志进行监控

  1. Local logs configuration (本地日志配置) 下,检查是否配置了本地日志记录。

  2. 如果未为 Greengrass 系统组件或用户定义的 Lambda 函数配置日志,请选择编辑

  3. 选择用户 Lambda 函数日志级别Greengrass 系统日志级别

  4. 保留日志记录级别和磁盘空间限制的默认值,然后选择 Save (保存)

步骤 6:部署 Greengrass 组

将组部署到核心设备。

  1. 确保 Amazon IoT Greengrass 核心正在运行。根据需要在您的 Raspberry Pi 终端中运行以下命令。

    1. 要检查进程守护程序是否正在运行,请执行以下操作:

      ps aux | grep -E 'greengrass.*daemon'

      如果输出包含 root/greengrass/ggc/packages/ggc-version/bin/daemon 条目,则表示进程守护程序正在运行。

      注意

      路径中的版本取决于您的核心设备上安装的 Amazon IoT Greengrass Core 软件版本。

    2. 启动进程守护程序:

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. 在组配置页面上,选择部署

    1. Lambda 函数选项卡的系统 Lambda 函数部分下,选择 IP 检测器,再选择编辑

    2. 编辑 IP 检测器设置对话框中,选择自动检测和覆盖 MQTT 代理端点

    3. 选择 Save(保存)。

      这使得设备可以自动获取核心的连接信息,例如 IP 地址、DNS 和端口号。建议使用自动检测,不过 Amazon IoT Greengrass 也支持手动指定的端点。只有在首次部署组时,系统才会提示您选择发现方法。

      注意

      如果出现提示,请授予权限,以创建 Greengrass 服务角色并将其关联至当前 Amazon Web Services 区域中的 Amazon Web Services 账户。该角色将允许 Amazon IoT Greengrass 访问您在 Amazon 服务中的资源。

      部署页面显示了部署时间戳、版本 ID 和状态。完成后,部署的状态应显示为成功完成

      有关问题排查帮助,请参阅故障排除 Amazon IoT Greengrass

步骤 7:测试应用程序

TransferStream Lambda 函数生成模拟的设备数据。它将数据写入流管理器导出到目标 Kinesis 数据流的流。

  1. 在 Amazon Kinesis 控制台的 Kinesis data streams 下,选择 MyKinesisStream

    注意

    如果您在运行教程时没有目标 Kinesis 数据流, 请检查流管理器的日志文件 (GGStreamManager)。如果它在错误消息中包含 export stream MyKinesisStream doesn't exist,则测试成功。此错误意味着服务试图导出到流,但流不存在。

  2. MyKinesisStream 页面上,选择 Monitoring (监控)。如果测试成功,您应在 Put Records (放置记录) 图表中看到数据。根据您的连接,显示数据可能需要一分钟时间。

    重要

    测试完成后,删除 Kinesis 数据流以避免产生更多费用。

    运行以下命令以停止 Greengrass 守护程序。这样可以防止核心发送消息,直到您准备好继续测试。

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. 从核心中删除 TransferStream Lambda 函数。

    1. 在 Amazon IoT 控制台导航窗格的管理下,展开 Greengrass 设备,然后选择组 (V1)

    2. Greengrass 组下,选择您的组。

    3. Lambdas 页面上,为 TransferStream 函数选择省略号 (),然后选择删除函数

    4. 操作中,选择部署

要查看日志记录信息或解决流的问题,请检查日志中的 TransferStreamGGStreamManager 函数。您必须具有 root 权限以读取文件系统上的 Amazon IoT Greengrass 日志。

  • TransferStream 将日志条目写入 greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log

  • GGStreamManager 将日志条目写入 greengrass-root/ggc/var/log/system/GGStreamManager.log

如果您需要更多故障排除信息,可以将用户 Lambda 日志日志级别设置为调试日志,然后再次部署该组。

另请参阅