将数据流导出到 AWS 云 (CLI) - AWS IoT Greengrass
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

将数据流导出到 AWS 云 (CLI)

本教程介绍如何使用 AWS CLI 创建和部署启用了流管理器的 AWS IoT Greengrass 组。该组包含一个用户定义的 Lambda 函数,该函数可以在流管理器中写入流,然后将其自动导出到 AWS 云中。

流管理器使得摄取、处理和导出大容量数据流更容易和更可靠。在本教程中,您将创建一个使用 IoT 数据的 TransferStream Lambda 函数。Lambda 函数使用 AWS IoT Greengrass Core 开发工具包 在流管理器中创建流,然后对其进行读写。然后,流管理器将流导出到 Kinesis Data Streams。下图演示了此工作流程。


      流管理工作流图。

本教程的重点是展示用户定义的 Lambda 函数如何使用 AWS IoT Greengrass Core 开发工具包 中的 StreamManagerClient 对象与流管理器进行交互。为简单起见,您为本教程创建的 Lambda 函数将生成模拟设备数据。

当您使用 AWS IoT Greengrass API(在本教程中,Greengrass CLI 命令)创建组时,默认情况下会禁用流管理器。要在核心上启用流管理器,您需要创建一个函数定义版本,其中包括系统 GGStreamManager Lambda 函数和一个引用新的函数定义版本的组版本。然后部署组。

先决条件

要完成此教程,需要:

  • Greengrass 组和 Greengrass 核心 (v1.10 or later)。要了解如何创建 Greengrass 组和核心,请参阅 AWS IoT Greengrass 入门。“入门”教程还包含用于安装 AWS IoT Greengrass Core 软件的步骤。

    注意

    OpenWRT 发行版不支持流管理器。

  • 核心设备上安装的 Java 8 运行时 (JDK 8)。

    • 对于基于 Debian 的发行版(包括 Raspbian)或基于 Ubuntui 的发行版,运行以下命令:

      sudo apt install openjdk-8-jdk
    • 对于基于 Red Hat 的发行版(包括 Amazon Linux),请运行以下命令:

      sudo yum install java-1.8.0-openjdk

      有关更多信息,请参阅 OpenJDK 文档中的如何下载并安装预先构建的 OpenJDK 程序包

  • 适用于 Python 的 AWS IoT Greengrass Core 开发工具包 v1.5.0。要在 适用于 Python 的 AWS IoT Greengrass Core 开发工具包 中使用 StreamManagerClient,您必须:

    • 安装 Python 3.7 或更高版本。

    • 安装程序包依赖项并将其包含在 Lambda 函数部署程序包中。本教程中提供了说明。

    提示

    可以将 StreamManagerClient 与 Java 或 NodeJS 结合使用。有关示例代码,请参阅 GitHub 上的适用于 Java 的 AWS IoT Greengrass Core 开发工具包适用于 Node.js 的 AWS IoT Greengrass Core 开发工具包

  • 在与您的 Greengrass 组相同的 AWS 区域中,在 Amazon Kinesis Data Streams 中创建的名为 MyKinesisStream 的目标流。有关更多信息,请参阅 Amazon Kinesis Developer Guide 中的创建存储桶

    注意

    在本教程中,流管理器将数据导出到 Kinesis Data Streams,这将向您的 AWS 账户收取费用。有关定价的信息,请参阅 Kinesis Data Streams 定价

    为避免产生费用,您可以在不创建 Kinesis 数据流的情况下运行本教程。在这种情况下,您检查日志以查看流管理器试图将流导出到 Kinesis Data Streams。

  • 添加到 Greengrass 组角色的 IAM 策略,允许对目标数据流执行 kinesis:PutRecords 操作,如以下示例所示:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws-cn:kinesis:区域:account-id:stream/MyKinesisStream" ] } ] }

    有关更多信息,请参阅 Greengrass 组角色

  • 您的计算机上安装和配置的 AWS CLI。有关更多信息,请参阅 AWS Command Line Interface 用户指南 中的安装 AWS Command Line Interface配置 AWS CLI

     

    本教程中的示例命令是针对 Linux 及其他基于 Unix 的系统编写的。如果您使用的是 Windows,请参阅为 AWS 命令行界面指定参数值以了解语法上的差异。

    如果命令包含 JSON 字符串,本教程提供了在单行包含 JSON 的示例。在某些系统上,使用此格式可能会更容易地编辑和运行命令。

 

