将数据流导出到 AWS 云(CLI) - AWS IoT Greengrass
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将数据流导出到 AWS 云(CLI)

本教程将向您展示如何使用 AWS CLI 配置和部署 AWS IoT Greengrass 组 启用流管理器。该组包含一个用户定义的 Lambda 函数,该函数可以在流管理器中写入流,然后将其自动导出到 AWS 云中。

流管理器使得摄取、处理和导出大容量数据流更容易和更可靠。在本教程中,您将创建一个 TransferStream Lambda 会消耗 IoT 数据。Lambda 函数使用 AWS IoT Greengrass Core 开发工具包 在流管理器中创建流,然后对其进行读写。然后,流管理器将流导出到 Kinesis Data Streams。下图演示了此工作流程。


      流管理工作流图。

本教程的重点是展示用户定义的 Lambda 函数如何使用 AWS IoT Greengrass Core 开发工具包 中的 StreamManagerClient 对象与流管理器进行交互。为简单起见,Python Lambda 功能将生成模拟设备数据。

当您使用 AWS IoT Greengrass API,包括中的Greengrass命令 AWS CLI,要创建组,默认情况下会禁用流管理器。要在核心上启用流管理器,您需要创建一个函数定义版本,其中包括系统 GGStreamManager Lambda 函数和一个引用新的函数定义版本的组版本。然后部署组。

Prerequisites

要完成此教程,需要:

  • Greengrass 组和 Greengrass 核心 (v1.10 or later)。要了解如何创建 Greengrass 组和核心,请参阅 AWS IoT Greengrass 入门。“入门”教程还包含用于安装 AWS IoT Greengrass Core 软件的步骤。

    注意

    流管理器在上不受支持 OpenWrt 分配。

  • 核心设备上安装的 Java 8 运行时 (JDK 8)。

    • 对于基于 Debian 的发行版(包括 Raspbian)或基于 Ubuntui 的发行版,运行以下命令:

      sudo apt install openjdk-8-jdk
    • 对于基于 Red Hat 的发行版(包括 Amazon Linux),请运行以下命令:

      sudo yum install java-1.8.0-openjdk

      有关更多信息,请参阅 如何下载和安装预构建的 OpenJDK 包裹 在 OpenJDK 文档。

  • 适用于 Python 的 AWS IoT Greengrass Core 开发工具包 v1.5.0或更高版本。要在 适用于 Python 的 AWS IoT Greengrass Core 开发工具包 中使用 StreamManagerClient,您必须:

    • 在核心设备上安装Python3.7或更高版本。

    • 在您的中包含SDK及其依赖项 Lambda 函数部署包。本教程中提供了说明。

    提示

    您可以使用 StreamManagerClient 使用Java或 NodeJS。有关代码示例,请参阅 AWS IoT Greengrass Core 开发工具包 用于JavaAWS IoT Greengrass Core 开发工具包 用于Node.js 于 GitHub.

  • 在与您的 Greengrass 组相同的 AWS 区域中,在 Amazon Kinesis Data Streams 中创建的名为 MyKinesisStream 的目标流。有关更多信息,请参阅 创建流Amazon Kinesis Developer Guide.

    注意

    在本教程中,流管理器将数据导出到 Kinesis Data Streams,这将向您的 AWS 账户收取费用。有关定价的信息,请参阅 Kinesis Data Streams定价

    为避免产生费用,您可以在不创建 Kinesis 数据流的情况下运行本教程。在这种情况下,您检查日志以查看流管理器试图将流导出到 Kinesis Data Streams。

  • 一种 IAM 策略添加到 Greengrass 组角色 使 kinesis:PutRecords 对目标数据流进行操作,如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws-cn:kinesis:区域:account-id:stream/MyKinesisStream" ] } ] }
  • 您的计算机上安装和配置的 AWS CLI。有关更多信息,请参阅 安装 AWS Command Line Interface配置 AWS CLIAWS Command Line Interface 用户指南.

     

    本教程中的示例命令是针对 Linux 及其他基于 Unix 的系统编写的。如果您使用的是 Windows,请参阅为 AWS 命令行界面指定参数值以了解语法上的差异。

    如果命令包含 JSON 字符串,本教程提供了在单行包含 JSON 的示例。在某些系统上,使用此格式可能会更容易地编辑和运行命令。

 

本教程包含以下概括步骤:

