将数据流导出到 AWS 云(控制台) - AWS IoT Greengrass
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将数据流导出到 AWS 云(控制台)

本教程介绍如何使用 AWS IoT 控制台配置和部署启用了流管理器的 AWS IoT Greengrass 组。该组包含一个用户定义的 Lambda 函数,该函数可以在流管理器中写入流,然后将其自动导出到 AWS 云中。

流管理器使得摄取、处理和导出大容量数据流更容易和更可靠。在本教程中,您将创建一个 TransferStream Lambda 会消耗 IoT 数据。Lambda 函数使用 AWS IoT Greengrass Core 开发工具包 在流管理器中创建流,然后对其进行读写。然后,流管理器将流导出到 Kinesis Data Streams。下图演示了此工作流程。


      流管理工作流图。

本教程的重点是展示用户定义的 Lambda 函数如何使用 AWS IoT Greengrass Core 开发工具包 中的 StreamManagerClient 对象与流管理器进行交互。为简单起见,Python Lambda 功能将生成模拟设备数据。

Prerequisites

要完成此教程,需要:

  • Greengrass 组和 Greengrass 核心 (v1.10 or later)。要了解如何创建 Greengrass 组和核心,请参阅 AWS IoT Greengrass 入门。“入门”教程还包含用于安装 AWS IoT Greengrass Core 软件的步骤。

    注意

    流管理器在上不受支持 OpenWrt 分配。

  • 核心设备上安装的 Java 8 运行时 (JDK 8)。

    • 对于基于 Debian 的发行版(包括 Raspbian)或基于 Ubuntui 的发行版,运行以下命令:

      sudo apt install openjdk-8-jdk
    • 对于基于 Red Hat 的发行版(包括 Amazon Linux),请运行以下命令:

      sudo yum install java-1.8.0-openjdk

      有关更多信息,请参阅 如何下载和安装预构建的 OpenJDK 包裹 在 OpenJDK 文档。

  • 适用于 Python 的 AWS IoT Greengrass Core 开发工具包 v1.5.0或更高版本。要在 适用于 Python 的 AWS IoT Greengrass Core 开发工具包 中使用 StreamManagerClient,您必须:

    • 在核心设备上安装Python3.7或更高版本。

    • 在您的中包含SDK及其依赖项 Lambda 函数部署包。本教程中提供了说明。

    提示

    您可以使用 StreamManagerClient 使用Java或 NodeJS。有关代码示例,请参阅 AWS IoT Greengrass Core 开发工具包 用于JavaAWS IoT Greengrass Core 开发工具包 用于Node.js 于 GitHub.

  • 在与您的 Greengrass 组相同的 AWS 区域中,在 Amazon Kinesis Data Streams 中创建的名为 MyKinesisStream 的目标流。有关更多信息,请参阅 创建流Amazon Kinesis Developer Guide.

    注意

    在本教程中,流管理器将数据导出到 Kinesis Data Streams,这将向您的 AWS 账户收取费用。有关定价的信息,请参阅 Kinesis Data Streams定价

    为避免产生费用,您可以在不创建 Kinesis 数据流的情况下运行本教程。在这种情况下,您检查日志以查看流管理器试图将流导出到 Kinesis Data Streams。

  • 一种 IAM 策略添加到 Greengrass 组角色 使 kinesis:PutRecords 对目标数据流进行操作,如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws-cn:kinesis:区域:account-id:stream/MyKinesisStream" ] } ] }

本教程包含以下概括步骤:

完成本教程大约需要 20 分钟。

步骤 1: 创建 Lambda 函数部署程序包

在此步骤中,您将创建一个 Lambda 包含Python功能代码和依赖项的功能部署包。您稍后在 AWS Lambda 中创建 Lambda 函数时上传此程序包。Lambda 函数使用 AWS IoT Greengrass Core 开发工具包 创建本地流并与之交互。

注意

您的用户定义 Lambda 函数必须使用 AWS IoT Greengrass Core 开发工具包 与流管理器交互。有关 Greengrass 流管理器的要求的更多信息,请参阅 Greengrass 流管理器要求

  1. 下载 适用于 Python 的 AWS IoT Greengrass Core 开发工具包 v1.5.0或更高版本。

  2. 解压缩下载的程序包以获取软件开发工具包。软件开发工具包是 greengrasssdk 文件夹。

  3. 安装程序包依赖项以将其包含在 Lambda 函数部署程序包的开发工具包中。

    1. 导航到包含该 requirements.txt 文件的开发工具包目录。此文件列出了依赖项。

    2. 安装开发工具包依赖项。例如,运行以下 pip 命令将它们安装在当前目录中:

      pip install --target . -r requirements.txt
  4. 将以下 Python 代码函数保存在名为 transfer_stream.py 的本地文件中。

    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. 将以下项目压缩到名为的文件中 transfer_stream_python.zip。这是您的 Lambda 函数部署包。

    • transfer_stream.py。应用程序逻辑。

    • greengrasssdk。发布 MQTT 消息的 Python Greengrass Lambda 函数所需的库。

      流管理器操作 在1.5.0或更高版本中可用 适用于 Python 的 AWS IoT Greengrass Core 开发工具包.

    • 您为 适用于 Python 的 AWS IoT Greengrass Core 开发工具包(例如, cbor2 目录)安装的依赖项。

    创建 zip 文件时,仅包含这些项目,而不是包含文件夹。