本教程包含以下概括步骤:

完成本教程大约需要 30 分钟。

步骤 1:创建 Lambda 函数部署程序包

在此步骤中,您创建包含函数代码和依赖项的 Lambda 函数部署包。您稍后在 AWS Lambda 中创建 Lambda 函数时上传此程序包。Lambda 函数使用 AWS IoT Greengrass Core 开发工具包 创建本地流并与之交互。

注意

如果使用的是用户定义的 Lambda 函数,您必须使用 AWS IoT Greengrass Core 开发工具包 与流管理器进行交互。有关 Greengrass 流管理器的要求的更多信息,请参阅 Greengrass 流管理器要求

  1. 下载 适用于 Python 的 AWS IoT Greengrass Core 开发工具包 v1.5.0。

  2. 解压缩下载的程序包以获取软件开发工具包。软件开发工具包是 greengrasssdk 文件夹。

  3. 安装程序包依赖项以将其包含在 Lambda 函数部署程序包的开发工具包中。

    1. 导航到包含该 requirements.txt 文件的开发工具包目录。此文件列出了依赖项。

    2. 安装开发工具包依赖项。例如,运行以下 pip 命令将它们安装在当前目录中:

      pip install --target . -r requirements.txt
  4. 将以下 Python 代码函数保存在名为 transfer_stream.py 的本地文件中。

    提示
    import asyncio import logging import random import time from greengrasssdk.stream_manager import ( ExportDefinition, KinesisConfig, MessageStreamDefinition, ReadMessagesOptions, ResourceNotFoundException, StrategyOnFull, StreamManagerClient, ) # This example creates a local stream named "SomeStream". # It starts writing data into that stream and then stream manager automatically exports # the data to a customer-created Kinesis data stream named "MyKinesisStream". # This example runs forever until the program is stopped. # The size of the local stream on disk will not exceed the default (which is 256 MB). # Any data appended after the stream reaches the size limit continues to be appended, and # stream manager deletes the oldest data until the total stream size is back under 256 MB. # The Kinesis data stream in the cloud has no such bound, so all the data from this script is # uploaded to Kinesis and you will be charged for that usage. def main(logger): try: stream_name = "SomeStream" kinesis_stream_name = "MyKinesisStream" # Create a client for the StreamManager client = StreamManagerClient() # Try deleting the stream (if it exists) so that we have a fresh start try: client.delete_message_stream(stream_name=stream_name) except ResourceNotFoundException: pass exports = ExportDefinition( kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)] ) client.create_message_stream( MessageStreamDefinition( name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports ) ) # Append two messages and print their sequence numbers logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")), ) logger.info( "Successfully appended message to stream with sequence number %d", client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")), ) # Try reading the two messages we just appended and print them out logger.info( "Successfully read 2 messages: %s", client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)), ) logger.info("Now going to start writing random integers between 0 and 1000 to the stream") # Now start putting in random data between 0 and 1000 to emulate device sensor input while True: logger.debug("Appending new random integer to stream") client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big")) time.sleep(1) except asyncio.TimeoutError: logger.exception("Timed out while executing") except Exception: logger.exception("Exception while running") def function_handler(event, context): return logging.basicConfig(level=logging.INFO) # Start up this sample code main(logger=logging.getLogger())
  5. 将以下项目压缩到名为 transfer_stream_python.zip 的文件中。此即 Lambda 函数部署程序包。

    • transfer_stream.py。应用程序逻辑。

    • greengrasssdk。发布 MQTT 消息的 Python Greengrass Lambda 函数所需的库。

      流管理器操作在 适用于 Python 的 AWS IoT Greengrass Core 开发工具包 的版本 1.5.0 中可用。

    • 您为 适用于 Python 的 AWS IoT Greengrass Core 开发工具包(例如, cbor2 目录)安装的依赖项。

    创建 zip 文件时,仅包含这些项目,而不是包含文件夹。

