创建使用流管理器的自定义组件 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建使用流管理器的自定义组件

在自定义 Greengrass 组件中使用流管理器来存储、处理和导出物联网设备数据。使用本节中的过程和示例来创建与流管理器配合使用的组件配方、工件和应用程序。有关如何开发和测试组件的更多信息,请参阅创建 Amazon IoT Greengrass 组件

定义使用直播管理器的组件配方

要在自定义组件中使用流管理器,必须将该aws.greengrass.StreamManager组件定义为依赖项。您还必须提供直播管理器 SDK。完成以下任务,下载并使用您选择的语言版本的 Stream Manager SDK。

适用于 Java 的 Stream Manager SDK 以 JAR 文件形式提供,你可以用它来编译组件。然后,您可以创建包含 Stream Manager SDK 的应用程序 JAR,将应用程序 JAR 定义为组件构件,并在组件生命周期中运行应用程序 JAR。

使用适用于 Java 的直播管理器 SDK
  1. 下载适用于 Java 的直播管理器 SDK JAR 文件

  2. 执行以下任一操作,从 Java 应用程序和 Stream Manager SDK JAR 文件创建组件构件:

    • 将您的应用程序构建为包含 Stream Manager SDK JAR 的 JAR 文件,然后在组件配方中运行此 JAR 文件。

    • 将 Stream Manager SDK JAR 定义为组件构件。当你在组件配方中运行应用程序时,将该工件添加到类路径中。

    您的组件配方可能类似于以下示例。此组件运行 StreamManagerS3.java 示例的修改版本,其中StreamManagerS3.jar包括 Stream Manager SDK JAR。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3Java", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/StreamManagerS3.jar" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3Java ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Lifecycle: run: java -jar {artifacts:path}/StreamManagerS3.jar Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar

    有关如何开发和测试组件的更多信息,请参阅创建 Amazon IoT Greengrass 组件

适用于 Python 的流管理器 SDK 以源代码形式提供,您可以将其包含在组件中。创建 Stream Manager SDK 的 ZIP 文件,将 ZIP 文件定义为组件构件,然后在组件生命周期中安装 SDK 的要求。

使用适用于 Python 的直播管理器 SDK
  1. 克隆或下载 aws-greengrass-stream-manager-sdk- python 存储库。

    git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-python.git
  2. 创建包含该stream_manager文件夹的 ZIP 文件,其中包含适用于 Python 的流管理器 SDK 的源代码。您可以将此 ZIP 文件作为组件工件提供, Amazon IoT Greengrass Core 软件在安装您的组件时会将其解压缩。执行以下操作:

    1. 打开包含您在上一步中克隆或下载的存储库的文件夹。

      cd aws-greengrass-stream-manager-sdk-python
    2. 将该stream_manager文件夹压缩到名为的 ZIP 文件中stream_manager_sdk.zip

      Linux or Unix
      zip -rv stream_manager_sdk.zip stream_manager
      Windows Command Prompt (CMD)
      tar -acvf stream_manager_sdk.zip stream_manager
      PowerShell
      Compress-Archive stream_manager stream_manager_sdk.zip
    3. 验证该stream_manager_sdk.zip文件是否包含该stream_manager文件夹及其内容。运行以下命令列出 ZIP 文件的内容。

      Linux or Unix
      unzip -l stream_manager_sdk.zip
      Windows Command Prompt (CMD)
      tar -tf stream_manager_sdk.zip

      输出应类似于以下内容。

      Archive: aws-greengrass-stream-manager-sdk-python/stream_manager.zip Length Date Time Name --------- ---------- ----- ---- 0 02-24-2021 20:45 stream_manager/ 913 02-24-2021 20:45 stream_manager/__init__.py 9719 02-24-2021 20:45 stream_manager/utilinternal.py 1412 02-24-2021 20:45 stream_manager/exceptions.py 1004 02-24-2021 20:45 stream_manager/util.py 0 02-24-2021 20:45 stream_manager/data/ 254463 02-24-2021 20:45 stream_manager/data/__init__.py 26515 02-24-2021 20:45 stream_manager/streammanagerclient.py --------- ------- 294026 8 files
  3. 将 Stream Manager SDK 构件复制到组件的构件文件夹。除了流管理器 SDK ZIP 文件外,您的组件还使用该软件开发工具包requirements.txt的文件来安装流管理器 SDK 的依赖项。将 ~/greengrass-components 替换为用于本地开发的文件夹的路径。

    Linux or Unix
    cp {stream_manager_sdk.zip,requirements.txt} ~/greengrass-components/artifacts/com.example.StreamManagerS3Python/1.0.0/
    Windows Command Prompt (CMD)
    robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0 stream_manager_sdk.zip robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0 requirements.txt
    PowerShell
    cp .\stream_manager_sdk.zip,.\requirements.txt ~\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0\
  4. 创建您的组件配方。在食谱中,执行以下操作:

    1. stream_manager_sdk.zip和定义requirements.txt为工件。

    2. 将你的 Python 应用程序定义为工件。

    3. 在安装生命周期中,安装来自的流管理器 SDK 要求requirements.txt

    4. 在运行生命周期中,将流管理器 SDK 附加到PYTHONPATH并运行您的 Python 应用程序。

    您的组件配方可能类似于以下示例。此组件运行 stream_manager_s3.py 示例。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3Python", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "pip3 install --user -r {artifacts:path}/requirements.txt", "run": "export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk; python3 {artifacts:path}/stream_manager_s3.py" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt" } ] }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "pip3 install --user -r {artifacts:path}/requirements.txt", "run": "set \"PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk\" & py -3 {artifacts:path}/stream_manager_s3.py" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3Python ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Platform: os: linux Lifecycle: install: pip3 install --user -r {artifacts:path}/requirements.txt run: | export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk python3 {artifacts:path}/stream_manager_s3.py Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt - Platform: os: windows Lifecycle: install: pip3 install --user -r {artifacts:path}/requirements.txt run: | set "PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk" py -3 {artifacts:path}/stream_manager_s3.py Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt

    有关如何开发和测试组件的更多信息,请参阅创建 Amazon IoT Greengrass 组件

