将数据流导出到 Amazon Web Services 云 (CLI) - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

您正在查看Amazon IoT Greengrass Version 1.Amazon IoT Greengrass Version 2是最新的主要版本Amazon IoT Greengrass. 有关使用Amazon IoT Greengrass V2,请参阅Amazon IoT Greengrass Version 2开发人员指南.

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将数据流导出到 Amazon Web Services 云 (CLI)

本教程介绍如何使用Amazon CLI配置和部署Amazon IoT Greengrass组,并启用了流管理器。该组包含一个用户定义的 Lambda 函数,该函数可以在流管理器中写入流,然后将其自动导出到 Amazon Web Services 云 .

流管理器使得摄取、处理和导出大容量数据流更有效和可靠。在本教程中,您将创建一个TransferStream使用 IoT 数据的 Lambda 函数。Lambda 函数使用Amazon IoT Greengrass在流管理器中创建流,然后对其进行读写。然后,流管理器将流导出到 Kinesis Data Streams。下图演示了此工作流程。


      流管理工作流图。

本教程的重点是展示用户定义的 Lambda 函数如何使用StreamManagerClient对象Amazon IoT Greengrass与流管理器交互的核心 SDK。为简单起见,您为本教程创建的 Python Lambda 函数将生成模拟设备数据。

当您使用Amazon IoT GreengrassAPI,其中包括Amazon CLI,则默认情况下,流管理器处于禁用状态。要在您的核心上启用流管理器,创建函数定义版本其中包括系统GGStreamManagerLambda 函数和一个引用新的函数定义版本的组版本。然后部署组。

Prerequisites

要完成此教程,需要:

  • Greengrass 组和 Greengrass 核心(v1.10 或更高版本)。有关如何创建 Greengrass 组和核心的信息,请参阅开始使用 Amazon IoT Greengrass. “入门”教程还包含用于安装 Amazon IoT Greengrass Core 软件的步骤。

    注意

    OpenWRT 发行版不支持流管理器。

  • 核心设备上安装的 Java 8 运行时 (JDK 8)。

    • 对于基于 Debian 的发行版(包括 Raspbian)或基于 Ubuntui 的发行版,运行以下命令:

      sudo apt install openjdk-8-jdk
    • 对于基于 Red Hat 的发行版(包括 Amazon Linux),请运行以下命令:

      sudo yum install java-1.8.0-openjdk

      有关更多信息,请参阅 OpenJDK 文档中的如何下载并安装预先构建的 OpenJDK 程序包

  • Amazon IoT Greengrass适用于 Python v1.5.0 或更高版本的核心开发工具包。使用StreamManagerClient中的Amazon IoT Greengrass适用于 Python 的核心软件开发工具包,您必须:

    • 在核心设备上安装 Python 3.7 或更高版本。

    • 在 Lambda 函数部署包中包含 SDK 及其依赖项。本教程中提供了说明。

    提示

    可以将 StreamManagerClient 与 Java 或 NodeJS 结合使用。例如,请参阅 Amazon IoT Greengrass适用于 Java 的核心开发工具包Amazon IoT Greengrass适用于 Node.js 的核心开发工具包(位于 GitHub 上)。

  • 名为MyKinesisStream在 Amazon Kinesis Data Streams 中创建的 Amazon Web Services 区域 作为您的 Greengrass 组。有关更多信息,请参阅 。创建流中的Amazon Kinesis 开发人员指南.

    注意

    在本教程中,流管理器将数据导出到 Kinesis Data Streams,这将向 Amazon Web Services 账户 . 有关定价的信息,请参阅Kinesis Data Streams 定价.

    为避免产生费用,您可以在不创建 Kinesis 数据流的情况下运行本教程。在这种情况下,您检查日志以查看流管理器试图将流导出到 Kinesis Data Streams。

  • 添加到Greengrass 组角色允许kinesis:PutRecords操作,如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/MyKinesisStream" ] } ] }
  • 您的计算机上安装和配置的 Amazon CLI。有关更多信息,请参阅 。安装Amazon Command Line Interface配置Amazon CLI中的Amazon Command Line Interface用户指南.

     

    本教程中的示例命令是针对 Linux 及其他基于 Unix 的系统编写的。如果您使用的是 Windows,请参阅为 指定参数值Amazon命令行界面了解有关语法差异的更多信息。

    如果命令包含 JSON 字符串,本教程提供了在单行包含 JSON 的示例。在某些系统上,使用此格式可能会更有效地编辑和运行命令。

 