步骤 2:创建 Lambda 函数

  1. 创建 IAM 角色,以便您可以在创建该函数时传入角色 ARN。

    JSON 扩展
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com.cn" }, "Action": "sts:AssumeRole" } ] }'
    JSON Single-line
    aws iam create-role --role-name Lambda_empty --assume-role-policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com.cn"},"Action": "sts:AssumeRole"}]}'
    注意

    AWS IoT Greengrass 不使用此角色,因为对 Greengrass Lambda 函数的权限在 Greengrass 组角色中指定。对于本教程,您将创建一个空角色。

  2. 从输出中复制 Arn

  3. 使用 AWS Lambda API 创建 TransferStream 函数。以下命令假定该 zip 文件位于当前目录中。

    • 使用您复制的 Arn 替换 role-arn

    aws lambda create-function \ --function-name TransferStream \ --zip-file fileb://transfer_stream_python.zip \ --role role-arn \ --handler transfer_stream.function_handler \ --runtime python3.7
  4. 发布该函数的版本。

    aws lambda publish-version --function-name TransferStream --description 'First version'
  5. 为发布的版本创建别名。

    Greengrass 组可以按别名 (推荐) 或版本引用 Lambda 函数。通过使用别名,可以更轻松地管理代码更新,因为您在更新函数代码时无需更改订阅表或组定义。相反,您只是将别名指向新的函数版本。

    aws lambda create-alias --function-name TransferStream --name GG_TransferStream --function-version 1
    注意

    AWS IoT Greengrass 不支持 $LATEST 版本的 Lambda 别名。

  6. 从输出中复制 AliasArn。为 AWS IoT Greengrass 配置函数时,可以使用此值。

现在,您已准备就绪,可以为 AWS IoT Greengrass 配置函数。

步骤 3:创建函数定义和版本

在此步骤中,您将创建引用系统 GGStreamManagerLambda 函数的函数定义版本,以便在 AWS IoT Greengrass Core 上启用和配置流管理器。此过程中的示例使用默认流管理器。在本教程中,您还可以按别名引用 TransferStream Lambda 函数并定义组级别配置。有关更多信息,请参阅使用组特定的配置控制 Greengrass Lambda 函数的执行

  1. 创建一个包含初始版本的函数定义,该初始版本包含系统和用户定义的 Lambda 函数。

    以下示例使用默认流管理器设置。要配置自定义设置,请为相应的流管理器参数定义环境变量。有关示例,请参阅启用、禁用或配置流管理器设置 (CLI)。AWS IoT Greengrass 对省略的参数使用默认值。 MemorySize 应该至少是 128000Pinned 必须设置为 true

    注意

    长时间生存(或固定)的 Lambda 函数在 AWS IoT Greengrass 启动后自动启动并在自己的容器中保持运行。这与按需 Lambda 函数相反,后者在调用时启动,并在没有要执行的任务时停止。有关更多信息,请参阅 Greengrass Lambda 函数的生命周期配置

    • arbitrary-function-id 替换为函数名称,如 stream-manager

    • alias-arn替换为您为 TransferStream Lambda 函数创建别名时复制的 AliasArn

     

    JSON expanded
    aws greengrass create-function-definition --name MyGreengrassFunctions --initial-version '{ "Functions": [ { "Id": "arbitrary-function-id", "FunctionArn": "arn:aws-cn:lambda:::function:GGStreamManager:1", "FunctionConfiguration": { "MemorySize": 128000, "Pinned": true, "Timeout": 3 } }, { "Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": { "Executable": "transfer_stream.function_handler", "MemorySize": 16000, "Pinned": true, "Timeout": 5 } } ] }'
    JSON single
    aws greengrass create-function-definition \ --name MyGreengrassFunctions \ --initial-version '{"Functions": [{"Id": "arbitrary-function-id","FunctionArn": "arn:aws-cn:lambda:::function:GGStreamManager:1", "FunctionConfiguration": {"Environment": {"Variables":{"STREAM_MANAGER_STORE_ROOT_DIR": "/data","STREAM_MANAGER_SERVER_PORT": "1234","STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH": "20000"}},"MemorySize": 128000,"Pinned": true,"Timeout": 3}},{"Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": {"Executable": "transfer_stream.function_handler", "MemorySize": 16000,"Pinned": true,"Timeout": 5}}]}'
    注意

    Timeout 是函数定义版本所需的,但 GGStreamManager 不使用它。

  2. 从输出中复制 LatestVersionArn。您将使用此值向部署到核心的组版本添加函数定义版本。

