将数据流导出到 Amazon Web Services 云 (console) - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

您正在查看Amazon IoT Greengrass Version 1.Amazon IoT Greengrass Version 2是最新的主要版本Amazon IoT Greengrass. 有关使用Amazon IoT Greengrass V2,请参阅Amazon IoT Greengrass Version 2开发人员指南.

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将数据流导出到 Amazon Web Services 云 (console)

本教程介绍如何使用Amazon IoT控制台配置和部署Amazon IoT Greengrass组,并启用了流管理器。该组包含一个用户定义的 Lambda 函数,该函数可以在流管理器中写入流,然后将其自动导出到 Amazon Web Services 云 .

流管理器使得摄取、处理和导出大容量数据流更有效和可靠。在本教程中,您将创建一个TransferStream使用 IoT 数据的 Lambda 函数。Lambda 函数使用Amazon IoT Greengrass在流管理器中创建流,然后对其进行读写。然后,流管理器将流导出到 Kinesis Data Streams。下图演示了此工作流程。


      流管理工作流图。

本教程的重点是展示用户定义的 Lambda 函数如何使用StreamManagerClient对象Amazon IoT Greengrass与流管理器交互的核心 SDK。为简单起见,您为本教程创建的 Python Lambda 函数将生成模拟设备数据。

Prerequisites

要完成此教程,需要:

  • Greengrass 组和 Greengrass 核心(v1.10 或更高版本)。有关如何创建 Greengrass 组和核心的信息,请参阅开始使用 Amazon IoT Greengrass. “入门”教程还包含用于安装 Amazon IoT Greengrass Core 软件的步骤。

    注意

    OpenWRT 发行版不支持流管理器。

  • 核心设备上安装的 Java 8 运行时 (JDK 8)。

    • 对于基于 Debian 的发行版(包括 Raspbian)或基于 Ubuntui 的发行版,运行以下命令:

      sudo apt install openjdk-8-jdk
    • 对于基于 Red Hat 的发行版(包括 Amazon Linux),请运行以下命令:

      sudo yum install java-1.8.0-openjdk

      有关更多信息,请参阅 OpenJDK 文档中的如何下载并安装预先构建的 OpenJDK 程序包

  • Amazon IoT Greengrass适用于 Python v1.5.0 或更高版本的核心开发工具包。使用StreamManagerClient中的Amazon IoT Greengrass适用于 Python 的核心软件开发工具包,您必须:

    • 在核心设备上安装 Python 3.7 或更高版本。

    • 在 Lambda 函数部署包中包含 SDK 及其依赖项。本教程中提供了说明。

    提示

    可以将 StreamManagerClient 与 Java 或 NodeJS 结合使用。例如,请参阅 Amazon IoT Greengrass适用于 Java 的核心开发工具包Amazon IoT Greengrass适用于 Node.js 的核心开发工具包(位于 GitHub 上)。

  • 名为MyKinesisStream在 Amazon Kinesis Data Streams 中创建的 Amazon Web Services 区域 作为您的 Greengrass 组。有关更多信息,请参阅 。创建流中的Amazon Kinesis 开发人员指南.

    注意

    在本教程中,流管理器将数据导出到 Kinesis Data Streams,这将向 Amazon Web Services 账户 . 有关定价的信息,请参阅Kinesis Data Streams 定价.

    为避免产生费用,您可以在不创建 Kinesis 数据流的情况下运行本教程。在这种情况下,您检查日志以查看流管理器试图将流导出到 Kinesis Data Streams。

  • 添加到Greengrass 组角色允许kinesis:PutRecords操作,如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/MyKinesisStream" ] } ] }

本教程包含以下概括步骤:

完成本教程大约需要 20 分钟。

第 1 步:创建 Lambda 函数部署软件包

在此步骤中,您创建包含 Python 函数代码和依赖项的 Lambda 函数部署包。您稍后在创建中的 Lambda 函数Amazon Lambda. Lambda 函数使用Amazon IoT Greengrass创建本地流并与之交互的核心 SDK。