步骤 2: 创建 Lambda 函数

在该步骤中,您使用 AWS Lambda 控制台创建 Lambda 函数,然后将其配置为使用您的部署程序包。接着,发布函数版本并创建别名。

  1. 首先,创建 Lambda 函数。

    1. 在 AWS 管理控制台中,选择 Services (服务),然后打开 AWS Lambda 控制台。

    2. 选择 Create function 然后选择 Author from scratch.

    3. Basic information (基本信息) 部分中,使用以下值:

      • 对于 Function name (函数名称),输入 TransferStream

      • 对于 Runtime (运行时),选择 Python 3.7

      • 对于权限,请保留默认设置。这将创建一个授予基本 Lambda 权限的执行角色。(此角色未由 AWS IoT Greengrass 使用。)

    4. 在页面底部,选择 Create function.

  2. 接下来,注册处理程序并上传您的 Lambda 函数部署程序包。

    1. TransferStream 函数的 Configuration (配置) 选项卡上,在 Function code (函数代码) 中,使用以下值:

      • 对于 代码输入种类,选择上传 .zip 文件

      • 对于 Runtime (运行时),选择 Python 3.7

      • 对于 Handler (处理程序),输入 transfer_stream.function_handler

    2. 选择 Upload.

    3. 选择您的 transfer_stream_python.zip 部署程序包。

    4. 选择 Save (保存)

      注意

      AWS Lambda 控制台上的测试键不可与该函数一同使用。AWS IoT Greengrass 核心 SDK 不包含在 AWS Lambda 控制台中独立运行 Greengrass Lambda 函数所需的模块。这些模块(例如 greengrass_common)会在被部署到您的 Greengrass 核心之后提供给这些函数。

  3. 现在,发布 Lambda 函数的第一个版本并创建此版本的别名

    注意

    Greengrass 组可以按别名 (推荐) 或版本引用 Lambda 函数。通过使用别名,可以更轻松地管理代码更新,因为您在更新函数代码时无需更改订阅表或组定义。相反,您只是将别名指向新的函数版本。

    1. Actions 菜单上,选择 Publish new version

    2. 对于 Version description (版本描述),输入 First version,然后选择 Publish (发布)

    3. TransferStream:1 配置页面上,从 Actions (操作) 菜单中选择 Create alias (创建别名)

    4. 创建新别名页面上,使用以下值:

      • 对于 Name (名称),请输入 GG_TransferStream

      • 对于 Version (版本),选择 1

      注意

      AWS IoT Greengrass 不支持 $LATEST 版本的 Lambda 别名。

    5. 选择创建

现在,您已准备就绪,可以将 Lambda 函数添加到 Greengrass 组。

步骤 3: 将 Lambda 函数添加到 Greengrass 组

在该步骤中,您将 Lambda 函数添加到该组,然后配置其生命周期和环境变量。有关更多信息,请参阅使用组特定的配置控制 Greengrass Lambda 函数的执行

  1. 在 AWS IoT 控制台中,选择 Greengrass,然后选择 Groups (组)

  2. 选择目标组。

  3. 在组配置页面上,选择 朗格斯,然后选择 Add Lambda.

    
                突出显示“Lambdas”和“Add Lambda (添加 Lambda)”的组页面。
  4. 将Lambda添加到您的Greengrass组 页面,选择 Use existing Lambda.

    
                突出显示“Use existing Lambda (使用现有 Lambda)”的“Add a Lambda to your Greengrass Group (将 Lambda 添加到 Greengrass 组)”页面。
  5. Use existing Lambda 页面上,选择 TransferStream,然后选择 Next.

  6. Select a Lambda version (选择 Lambda 版本) 页面上,选择 Alias:GG_TransferStream,然后选择 Finish.

    现在,配置用于确定 Greengrass 组中 Lambda 函数的行为的属性。

  7. 对于 TransferStream Lambda 函数,选择省略号 (),然后选择 Edit Configuration (编辑配置)

  8. 组特定的 Lambda 配置页面上,进行以下更改:

    • Memory limit (内存限制) 设置为 32 MB。

    • 对于 Lambda 生命周期,选择 Make this function long-lived and keep it running indefinitely (使此函数长时间生存,保持其无限期运行)

    注意

    长时间生存(或固定)的 Lambda 函数在 AWS IoT Greengrass 启动后自动启动并在自己的容器中保持运行。这与按需 Lambda 函数相反,后者在调用时启动,并在没有要执行的任务时停止。有关更多信息,请参阅Greengrass Lambda 函数的生命周期配置

  9. 选择 Update (更新)

步骤 4: 启用流管理器