完成本教程大约需要 30 分钟。

第1步: 创建 Lambda 函数部署程序包

在此步骤中,您将创建一个 Lambda 包含Python功能代码和依赖项的功能部署包。您稍后在 AWS Lambda 中创建 Lambda 函数时上传此程序包。Lambda 函数使用 AWS IoT Greengrass Core 开发工具包 创建本地流并与之交互。

注意

您的用户定义 Lambda 函数必须使用 AWS IoT Greengrass Core 开发工具包 与流管理器交互。有关 Greengrass 流管理器的要求的更多信息,请参阅 Greengrass 流管理器要求

  1. 下载 适用于 Python 的 AWS IoT Greengrass Core 开发工具包 v1.5.0或更高版本。

  2. 解压缩下载的程序包以获取软件开发工具包。软件开发工具包是 greengrasssdk 文件夹。

  3. 安装程序包依赖项以将其包含在 Lambda 函数部署程序包的开发工具包中。

    1. 导航到包含该 requirements.txt 文件的开发工具包目录。此文件列出了依赖项。

    2. 安装开发工具包依赖项。例如,运行以下 pip 命令将它们安装在当前目录中:

      pip install --target . -r requirements.txt
  4. 将以下 Python 代码函数保存在名为 transfer_stream.py 的本地文件中。

    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. 将以下项目压缩到名为的文件中 transfer_stream_python.zip。这是您的 Lambda 函数部署包。

    • transfer_stream.py。应用程序逻辑。

    • greengrasssdk。发布 MQTT 消息的 Python Greengrass Lambda 函数所需的库。

      流管理器操作 在1.5.0或更高版本中可用 适用于 Python 的 AWS IoT Greengrass Core 开发工具包.

    • 您为 适用于 Python 的 AWS IoT Greengrass Core 开发工具包(例如, cbor2 目录)安装的依赖项。

    创建 zip 文件时,仅包含这些项目,而不是包含文件夹。

第2步: 创建 Lambda 函数

  1. 创建 IAM 角色,以便您可以在创建该函数时传入角色 ARN。

    JSON 扩展
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com.cn" }, "Action": "sts:AssumeRole" } ] }'
    JSON Single-line
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com.cn"},"Action": "sts:AssumeRole"}]}'
    注意

    AWS IoT Greengrass 不使用此角色,因为对 Greengrass Lambda 函数的权限在 Greengrass 组角色中指定。对于本教程,您将创建一个空角色。

  2. 从输出中复制 Arn

  3. 使用 AWS Lambda API 创建 TransferStream 函数。以下命令假定该 zip 文件位于当前目录中。

    • Replace role-arnArn 您复制的。

    aws lambda create-function \ --function-name TransferStream \ --zip-file fileb://transfer_stream_python.zip \ --role role-arn \ --handler transfer_stream.function_handler \ --runtime python3.7
  4. 发布该函数的版本。

    aws lambda publish-version --function-name TransferStream --description 'First version'
  5. 为发布的版本创建别名。

    Greengrass 组可以按别名 (推荐) 或版本引用 Lambda 函数。通过使用别名,可以更轻松地管理代码更新,因为您在更新函数代码时无需更改订阅表或组定义。相反,您只是将别名指向新的函数版本。

    aws lambda create-alias --function-name TransferStream --name GG_TransferStream --function-version 1
    注意

    AWS IoT Greengrass 不支持 $LATEST 版本的 Lambda 别名。

  6. 从输出中复制 AliasArn。为 AWS IoT Greengrass 配置函数时,可以使用此值。

现在,您已准备就绪,可以为 AWS IoT Greengrass 配置函数。

第3步: 创建函数定义和版本