注意

您的用户定义的 Lambda 函数必须使用Amazon IoT Greengrass核心开发工具包与流管理器交互。有关 Greengrass 流管理器的要求的更多信息,请参阅 Greengrass 流管理器要求

  1. 下载Amazon IoT Greengrass适用于 Python 的核心开发工具v1.5.0 或更高版本。

  2. 解压缩下载的程序包以获取软件开发工具包。软件开发工具包是 greengrasssdk 文件夹。

  3. 安装程序包依赖项以将其包含在 Lambda 函数部署程序包的开发工具包中。

    1. 导航到包含该 requirements.txt 文件的开发工具包目录。此文件列出了依赖项。

    2. 安装开发工具包依赖项。例如,运行以下 pip 命令将它们安装在当前目录中:

      pip install --target . -r requirements.txt
  4. 将以下 Python 代码函数保存在名为 transfer_stream.py 的本地文件中。

    提示

    有关使用 Java 和 NodeJS 的示例代码,请参阅 Amazon IoT Greengrass适用于 Java 的核心开发工具包Amazon IoT Greengrass适用于 Node.js 的核心开发工具包(位于 GitHub 上)。

    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. 将以下项目压缩到名为 transfer_stream_python.zip 的文件中。此即 Lambda 函数部署程序包。

    • transfer_stream.py。应用程序逻辑。

    • greengrasssdk。发布 MQTT 消息的 Python Greengrass Lambda 函数所需的库。

      流管理器操作是 1.5.0 版本或更高版本。Amazon IoT Greengrass适用于 Python 的核心开发工具包。

    • 您为Amazon IoT Greengrass适用于 Python 的核心开发工具包(例如,cbor2)。

    创建 zip 文件时,仅包含这些项目,而不是包含文件夹。

第 2 步:创建 Lambda 函数

在此步骤中,您将使用Amazon Lambda控制台创建 Lambda 函数,然后将其配置为使用您的部署程序包。接着,发布函数版本并创建别名。

  1. 首先,创建 Lambda 函数。

    1. 在 Amazon Web Services Management Console中,选择 Services (服务),然后打开 Amazon Lambda 控制台。

    2. 选择创建函数,然后选择从头开始创作.

    3. Basic information (基本信息) 部分中,使用以下值:

      • 对于 Function name(函数名称),请输入 TransferStream

      • 对于 Runtime (运行时),选择 Python 3.7

      • 适用于Permissions (权限),请保留默认设置。这将创建一个授予基本 Lambda 权限的执行角色。不使用该角色Amazon IoT Greengrass.

    4. 在页面底部,选择创建函数

  2. 接下来,注册处理程序并上传您的 Lambda 函数部署程序包。

    1. 在存储库的代码选项卡,在代码源中,选择上传自. 从下拉列表中,选择.zip 文件.

      
                突出显示 .zip 文件的 “从上传” 下拉菜单。
    2. 选择上传,然后选择您的transfer_stream_python.zip部署程序包。然后,选择 Save (保存)

    3. 在存储库的代码选项卡,在Runtime 设置中,选择编辑,然后输入以下值。

      • 对于 Runtime (运行时),选择 Python 3.7

      • 对于 Handler (处理程序),输入 transfer_stream.function_handler

    4. 选择 Save

      注意

      这些区域有:测试按钮Amazon Lambda控制台与此函数一起使用时不起作用。这些区域有:Amazon IoT Greengrass核心 SDK 不包含独立运行 Greengrass Lambda 函数所需的模块Amazon Lambda控制台。这些模块(例如greengrass_common)在这些函数部署到您的 Greengrass 核心后提供给这些函数。

  3. 现在,发布您的 Lambda 函数的第一个版本并创建一个版本的别名.

    注意

    Greengrass 组可以按别名(推荐)或版本引用 Lambda 函数。使用别名,您可以更轻松地管理代码更新,因为您在更新函数代码时无需更改订阅表或组定义。相反,您只需将别名指向新函数版本。

    1. Actions 菜单上,选择 Publish new version

    2. 对于 Version description (版本描述),输入 First version,然后选择 Publish (发布)

    3. TransferStream: 1 (TransferStream:1) 配置页面上,从 Actions (操作) 菜单中选择 Create alias (创建别名)

    4. 创建新别名页面上,使用以下值:

      • 对于名称,请输入 GG_TransferStream

      • 对于 Version (版本),选择 1

      注意

      Amazon IoT Greengrass不支持 Lambda 别名$LBEAD版本。

    5. 选择创建