步骤 4:创建记录器定义和版本

配置组的日志记录设置。在本教程中,您将配置 AWS IoT Greengrass 系统组件、用户定义的 Lambda 函数以及连接器,以将日志写入核心设备的文件系统。您可以使用日志对可能遇到的任何问题进行故障排除。有关更多信息,请参阅 利用 AWS IoT Greengrass 日志进行监控

  1. 创建一个包含初始版本的记录器定义。

    JSON 扩展
    aws greengrass create-logger-definition --name "LoggingConfigs" --initial-version '{ "Loggers": [ { "Id": "1", "Component": "GreengrassSystem", "Level": "INFO", "Space": 10240, "Type": "FileSystem" }, { "Id": "2", "Component": "Lambda", "Level": "INFO", "Space": 10240, "Type": "FileSystem" } ] }'
    JSON Single-line
    aws greengrass create-logger-definition \ --name "LoggingConfigs" \ --initial-version '{"Loggers":[{"Id":"1","Component":"GreengrassSystem","Level":"INFO","Space":10240,"Type":"FileSystem"},{"Id":"2","Component":"Lambda","Level":"INFO","Space":10240,"Type":"FileSystem"}]}'
  2. 从输出中复制记录器定义的 LatestVersionArn。您将使用此值向部署到核心的组版本添加记录器定义版本。

步骤 5:获取核心定义版本的 ARN

获取要添加到新组版本的核心定义版本的 ARN。要部署组版本,该组版本必须引用包含确切一个核心的核心定义版本。

  1. 获取目标 Greengrass 组和组版本的 ID。在此过程中,我们假定这是最新的组和组版本。以下命令将返回最近创建的组。

    aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"

    或者,您也可以按名称查询。系统不要求组名称是唯一的,所以可能会返回多个组。

    aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
    注意

    您还可以在 AWS IoT 控制台 中找到这些值。组 ID 显示在组的设置页面上。组版本 ID 显示在组的部署页面上。

  2. 从输出中复制目标组的 Id。您将使用此值获取核心定义版本及部署组的时间。

  3. 从输出中复制 LatestVersion,这是添加到组的最后一个版本的 ID。您将使用此值获取核心定义版本。

  4. 获取核心定义版本的 ARN:

    1. 获取组版本。

      • 使用为组复制的 Id 替换 group-id

      • 使用为组复制的 LatestVersion 替换 group-version-id

      aws greengrass get-group-version \ --group-id group-id \ --group-version-id group-version-id
    2. 从输出中复制 CoreDefinitionVersionArn。您将使用此值向部署到核心的组版本添加核心定义版本。

步骤 6:创建组版本