此步骤创建引用系统的函数定义版本 GGStreamManager Lambda 函数和用户定义的 TransferStream Lambda 功能。要在使用时启用流管理器 AWS IoT Greengrass API,您的函数定义版本必须包含 GGStreamManager 功能。

  1. 使用包含系统和用户定义的初始版本创建函数定义 Lambda 功能。

    以下定义版本启用默认流管理器 参数设置. 要配置自定义设置,您必须为相应的流管理器参数定义环境变量。有关示例,请参阅 启用、禁用或配置流管理器 (CLI)。 AWS IoT Greengrass 对省略的参数使用默认设置。MemorySize 至少应该为 128000Pinned 必须设置为 true

    注意

    长时间生存(或固定)的 Lambda 函数在 AWS IoT Greengrass 启动后自动启动并在自己的容器中保持运行。这与按需 Lambda 函数相反,后者在调用时启动,并在没有要执行的任务时停止。有关更多信息,请参阅Greengrass Lambda 函数的生命周期配置

    • Replace arbitrary-function-id 函数的名称,例如 stream-manager.

    • Replace alias-arnAliasArn 所复制的 TransferStream Lambda 功能。

     

    JSON expanded
    aws greengrass create-function-definition --name MyGreengrassFunctions --initial-version '{ "Functions": [ { "Id": "arbitrary-function-id", "FunctionArn": "arn:aws-cn:lambda:::function:GGStreamManager:1", "FunctionConfiguration": { "MemorySize": 128000, "Pinned": true, "Timeout": 3 } }, { "Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": { "Executable": "transfer_stream.function_handler", "MemorySize": 16000, "Pinned": true, "Timeout": 5 } } ] }'
    JSON single
    aws greengrass create-function-definition \ --name MyGreengrassFunctions \ --initial-version '{"Functions": [{"Id": "arbitrary-function-id","FunctionArn": "arn:aws-cn:lambda:::function:GGStreamManager:1", "FunctionConfiguration": {"Environment": {"Variables":{"STREAM_MANAGER_STORE_ROOT_DIR": "/data","STREAM_MANAGER_SERVER_PORT": "1234","STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH": "20000"}},"MemorySize": 128000,"Pinned": true,"Timeout": 3}},{"Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": {"Executable": "transfer_stream.function_handler", "MemorySize": 16000,"Pinned": true,"Timeout": 5}}]}'
    注意

    Timeout 是函数定义版本所需的,但 GGStreamManager 不使用它。有关 Timeout 和其他组级设置,请参阅 使用组特定的配置控制 Greengrass Lambda 函数的执行.

  2. 从输出中复制 LatestVersionArn。您将使用此值向部署到核心的组版本添加函数定义版本。

第4步: 创建记录器定义和版本

配置组的日志记录设置。在本教程中,您将配置 AWS IoT Greengrass 系统组件、用户定义的 Lambda 函数以及连接器,以将日志写入核心设备的文件系统。您可以使用日志对可能遇到的任何问题进行故障排除。有关更多信息,请参阅利用 AWS IoT Greengrass 日志进行监控

  1. 创建一个包含初始版本的记录器定义。

    JSON 扩展
    aws greengrass create-logger-definition --name "LoggingConfigs" --initial-version '{ "Loggers": [ { "Id": "1", "Component": "GreengrassSystem", "Level": "INFO", "Space": 10240, "Type": "FileSystem" }, { "Id": "2", "Component": "Lambda", "Level": "INFO", "Space": 10240, "Type": "FileSystem" } ] }'
    JSON Single-line
    aws greengrass create-logger-definition \ --name "LoggingConfigs" \ --initial-version '{"Loggers":[{"Id":"1","Component":"GreengrassSystem","Level":"INFO","Space":10240,"Type":"FileSystem"},{"Id":"2","Component":"Lambda","Level":"INFO","Space":10240,"Type":"FileSystem"}]}'
  2. 从输出中复制记录器定义的 LatestVersionArn。您将使用此值向部署到核心的组版本添加记录器定义版本。

第5步: 获取核心定义版本的 ARN

获取要添加到新组版本的核心定义版本的 ARN。要部署组版本,该组版本必须引用包含确切一个核心的核心定义版本。

  1. 获取 IDs 目标Greengrass组和组版本的。此操作流程假设这是最新的组和组版本。以下查询 退货 最近创建的组。

    aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"

    或者,您也可以按名称查询。系统不要求组名称是唯一的,所以可能会返回多个组。

    aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
    注意

    您还可以在 AWS IoT 控制台 中找到这些值。组 ID 显示在组的设置页面上。组版本 IDs 显示在分组的 部署 第页。

  2. 从输出中复制目标组的 Id。您将使用此值获取核心定义版本及部署组的时间。

  3. 从输出中复制 LatestVersion,这是添加到组的最后一个版本的 ID。您将使用此值获取核心定义版本。

  4. 获取核心定义版本的 ARN:

    1. 获取组版本。

      • Replace group-idId 您为组拷贝的。

      • Replace group-version-idLatestVersion 您为组拷贝的。

      aws greengrass get-group-version \ --group-id group-id \ --group-version-id group-version-id
    2. 从输出中复制 CoreDefinitionVersionArn。您将使用此值向部署到核心的组版本添加核心定义版本。

