创建使用流管理器的自定义组件 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建使用流管理器的自定义组件

在自定义 Greengrass 组件中使用流管理器来存储、处理和导出 IoT 设备数据。使用本节中的过程和示例创建可与 Stream Manager 配合使用的组件配方、构件和应用程序。有关如何开发和测试组件的更多信息,请参阅创建 Amazon IoT Greengrass 组件

定义使用流管理器的组件配方

要在自定义组件中使用流管理器,必须将该aws.greengrass.StreamManager组件定义为依赖关系。您还必须提供直播管理器 SDK。完成以下任务,下载和使用您选择的语言的 Stream Manager SDK。

适用于 Java 的 Stream Manager SDK 以 JAR 文件形式提供,您可以使用它来编译组件。然后,您可以创建包含 Stream Manager SDK 的应用程序 JAR,将应用程序 JAR 定义为组件构件,并在组件生命周期中运行应用程序 JAR。

使用适用于 Java 的直播管理器 SDK
  1. 下载适用于 Java 的直播管理器 SDK JAR 文件

  2. 执行以下操作之一,从 Java 应用程序和 Stream Manager SDK JAR 文件中创建组件构件:

    • 将您的应用程序构建为包含 Stream Manager SDK JAR 的 JAR 文件,然后在组件配方中运行此 JAR 文件。

    • 将流管理器 SDK JAR 定义为组件构件。在组件配方中运行应用程序时,将该工件添加到类路径中。

    您的组件配方可能类似于以下示例。此组件运行 StreamManagerS3.java 示例的修改版本,其中StreamManagerS3.jar包括 Stream Manager SDK JAR。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3Java", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/StreamManagerS3.jar" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3Java ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Lifecycle: run: java -jar {artifacts:path}/StreamManagerS3.jar Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar

    有关如何开发和测试组件的更多信息,请参阅创建 Amazon IoT Greengrass 组件

适用于 Python 的 Stream Manager SDK 可作为源代码提供,您可以将其包含在组件中。创建 Stream Manager SDK 的 ZIP 文件,将 ZIP 文件定义为组件构件,然后在组件生命周期中安装 SDK 的要求。

使用适用于 Python 的直播管理器 SDK
  1. 克隆或下载 aws-greengrass-stream-manager-sdk- python 存储库。

    git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-python.git
  2. 创建一个包含该文件夹的 ZIP 文件,该stream_manager文件夹包含适用于 Python 的 Stream Manager SDK 的源代码。您可以将此 ZIP 文件作为组件构件提供,Amazon IoT GreengrassCore 软件会在安装您的组件时解压缩。执行以下操作:

    1. 打开包含您在上一个步骤中克隆或下载的存储库的文件夹。

      cd aws-greengrass-stream-manager-sdk-python
    2. 将该stream_manager文件夹压缩到名为的 ZIP 文件中stream_manager_sdk.zip

      Linux or Unix
      zip -rv stream_manager_sdk.zip stream_manager
      Windows Command Prompt (CMD)
      tar -acvf stream_manager_sdk.zip stream_manager
      PowerShell
      Compress-Archive stream_manager stream_manager_sdk.zip
    3. 验证stream_manager_sdk.zip文件是否包含该stream_manager文件夹及其内容。运行以下命令列出 ZIP 文件的内容。

      Linux or Unix
      unzip -l stream_manager_sdk.zip
      Windows Command Prompt (CMD)
      tar -tf stream_manager_sdk.zip

      该输出值应该类似于以下内容。

      Archive: aws-greengrass-stream-manager-sdk-python/stream_manager.zip Length Date Time Name --------- ---------- ----- ---- 0 02-24-2021 20:45 stream_manager/ 913 02-24-2021 20:45 stream_manager/__init__.py 9719 02-24-2021 20:45 stream_manager/utilinternal.py 1412 02-24-2021 20:45 stream_manager/exceptions.py 1004 02-24-2021 20:45 stream_manager/util.py 0 02-24-2021 20:45 stream_manager/data/ 254463 02-24-2021 20:45 stream_manager/data/__init__.py 26515 02-24-2021 20:45 stream_manager/streammanagerclient.py --------- ------- 294026 8 files
  3. 将 Stream Manager SDK 构件复制到组件的构件文件夹。除了 Stream Manager SDK 压缩文件外,您的组件还使用 SDK requirements.txt 的文件来安装 Stream Manager SDK 的依赖项。将 ~/greengrass-co mponents 替换为用于本地开发的文件夹的路径。

    Linux or Unix
    cp {stream_manager_sdk.zip,requirements.txt} ~/greengrass-components/artifacts/com.example.StreamManagerS3Python/1.0.0/
    Windows Command Prompt (CMD)
    robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0 stream_manager_sdk.zip robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0 requirements.txt
    PowerShell
    cp .\stream_manager_sdk.zip,.\requirements.txt ~\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0\
  4. 创建您的组件配方。在配方中,执行以下操作:

    1. stream_manager_sdk.zip和定义requirements.txt为工件。

    2. 将您的 Python 应用程序定义为工件。

    3. 在安装生命周期中,从中安装 Stream Manager SDK 要求requirements.txt

    4. 在运行生命周期中,将 Stream Manager SDK 附加到PYTHONPATH并运行您的 Python 应用程序。

    您的组件配方可能类似于以下示例。此组件运行 stream_manager_s3.py 示例。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3Python", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "pip3 install --user -r {artifacts:path}/requirements.txt", "run": "export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk; python3 {artifacts:path}/stream_manager_s3.py" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt" } ] }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "pip3 install --user -r {artifacts:path}/requirements.txt", "run": "set \"PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk\" & py -3 {artifacts:path}/stream_manager_s3.py" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3Python ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Platform: os: linux Lifecycle: install: pip3 install --user -r {artifacts:path}/requirements.txt run: | export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk python3 {artifacts:path}/stream_manager_s3.py Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt - Platform: os: windows Lifecycle: install: pip3 install --user -r {artifacts:path}/requirements.txt run: | set "PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk" py -3 {artifacts:path}/stream_manager_s3.py Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt

    有关如何开发和测试组件的更多信息,请参阅创建 Amazon IoT Greengrass 组件