本教程包含以下概括步骤:

完成本教程大约需要 30 分钟。

第 1 步:创建 Lambda 函数部署软件包

在此步骤中,您创建包含 Python 函数代码和依赖项的 Lambda 函数部署包。您稍后在创建中的 Lambda 函数Amazon Lambda. Lambda 函数使用Amazon IoT Greengrass创建本地流并与之交互的核心 SDK。

注意

您的用户定义的 Lambda 函数必须使用Amazon IoT Greengrass核心开发工具包与流管理器交互。有关 Greengrass 流管理器的要求的更多信息,请参阅 Greengrass 流管理器要求

  1. 下载Amazon IoT Greengrass适用于 Python 的核心开发工具v1.5.0 或更高版本。

  2. 解压缩下载的程序包以获取软件开发工具包。软件开发工具包是 greengrasssdk 文件夹。

  3. 安装程序包依赖项以将其包含在 Lambda 函数部署程序包的开发工具包中。

    1. 导航到包含该 requirements.txt 文件的开发工具包目录。此文件列出了依赖项。

    2. 安装开发工具包依赖项。例如,运行以下 pip 命令将它们安装在当前目录中:

      pip install --target . -r requirements.txt
  4. 将以下 Python 代码函数保存在名为 transfer_stream.py 的本地文件中。

    提示

    有关使用 Java 和 NodeJS 的示例代码,请参阅 Amazon IoT Greengrass适用于 Java 的核心开发工具包Amazon IoT Greengrass适用于 Node.js 的核心开发工具包(位于 GitHub 上)。

    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. 将以下项目压缩到名为 transfer_stream_python.zip 的文件中。此即 Lambda 函数部署程序包。

    • transfer_stream.py。应用程序逻辑。

    • greengrasssdk。发布 MQTT 消息的 Python Greengrass Lambda 函数所需的库。

      流管理器操作是 1.5.0 版本或更高版本。Amazon IoT Greengrass适用于 Python 的核心开发工具包。

    • 您为Amazon IoT Greengrass适用于 Python 的核心开发工具包(例如,cbor2)。

    创建 zip 文件时,仅包含这些项目,而不是包含文件夹。

第 2 步:创建 Lambda 函数

  1. 创建 IAM 角色,以便您可以在创建该函数时传入角色 ARN。

    JSON Expanded
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }'
    JSON Single-line
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"},"Action": "sts:AssumeRole"}]}'
    注意

    Amazon IoT Greengrass不使用此角色,因为对 Greengrass Lambda 函数的权限在 Greengrass 组角色中指定。对于本教程,您将创建一个空角色。

  2. 从输出中复制 Arn

  3. 使用 Amazon Lambda API 创建 TransferStream 函数。以下命令假定该 zip 文件位于当前目录中。

    • Replace角色 ARN使用Arn复制的。

    aws lambda create-function \ --function-name TransferStream \ --zip-file fileb://transfer_stream_python.zip \ --role role-arn \ --handler transfer_stream.function_handler \ --runtime python3.7
  4. 发布该函数的版本。

    aws lambda publish-version --function-name TransferStream --description 'First version'
  5. 为发布的版本创建别名。

    Greengrass 组可以按别名(推荐)或版本引用 Lambda 函数。使用别名,您可以更轻松地管理代码更新,因为您在更新函数代码时无需更改订阅表或组定义。相反,您只需将别名指向新函数版本。

    aws lambda create-alias --function-name TransferStream --name GG_TransferStream --function-version 1
    注意

    Amazon IoT Greengrass不支持 Lambda 别名$LBEAD版本。

  6. 从输出中复制 AliasArn。为 Amazon IoT Greengrass 配置函数时,可以使用此值。

现在,您已准备就绪,可以为 Amazon IoT Greengrass 配置函数。

第 3 步:创建函数定义和版本