的 Stream Manager SDK 以 JavaScript 源代码形式提供,您可以将其包含在组件中。创建 Stream Manager SDK 的 ZIP 文件,将 ZIP 文件定义为组件构件,然后在组件生命周期中安装 SDK。

要将直播管理器 SDK 用于 JavaScript
  1. 克隆或下载 aws-greengrass-stream-manager-sdk-js 存储库。

    git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-js.git
  2. 创建包含该aws-greengrass-stream-manager-sdk文件夹的 ZIP 文件,其中包含流管理器 SDK 的源代码 JavaScript。您可以将此 ZIP 文件作为组件工件提供, Amazon IoT Greengrass Core 软件在安装您的组件时会将其解压缩。执行以下操作:

    1. 打开包含您在上一步中克隆或下载的存储库的文件夹。

      cd aws-greengrass-stream-manager-sdk-js
    2. 将该aws-greengrass-stream-manager-sdk文件夹压缩到名为的 ZIP 文件中stream-manager-sdk.zip

      Linux or Unix
      zip -rv stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
      Windows Command Prompt (CMD)
      tar -acvf stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
      PowerShell
      Compress-Archive aws-greengrass-stream-manager-sdk stream-manager-sdk.zip
    3. 验证该stream-manager-sdk.zip文件是否包含该aws-greengrass-stream-manager-sdk文件夹及其内容。运行以下命令列出 ZIP 文件的内容。

      Linux or Unix
      unzip -l stream-manager-sdk.zip
      Windows Command Prompt (CMD)
      tar -tf stream-manager-sdk.zip

      输出应类似于以下内容。

      Archive: stream-manager-sdk.zip Length Date Time Name --------- ---------- ----- ---- 0 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/ 369 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/package.json 1017 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/util.js 8374 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/utilInternal.js 1937 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/exceptions.js 0 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/data/ 353343 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/data/index.js 22599 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/client.js 216 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/index.js --------- ------- 387855 9 files
  3. 将 Stream Manager SDK 构件复制到组件的构件文件夹。将 ~/greengrass-components 替换为用于本地开发的文件夹的路径。

    Linux or Unix
    cp stream-manager-sdk.zip ~/greengrass-components/artifacts/com.example.StreamManagerS3JS/1.0.0/
    Windows Command Prompt (CMD)
    robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3JS\1.0.0 stream-manager-sdk.zip
    PowerShell
    cp .\stream-manager-sdk.zip ~\greengrass-components\artifacts\com.example.StreamManagerS3JS\1.0.0\
  4. 创建您的组件配方。在食谱中,执行以下操作:

    1. 定义stream-manager-sdk.zip为工件。

    2. 将您的 JavaScript 应用程序定义为工件。

    3. 在安装生命周期中,从stream-manager-sdk.zip构件安装流管理器 SDK。此npm install命令创建一个包含流管理器 SDK 及其依赖项node_modules的文件夹。

    4. 在运行生命周期中,将node_modules文件夹追加到NODE_PATH并运行您的 JavaScript 应用程序。

    您的组件配方可能类似于以下示例。此组件运行 S StreamManager3 示例。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3JS", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk", "run": "export NODE_PATH=$NODE_PATH:{work:path}/node_modules; node {artifacts:path}/index.js" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js" } ] }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk", "run": "set \"NODE_PATH=%NODE_PATH%;{work:path}/node_modules\" & node {artifacts:path}/index.js" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3JS ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Platform: os: linux Lifecycle: install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk run: | export NODE_PATH=$NODE_PATH:{work:path}/node_modules node {artifacts:path}/index.js Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js - Platform: os: windows Lifecycle: install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk run: | set "NODE_PATH=%NODE_PATH%;{work:path}/node_modules" node {artifacts:path}/index.js Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js

    有关如何开发和测试组件的更多信息,请参阅创建 Amazon IoT Greengrass 组件