第6步: 创建组版本

现在,您已准备就绪,可以创建一个包含您要部署的实体的组版本。要实现此目的,需要创建一个引用每种组件类型的目标版本的组版本。在本教程中,您将包括核心定义版本、函数定义版本和记录器定义版本。

  1. 创建组版本。

    • Replace group-idId 您为组拷贝的。

    • Replace core-definition-version-arnCoreDefinitionVersionArn 为核心定义版本复制的。

    • Replace function-definition-version-arnLatestVersionArn 为新函数定义版本所复制的。

    • Replace logger-definition-version-arnLatestVersionArn 为新的logger定义版本复制的。

    aws greengrass create-group-version \ --group-id group-id \ --core-definition-version-arn core-definition-version-arn \ --function-definition-version-arn function-definition-version-arn \ --logger-definition-version-arn logger-definition-version-arn
  2. 从输出中复制 Version。这是新组版本的 ID。

第7步: 创建部署

将组部署到核心设备。

  1. 确保 AWS IoT Greengrass Core正在运行。根据需要在您的 Raspberry Pi 终端中运行以下命令。

    1. 要检查守护程序是否正在运行,请执行以下操作:

      ps aux | grep -E 'greengrass.*daemon'

      如果输出包含 root/greengrass/ggc/packages/ggc-version/bin/daemon 条目,则表示守护程序正在运行。

      注意

      路径中的版本取决于您的核心设备上安装的 AWS IoT Greengrass 核心软件版本。

    2. 要启动守护程序,请执行以下操作:

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. 创建部署。

    • Replace group-idId 您为组拷贝的。

    • Replace group-version-idVersion 为新组版本拷贝的。

    aws greengrass create-deployment \ --deployment-type NewDeployment \ --group-id group-id \ --group-version-id group-version-id
  3. 从输出中复制 DeploymentId

  4. 获取部署状态。

    • Replace group-idId 您为组拷贝的。

    • Replace deployment-idDeploymentId 为部署复制的。

    aws greengrass get-deployment-status \ --group-id group-id \ --deployment-id deployment-id

    如果状态为 Success,则部署成功。有关问题排查帮助,请参阅 问题排查AWS IoT Greengrass

第8步: 测试应用程序

TransferStream Lambda 函数生成模拟的设备数据。它将数据写入流管理器导出到目标 Kinesis 数据流的流。

  1. 在 Amazon Kinesis 控制台,低于 Kinesis数据流,选择 MyKinesisStream.

    注意

    如果您在运行教程时没有目标 Kinesis 数据流, 请检查流管理器的日志文件 (GGStreamManager)。如果它在错误消息中包含 export stream MyKinesisStream doesn't exist,则测试成功。此错误意味着服务试图导出到流,但流不存在。

  2. MyKinesisStream 页面,选择 监控. 如果测试成功,您应在 Put Records (放置记录) 图表中看到数据。根据您的连接,显示数据可能需要一分钟时间。

    重要

    测试完成后,删除 Kinesis 数据流以避免产生更多费用。

    运行以下命令以停止 Greengrass 守护程序。这样可以防止核心发送消息,直到您准备好继续测试。

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. 移除 TransferStream Lambda 功能。

    1. 按照第6步: 创建组版本创建新的组版本,但删除 create-group-version 命令中的 --function-definition-version-arn 选项。或者,创建一个不包含 TransferStream Lambda 功能。

      注意

      通过省略部署的组版本中的系统 GGStreamManager Lambda 函数,可以禁用核心上的流管理。

    2. 按照第7步: 创建部署以部署新的组版本。

要查看日志信息或排除流问题,请检查日志 TransferStreamGGStreamManager 功能。您必须具有 root 权限以读取文件系统上的 AWS IoT Greengrass 日志。

  • TransferStream 将日志条目写入 greengrass-root/ggc/var/log/user/区域/account-id/TransferStream.log

  • GGStreamManager 将日志条目写入 greengrass-root/ggc/var/log/system/GGStreamManager.log

如果需要更多疑难解答信息,可以将 Lambda 日志记录级别设置为 DEBUG,然后创建并部署新的组版本。

另请参阅