此步骤创建引用系统的函数定义版本GGStreamManagerLambda 函数和您的用户定义TransferStreamLambda 函数。若要启用流管理器,当您使用Amazon IoT GreengrassAPI,您的函数定义版本必须包含GGStreamManagerfunction.

  1. 使用包含系统和用户定义的 Lambda 函数的初始版本创建函数定义。

    以下定义版本使用默认设置启用流管理器参数设置. 要配置自定义设置,您必须为相应的流管理器参数定义环境变量。有关示例,请参阅 。启用、禁用或配置流管理器 (CLI).Amazon IoT Greengrass将为省略的参数使用默认设置。MemorySize均应至少为128000.Pinned必须将其设为true.

    注意

    A长时间生存的(或pinned)Lambda 函数在Amazon IoT Greengrass启动并在自己的容器中继续运行。这与按需Lambda 函数,该函数在调用时启动,并在没有要运行的任务时停止。有关更多信息,请参阅Greengrass Lambda 函数的生命周期配置

    • arbitrary-function-id 替换为函数名称,如 stream-manager

    • Replace别名 ARN使用AliasArn创建别名时复制的TransferStreamLambda 函数。

     

    JSON expanded
    aws greengrass create-function-definition --name MyGreengrassFunctions --initial-version '{ "Functions": [ { "Id": "arbitrary-function-id", "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", "FunctionConfiguration": { "MemorySize": 128000, "Pinned": true, "Timeout": 3 } }, { "Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": { "Executable": "transfer_stream.function_handler", "MemorySize": 16000, "Pinned": true, "Timeout": 5 } } ] }'
    JSON single
    aws greengrass create-function-definition \ --name MyGreengrassFunctions \ --initial-version '{"Functions": [{"Id": "arbitrary-function-id","FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", "FunctionConfiguration": {"Environment": {"Variables":{"STREAM_MANAGER_STORE_ROOT_DIR": "/data","STREAM_MANAGER_SERVER_PORT": "1234","STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH": "20000"}},"MemorySize": 128000,"Pinned": true,"Timeout": 3}},{"Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": {"Executable": "transfer_stream.function_handler", "MemorySize": 16000,"Pinned": true,"Timeout": 5}}]}'
    注意

    Timeout 是函数定义版本所需的,但 GGStreamManager 不使用它。有关 的更多信息Timeout和其他组级别设置,请参阅使用组特定的配置控制 Greengrass Lambda 函数的执行.

  2. 从输出中复制 LatestVersionArn。您将使用此值向部署到核心的组版本添加函数定义版本。

第 4 步:创建记录器定义和版本

配置组的日志记录设置。在本教程中,将Amazon IoT Greengrass系统组件、用户定义的 Lambda 函数以及连接器,以将日志写入核心设备的文件系统。您可以使用日志对可能遇到的任何问题进行故障排除。有关更多信息,请参阅利用 Amazon IoT Greengrass 日志进行监控

  1. 创建一个包含初始版本的记录器定义。

    JSON Expanded
    aws greengrass create-logger-definition --name "LoggingConfigs" --initial-version '{ "Loggers": [ { "Id": "1", "Component": "GreengrassSystem", "Level": "INFO", "Space": 10240, "Type": "FileSystem" }, { "Id": "2", "Component": "Lambda", "Level": "INFO", "Space": 10240, "Type": "FileSystem" } ] }'
    JSON Single-line
    aws greengrass create-logger-definition \ --name "LoggingConfigs" \ --initial-version '{"Loggers":[{"Id":"1","Component":"GreengrassSystem","Level":"INFO","Space":10240,"Type":"FileSystem"},{"Id":"2","Component":"Lambda","Level":"INFO","Space":10240,"Type":"FileSystem"}]}'
  2. 从输出中复制记录器定义的 LatestVersionArn。您将使用此值向部署到核心的组版本添加记录器定义版本。

第 5 步:获取核心定义版本的 ARN

获取要添加到新组版本的核心定义版本的 ARN。要部署组版本,该组版本必须引用包含确切一个核心的核心定义版本。

  1. 获取目标 Greengrass 组和组版本的 ID。此过程假定这是最新的组和组版本。以下查询返回最近创建的组。

    aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"

    或者,您也可以按名称查询。系统不要求组名称是唯一的,所以可能会返回多个组。

    aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
    注意

    您还可以在中找到这些值。Amazon IoT控制台。组 ID 显示在组的设置页面上。组版本 ID 显示在组的部署页面上。

  2. 从输出中复制目标组的 Id。您将使用此值获取核心定义版本及部署组的时间。

  3. 从输出中复制 LatestVersion,这是添加到组的最后一个版本的 ID。您将使用此值获取核心定义版本。

  4. 获取核心定义版本的 ARN:

    1. 获取组版本。

      • 使用为组复制的 替换 group-idId

      • Replace群组版本 ID使用LatestVersion的第一个版本。

      aws greengrass get-group-version \ --group-id group-id \ --group-version-id group-version-id
    2. 从输出中复制 CoreDefinitionVersionArn。您将使用此值向部署到核心的组版本添加核心定义版本。