在应用程序代码中连接直播管理器

要在应用程序中连接到直播管理器,请使用流管理器 SDK 创建StreamManagerClient的实例。此客户端通过其默认端口 8088 或您指定的端口连接到流管理器组件。有关创建实例StreamManagerClient后如何使用的更多信息,请参阅 StreamManagerClient 用于处理直播

例 示例:使用默认端口连接直播管理器
Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; public class MyStreamManagerComponent { void connectToStreamManagerWithDefaultPort() { StreamManagerClient client = StreamManagerClientFactory.standard().build(); // Use the client. } }
Python
from stream_manager import ( StreamManagerClient ) def connect_to_stream_manager_with_default_port(): client = StreamManagerClient() # Use the client.
JavaScript
const { StreamManagerClient } = require('aws-greengrass-stream-manager-sdk'); function connectToStreamManagerWithDefaultPort() { const client = new StreamManagerClient(); // Use the client. }
例 示例:使用非默认端口连接直播管理器

如果使用默认端口以外的端口配置流管理器,则必须使用进程间通信从组件配置中检索该端口。

注意

port配置参数包含您在部署流管理器STREAM_MANAGER_SERVER_PORT时在中指定的值。

Java
void connectToStreamManagerWithCustomPort() { EventStreamRPCConnection eventStreamRpcConnection = IPCUtils.getEventStreamRpcConnection(); GreengrassCoreIPCClient greengrassCoreIPCClient = new GreengrassCoreIPCClient(eventStreamRpcConnection); List<String> keyPath = new ArrayList<>(); keyPath.add("port"); GetConfigurationRequest request = new GetConfigurationRequest(); request.setComponentName("aws.greengrass.StreamManager"); request.setKeyPath(keyPath); GetConfigurationResponse response = greengrassCoreIPCClient.getConfiguration(request, Optional.empty()).getResponse().get(); String port = response.getValue().get("port").toString(); System.out.print("Stream Manager is running on port: " + port); final StreamManagerClientConfig config = StreamManagerClientConfig.builder() .serverInfo(StreamManagerServerInfo.builder().port(Integer.parseInt(port)).build()).build(); StreamManagerClient client = StreamManagerClientFactory.standard().withClientConfig(config).build(); // Use the client. }
Python
import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( GetConfigurationRequest ) from stream_manager import ( StreamManagerClient ) TIMEOUT = 10 def connect_to_stream_manager_with_custom_port(): # Use IPC to get the port from the stream manager component configuration. ipc_client = awsiot.greengrasscoreipc.connect() request = GetConfigurationRequest() request.component_name = "aws.greengrass.StreamManager" request.key_path = ["port"] operation = ipc_client.new_get_configuration() operation.activate(request) future_response = operation.get_response() response = future_response.result(TIMEOUT) stream_manager_port = str(response.value["port"]) # Use port to create a stream manager client. stream_client = StreamManagerClient(port=stream_manager_port) # Use the client.