在此步骤中,您确保启用流管理器。

  1. 在组配置页面上,选择 Settings (设置)。

    
                组设置页面。
  2. Stream manager (流管理器) 下,检查启用或禁用状态。如果禁用,请选择 Edit (编辑)。然后,选择 Enable (启用)Save (保存)。您可以在本教程中使用默认参数设置。有关更多信息,请参阅配置 AWS IoT Greengrass 流管理器

    
      组的“设置”页面上的“流管理器”部分。
注意

当您使用控制台启用流管理器并部署组时,默认情况下,流管理器的内存大小设置为 4194304 KB (4 GB)。我们建议您将内存大小设置为至少 128000 KB。

步骤 5: 配置本地日志记录

在此步骤中,您将配置 AWS IoT Greengrass 系统组件、用户定义的 Lambda 函数以及组中的连接器,以将日志写入核心设备的文件系统。您可以使用日志对可能遇到的任何问题进行故障排除。有关更多信息,请参阅利用 AWS IoT Greengrass 日志进行监控

  1. Local logs configuration (本地日志配置) 下,检查是否配置了本地日志记录。

    
                显示Greengrass系统日志和用户的日志配置部分 Lambda 记录配置。
  2. 如果未为 Greengrass 系统组件或用户定义的 Lambda 函数配置日志,请选择编辑

  3. 选择添加其他日志类型,选择 User Lambdas (用户 Lambdas)Greengrass system (Greengrass 系统),然后选择 Update (更新)

  4. 保留日志记录级别和磁盘空间限制的默认值,然后选择 Save (保存)

步骤 6: 部署 Greengrass 组

将组部署到核心设备。

  1. 确保 AWS IoT Greengrass Core正在运行。根据需要在您的 Raspberry Pi 终端中运行以下命令。

    1. 要检查守护程序是否正在运行,请执行以下操作:

      ps aux | grep -E 'greengrass.*daemon'

      如果输出包含 root/greengrass/ggc/packages/ggc-version/bin/daemon 条目,则表示守护程序正在运行。

      注意

      路径中的版本取决于您的核心设备上安装的 AWS IoT Greengrass 核心软件版本。

    2. 要启动守护程序,请执行以下操作:

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. 在组配置页面上,选择部署,然后从操作菜单中,选择部署

    
                突出显示“Deployments (部署)”和“Deploy (部署)”的组页面。
  3. 如果提示,在 配置设备如何发现您的核心 页面,选择 Automatic detection.

    这使得设备可以自动获取核心的连接信息,例如 IP 地址、DNS 和端口号。建议使用自动检测,不过 AWS IoT Greengrass 也支持手动指定的终端节点。只有在首次部署组时,系统才会提示您选择搜索方法。

    
                突出显示“Automatic detection (自动检测)”的“Configure how devices discover your core (配置设备搜索您的核心的方式)”页面。
    注意

    在系统提示时,授予权限以创建Greengrass 服务角色,并将该角色与您在当前 AWS 区域中的 AWS 账户关联。该角色允许 AWS IoT Greengrass 访问您 AWS 服务中的资源。

    Deployments (部署) 页面显示了部署时间戳、版本 ID 和状态。完成后,部署的状态应显示为 Successfully completed (成功完成)

    有关问题排查帮助,请参阅 问题排查AWS IoT Greengrass

步骤 7: 测试应用程序

TransferStream Lambda 函数生成模拟的设备数据。它将数据写入流管理器导出到目标 Kinesis 数据流的流。

  1. 在 Amazon Kinesis 控制台,低于 Kinesis数据流,选择 MyKinesisStream.

    注意

    如果您在运行教程时没有目标 Kinesis 数据流, 请检查流管理器的日志文件 (GGStreamManager)。如果它在错误消息中包含 export stream MyKinesisStream doesn't exist,则测试成功。此错误意味着服务试图导出到流,但流不存在。

  2. MyKinesisStream 页面,选择 监控. 如果测试成功,您应在 Put Records (放置记录) 图表中看到数据。根据您的连接,显示数据可能需要一分钟时间。

    重要

    测试完成后,删除 Kinesis 数据流以避免产生更多费用。

    运行以下命令以停止 Greengrass 守护程序。这样可以防止核心发送消息,直到您准备好继续测试。

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. 从核心中删除 TransferStream Lambda 函数。

    1. 在 AWS IoT 控制台中,依次选择 Greengrass,然后选择您的组。

    2. Lambda 页面上,选择 函数的省略号 (TransferStream...),然后选择 Remove function (删除函数)

    3. 操作中,选择部署

要查看日志记录信息或排查流的问题,请查看 TransferStreamGGStreamManager 函数的日志。您必须具有 root 权限以读取文件系统上的 AWS IoT Greengrass 日志。

  • TransferStream 将日志条目写入 greengrass-root/ggc/var/log/user/区域/account-id/TransferStream.log

  • GGStreamManager 将日志条目写入 greengrass-root/ggc/var/log/system/GGStreamManager.log

如果您需要更多故障排除信息,可以将用户 日志的Lambda日志记录级别设置为调试日志,然后再次部署组。

另请参阅