现在,您已准备就绪,可以将 Lambda 函数添加到 Greengrass 组。

第 3 步:将 Lambda 函数添加到 Greengrass 组

在该步骤中,您将 Lambda 函数添加到该组,然后配置其生命周期和环境变量。有关更多信息,请参阅使用组特定的配置控制 Greengrass Lambda 函数的执行

  1. 在Amazon IoT控制台的导航窗格中,选择Greengrass经典 (V1)Groups.

  2. 选择目标组。

  3. 在组配置页面上,选择 Lambdas,然后选择添加 Lambda

    
                突出显示“Lambdas”和“Add Lambda (添加 Lambda)”的组页面。
  4. 将 Lambda 添加到 Greengrass 组页面上,选择使用现有 Lambda

    
                突出显示“Use existing Lambda (使用现有 Lambda)”的“Add a Lambda to your Greengrass Group (将 Lambda 添加到 Greengrass 组)”页面。
  5. 在存储库的使用现有的 Lambda页面上,选择TransferStream,然后选择下一步.

  6. 在存储库的选择一个 Lambda 版本页面上,选择别名:GG_ 传输流,然后选择Finish.

    现在,配置用于确定 Greengrass 组中 Lambda 函数的行为的属性。

  7. 对于TransferStreamLambda 函数中,选择省略号 (...),然后选择编辑配置.

  8. 组特定的 Lambda 配置页面上,进行以下更改:

    • Memory limit (内存限制) 设置为 32 MB。

    • 对于 Lambda 生命周期,选择 Make this function long-lived and keep it running indefinitely (使此函数长时间生存,保持其无限期运行)

    注意

    A长时间生存的(或pinned)Lambda 函数在Amazon IoT Greengrass启动并在自己的容器中继续运行。这与按需Lambda 函数,该函数在调用时启动,并在没有要运行的任务时停止。有关更多信息,请参阅Greengrass Lambda 函数的生命周期配置

  9. 选择 Update

第 4 步:启用流管理器

在此步骤中,您确保启用流管理器。

  1. 在组配置页面上,选择 Settings (设置)

    
                组设置页面。
  2. Stream manager (流管理器) 下,检查启用或禁用状态。如果禁用,请选择 Edit (编辑)。然后,选择 Enable (启用)Save (保存)。您可以对本教程使用默认参数设置。有关更多信息,请参阅配置 Amazon IoT Greengrass 流管理器

    
                        组的“设置”页面上的“流管理器”部分。
注意

使用控制台启用流管理器并部署组时,流管理器的内存大小默认设置为 4194304 KB (4 GB)。建议您将内存大小设置为至少 128000 KB。

第 5 步:配置本地日志记录

在此步骤中,您将配置Amazon IoT Greengrass系统组件、用户定义的 Lambda 函数以及组中的连接器,以将日志写入核心设备的文件系统。您可以使用日志对可能遇到的任何问题进行故障排除。有关更多信息,请参阅利用 Amazon IoT Greengrass 日志进行监控

  1. Local logs configuration (本地日志配置) 下,检查是否配置了本地日志记录。

    
                “日志配置” 部分,显示 Greengrass 系统日志和用户 Lambda 日志配置。
  2. 如果未为 Greengrass 系统组件或用户定义的 Lambda 函数配置日志,请选择编辑.

  3. 选择添加其他日志类型,选择 User Lambdas (用户 Lambdas)Greengrass system (Greengrass 系统),然后选择 Update (更新)

  4. 保留日志记录级别和磁盘空间限制的默认值,然后选择 Save (保存)