的 Stream Manager SDK 可作为源代码提供,您可以将其包含在组件中。JavaScript创建 Stream Manager SDK 的 ZIP 文件,将 ZIP 文件定义为组件构件,然后在组件生命周期中安装 SDK。

使用 Stream Manager SDK JavaScript
  1. 克隆或下载 aws-greengrass-stream-manager-sdk-js 存储库。

    git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-js.git
  2. 创建一个包含该文件夹的 ZIP 文件,该aws-greengrass-stream-manager-sdk文件夹包含用于 Stream Manager SDK 的源代码JavaScript。您可以将此 ZIP 文件作为组件构件提供,Amazon IoT GreengrassCore 软件会在安装您的组件时解压缩。执行以下操作:

    1. 打开包含您在上一个步骤中克隆或下载的存储库的文件夹。

      cd aws-greengrass-stream-manager-sdk-js
    2. 将该aws-greengrass-stream-manager-sdk文件夹压缩到名为的 ZIP 文件中stream-manager-sdk.zip

      Linux or Unix
      zip -rv stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
      Windows Command Prompt (CMD)
      tar -acvf stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
      PowerShell
      Compress-Archive aws-greengrass-stream-manager-sdk stream-manager-sdk.zip
    3. 验证stream-manager-sdk.zip文件是否包含该aws-greengrass-stream-manager-sdk文件夹及其内容。运行以下命令列出 ZIP 文件的内容。

      Linux or Unix
      unzip -l stream-manager-sdk.zip
      Windows Command Prompt (CMD)
      tar -tf stream-manager-sdk.zip

      该输出值应该类似于以下内容。

      Archive: stream-manager-sdk.zip Length Date Time Name --------- ---------- ----- ---- 0 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/ 369 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/package.json 1017 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/util.js 8374 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/utilInternal.js 1937 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/exceptions.js 0 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/data/ 353343 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/data/index.js 22599 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/client.js 216 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/index.js --------- ------- 387855 9 files
  3. 将 Stream Manager SDK 构件复制到组件的构件文件夹。将 ~/greengrass-co mponents 替换为用于本地开发的文件夹的路径。

    Linux or Unix
    cp stream-manager-sdk.zip ~/greengrass-components/artifacts/com.example.StreamManagerS3JS/1.0.0/
    Windows Command Prompt (CMD)
    robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3JS\1.0.0 stream-manager-sdk.zip
    PowerShell
    cp .\stream-manager-sdk.zip ~\greengrass-components\artifacts\com.example.StreamManagerS3JS\1.0.0\
  4. 创建您的组件配方。在配方中,执行以下操作:

    1. 定义stream-manager-sdk.zip为神器。

    2. 将您的JavaScript应用程序定义为人工制品。

    3. 在安装生命周期中,从stream-manager-sdk.zip构件中安装 Stream Manager SDK。此npm install命令创建一个包含 Stream Manager SDK 及其依赖项node_modules的文件夹。

    4. 在运行生命周期中,将node_modules文件夹追加到NODE_PATH并运行您的JavaScript应用程序。

    您的组件配方可能类似于以下示例。此组件运行 S StreamManager3 示例。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3JS", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk", "run": "export NODE_PATH=$NODE_PATH:{work:path}/node_modules; node {artifacts:path}/index.js" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js" } ] }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk", "run": "set \"NODE_PATH=%NODE_PATH%;{work:path}/node_modules\" & node {artifacts:path}/index.js" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3JS ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Platform: os: linux Lifecycle: install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk run: | export NODE_PATH=$NODE_PATH:{work:path}/node_modules node {artifacts:path}/index.js Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js - Platform: os: windows Lifecycle: install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk run: | set "NODE_PATH=%NODE_PATH%;{work:path}/node_modules" node {artifacts:path}/index.js Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js

    有关如何开发和测试组件的更多信息,请参阅创建 Amazon IoT Greengrass 组件