第 6 步:创建组版本

现在,您已准备就绪,可以创建一个包含您要部署的实体的组版本。要实现此目的,需要创建一个引用每种组件类型的目标版本的组版本。在本教程中,您将包括核心定义版本、函数定义版本和记录器定义版本。

  1. 创建组版本。

    • 使用为组复制的 替换 group-idId

    • Replace内核定义版本 ARn使用CoreDefinitionVersionArn复制为核心定义版本。

    • function-definition-version-arn 替换为您为新函数定义版本复制的 LatestVersionArn

    • logger-definition-version-arn 替换为您为新的记录器定义版本复制的 LatestVersionArn

    aws greengrass create-group-version \ --group-id group-id \ --core-definition-version-arn core-definition-version-arn \ --function-definition-version-arn function-definition-version-arn \ --logger-definition-version-arn logger-definition-version-arn
  2. 从输出中复制 Version。这是新组版本的 ID。

步骤 7:创建部署

将组部署到核心设备。

  1. 确保已将Amazon IoT Greengrass核心正在运行。根据需要在您的 Raspberry Pi 终端中运行以下命令。

    1. 要检查守护程序是否正在运行,请执行以下操作:

      ps aux | grep -E 'greengrass.*daemon'

      如果输出包含 root/greengrass/ggc/packages/ggc-version/bin/daemon 条目,则表示守护程序正在运行。

      注意

      路径中的版本取决于您的核心设备上安装的 Amazon IoT Greengrass 核心软件版本。

    2. 启动守护程序:

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. 创建部署。

    • 使用为组复制的 替换 group-idId

    • 使用为新组版本复制的 替换 group-version-idVersion

    aws greengrass create-deployment \ --deployment-type NewDeployment \ --group-id group-id \ --group-version-id group-version-id
  3. 从输出中复制 DeploymentId

  4. 获取部署状态。

    • 使用为组复制的 替换 group-idId

    • Replace部署 ID使用DeploymentId为部署复制的。

    aws greengrass get-deployment-status \ --group-id group-id \ --deployment-id deployment-id

    如果状态是否为Success,则部署成功。有关问题排查帮助,请参阅Amazon IoT Greengrass 故障排除

步骤 8:测试应用程序

这些区域有:TransferStreamLambda 函数生成模拟的设备数据。它将数据写入流管理器导出到目标 Kinesis 数据流的流。

  1. 在亚 Amazon Kinesis 控制台中,在Kinesis Data Streams中,选择MyKinesisStream.

    注意

    如果您在运行教程时没有目标 Kinesis 数据流, 请检查流管理器的日志文件 (GGStreamManager)。如果它在错误消息中包含 export stream MyKinesisStream doesn't exist,则测试成功。此错误意味着服务试图导出到流,但流不存在。

  2. MyKinesisStream 页面上,选择 Monitoring (监控)。如果测试成功,您应在 Put Records (放置记录) 图表中看到数据。根据您的连接,显示数据可能需要一分钟时间。

    重要

    测试完成后,删除 Kinesis 数据流以避免产生更多费用。

    运行以下命令以停止 Greengrass 守护程序。这样可以防止核心发送消息,直到您准备好继续测试。

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. 删除TransferStreamLambda 函数从核心。

    1. 按照第 6 步:创建组版本创建新的组版本,但删除 create-group-version 命令中的 --function-definition-version-arn 选项。或者,创建不包括的函数定义版本。TransferStreamLambda 函数。

      注意

      通过省略系统GGStreamManagerLambda 函数从部署的组版本中,您可以禁用核心上的流管理。

    2. 按照步骤 7:创建部署以部署新的组版本。

要查看日志记录信息或解决流的问题,请检查日志中的 TransferStreamGGStreamManager 函数。您必须具有 root 权限以读取文件系统上的 Amazon IoT Greengrass 日志。

  • TransferStream 将日志条目写入 greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log

  • GGStreamManager 将日志条目写入 greengrass-root/ggc/var/log/system/GGStreamManager.log

如果需要更多疑难解答信息,可以将 Lambda 日志记录级别设置为 DEBUG,然后创建并部署新的组版本。

另请参阅