第 6 步:部署 Greengrass 组

将组部署到核心设备。

  1. 确保已将Amazon IoT Greengrass核心正在运行。根据需要在您的 Raspberry Pi 终端中运行以下命令。

    1. 要检查守护程序是否正在运行,请执行以下操作:

      ps aux | grep -E 'greengrass.*daemon'

      如果输出包含 root/greengrass/ggc/packages/ggc-version/bin/daemon 条目,则表示守护程序正在运行。

      注意

      路径中的版本取决于您的核心设备上安装的 Amazon IoT Greengrass 核心软件版本。

    2. 启动守护程序:

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. 在组配置页面上,选择部署,然后从操作菜单中,选择部署

    
                突出显示“Deployments (部署)”和“Deploy (部署)”的组页面。
  3. 如果出现提示,请在配置设备搜索您的核心的方式页面上,选择自动检测.

    这使得设备可以自动获取核心的连接信息,例如 IP 地址、DNS 和端口号。建议使用自动检测,不过 Amazon IoT Greengrass 也支持手动指定的终端节点。只有在首次部署组时,系统才会提示您选择发现方法。

    
                突出显示“Automatic detection (自动检测)”的“Configure how devices discover your core (配置设备搜索您的核心的方式)”页面。
    注意

    如果出现提示,请授予创建Greengrass 服务角色并将其与 Amazon Web Services 账户 在最新的 Amazon Web Services 区域 . 此角色允许Amazon IoT Greengrass访问您的资源Amazon服务。

    Deployments (部署) 页面显示了部署时间戳、版本 ID 和状态。完成后,部署的状态应显示为 Successfully completed (成功完成)

    有关问题排查帮助,请参阅Amazon IoT Greengrass 故障排除

步骤 7:测试应用程序

这些区域有:TransferStreamLambda 函数生成模拟的设备数据。它将数据写入流管理器导出到目标 Kinesis 数据流的流。

  1. 在亚 Amazon Kinesis 控制台中,在Kinesis Data Streams中,选择MyKinesisStream.

    注意

    如果您在运行教程时没有目标 Kinesis 数据流, 请检查流管理器的日志文件 (GGStreamManager)。如果它在错误消息中包含 export stream MyKinesisStream doesn't exist,则测试成功。此错误意味着服务试图导出到流,但流不存在。

  2. MyKinesisStream 页面上,选择 Monitoring (监控)。如果测试成功,您应在 Put Records (放置记录) 图表中看到数据。根据您的连接,显示数据可能需要一分钟时间。

    重要

    测试完成后,删除 Kinesis 数据流以避免产生更多费用。

    运行以下命令以停止 Greengrass 守护程序。这样可以防止核心发送消息,直到您准备好继续测试。

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. 删除TransferStreamLambda 函数从核心。

    1. 在Amazon IoT控制台的导航窗格中,选择Greengrass经典 (V1)Groups.

    2. UNASOGreengrass 组,选择您的组。

    3. 在存储库的Lambda页面上,选择椭圆(...),用于TransferStream函数,然后选择删除功能.

    4. 操作中,选择部署

要查看日志记录信息或解决流的问题,请检查日志中的 TransferStreamGGStreamManager 函数。您必须具有 root 权限以读取文件系统上的 Amazon IoT Greengrass 日志。

  • TransferStream 将日志条目写入 greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log

  • GGStreamManager 将日志条目写入 greengrass-root/ggc/var/log/system/GGStreamManager.log

如果您需要更多故障排除信息,可以设置日志记录级别对于 来说为Lambda 日志调试日志,然后再次部署该组。

另请参阅