在应用程序代码中Connect 流管理器

要在应用程序中连接到流管理器,请StreamManagerClient从 Stream Manager SDK 创建一个实例。此客户端通过其默认端口 8088 或您指定的端口连接到流管理器组件。有关如何在创建安装安装安装StreamManagerClient安装安装的更多信息,请参阅 StreamManagerClient 用于处理直播

例 示例:使用默认端口Connect 直播管理器
Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; public class MyStreamManagerComponent { void connectToStreamManagerWithDefaultPort() { StreamManagerClient client = StreamManagerClientFactory.standard().build(); // Use the client. } }
Python
from stream_manager import ( StreamManagerClient ) def connect_to_stream_manager_with_default_port(): client = StreamManagerClient() # Use the client.
JavaScript
const { StreamManagerClient } = require('aws-greengrass-stream-manager-sdk'); function connectToStreamManagerWithDefaultPort() { const client = new StreamManagerClient(); // Use the client. }
例 示例:使用非默认端口Connect 直播管理器

如果您为流管理器配置了非默认端口,则必须使用进程间通信从组件配置中检索端口。

注意

port配置参数包含您在部署流管理器STREAM_MANAGER_SERVER_PORT时指定的值。

Java
void connectToStreamManagerWithCustomPort() { EventStreamRPCConnection eventStreamRpcConnection = IPCUtils.getEventStreamRpcConnection(); GreengrassCoreIPCClient greengrassCoreIPCClient = new GreengrassCoreIPCClient(eventStreamRpcConnection); List<String> keyPath = new ArrayList<>(); keyPath.add("port"); GetConfigurationRequest request = new GetConfigurationRequest(); request.setComponentName("aws.greengrass.StreamManager"); request.setKeyPath(keyPath); GetConfigurationResponse response = greengrassCoreIPCClient.getConfiguration(request, Optional.empty()).getResponse().get(); String port = response.getValue().get("port").toString(); System.out.print("Stream Manager is running on port: " + port); final StreamManagerClientConfig config = StreamManagerClientConfig.builder() .serverInfo(StreamManagerServerInfo.builder().port(Integer.parseInt(port)).build()).build(); StreamManagerClient client = StreamManagerClientFactory.standard().withClientConfig(config).build(); // Use the client. }
Python
import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( GetConfigurationRequest ) from stream_manager import ( StreamManagerClient ) TIMEOUT = 10 def connect_to_stream_manager_with_custom_port(): # Use IPC to get the port from the stream manager component configuration. ipc_client = awsiot.greengrasscoreipc.connect() request = GetConfigurationRequest() request.component_name = "aws.greengrass.StreamManager" request.key_path = ["port"] operation = ipc_client.new_get_configuration() operation.activate(request) future_response = operation.get_response() response = future_response.result(TIMEOUT) stream_manager_port = str(response.value["port"]) # Use port to create a stream manager client. stream_client = StreamManagerClient(port=stream_manager_port) # Use the client.