现在,您已准备就绪,可以创建一个包含您要部署的实体的组版本。要实现此目的,需要创建一个引用每种组件类型的目标版本的组版本。在本教程中,您将包括核心定义版本、函数定义版本和记录器定义版本。

  1. 创建组版本。

    • 使用为组复制的 Id 替换 group-id

    • 使用为核心定义版本复制的 CoreDefinitionVersionArn 替换 core-definition-version-arn

    • function-definition-version-arn 替换为您为新函数定义版本复制的 LatestVersionArn

    • logger-definition-version-arn 替换为您为新的记录器定义版本复制的 LatestVersionArn

    aws greengrass create-group-version \ --group-id group-id \ --core-definition-version-arn core-definition-version-arn \ --function-definition-version-arn function-definition-version-arn \ --logger-definition-version-arn logger-definition-version-arn
  2. 从输出中复制 Version。这是新组版本的 ID。

步骤 7:创建部署

将组部署到核心设备。

  1. 确保 AWS IoT Greengrass Core正在运行。根据需要在您的 Raspberry Pi 终端中运行以下命令。

    1. 要检查守护程序是否正在运行,请执行以下操作:

      ps aux | grep -E 'greengrass.*daemon'

      如果输出包含 root/greengrass/ggc/packages/ggc-version/bin/daemon 条目,则表示守护程序正在运行。

      注意

      路径中的版本取决于您的核心设备上安装的 AWS IoT Greengrass Core 软件版本。

    2. 要启动守护程序,请执行以下操作:

      cd /greengrass/ggc/core/ sudo ./greengrassd start
  2. 创建部署。

    • 使用为组复制的 Id 替换 group-id

    • 使用为新组版本复制的 Version 替换 group-version-id

    aws greengrass create-deployment \ --deployment-type NewDeployment \ --group-id group-id \ --group-version-id group-version-id
  3. 从输出中复制 DeploymentId

  4. 获取部署状态。

    • 使用为组复制的 Id 替换 group-id

    • 使用为部署复制的 DeploymentId 替换 deployment-id

    aws greengrass get-deployment-status \ --group-id group-id \ --deployment-id deployment-id

    如果状态为 Success,则部署成功。有关问题排查帮助,请参阅AWS IoT Greengrass 问题排查

步骤 8:测试应用程序

TransferStream Lambda 函数生成模拟的设备数据。它将数据写入流管理器导出到目标 Kinesis 数据流的流。

  1. 在 Amazon Kinesis 控制台的 Kinesis data streams (Kinesis 数据流) 下,选择 MyKinesisStream

    注意

    如果您在运行教程时没有目标 Kinesis 数据流, 请检查流管理器的日志文件 (GGStreamManager)。如果它在错误消息中包含 export stream MyKinesisStream doesn't exist,则测试成功。此错误意味着服务试图导出到流,但流不存在。

  2. MyKinesisStream 页面上,选择 Monitoring (监控)。如果测试成功,您应在 Put Records (放置记录) 图表中看到数据。根据您的连接,显示数据可能需要一分钟时间。

    重要

    测试完成后,删除 Kinesis 数据流以避免产生更多费用。

    运行以下命令以停止 Greengrass 守护程序。这样可以防止核心发送消息,直到您准备好继续测试。

    cd /greengrass/ggc/core/ sudo ./greengrassd stop
  3. 从核心中删除 TransferStream Lambda 函数。

    1. 按照步骤 6:创建组版本创建新的组版本,但删除 create-group-version 命令中的 --function-definition-version-arn 选项。或者,创建不包括 TransferStream Lambda 函数的函数定义版本。

      注意

      通过省略部署的组版本中的系统 GGStreamManager Lambda 函数,可以禁用核心上的流管理。

    2. 按照步骤 7:创建部署以部署新的组版本。

要查看日志记录信息或解决流的问题,请检查日志中的 TransferStreamGGStreamManager 函数。您必须具有 root 权限以读取文件系统上的 AWS IoT Greengrass 日志。

  • TransferStream 将日志条目写入 greengrass-root/ggc/var/log/user/区域/account-id/TransferStream.log

  • GGStreamManager 将日志条目写入 greengrass-root/ggc/var/log/system/GGStreamManager.log

如果需要更多疑难解答信息,可以将 Lambda 日志记录级别设置为 DEBUG,然后创建并部署新的组版本。

另请参阅