创建使用流管理器的自定义组件 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建使用流管理器的自定义组件

在自定义 Greengrass 组件中使用流管理器来存储、处理和导出 IoT 设备数据。使用本节中的过程和示例创建与流管理器配合使用的组件配方、工件和应用程序。有关如何开发和测试组件的详细信息,请参阅CreateAmazon IoT Greengrass组件.

定义使用直播管理器的组件配方

要在自定义组件中使用流管理器,必须定义aws.greengrass.StreamManager组件作为依赖项。您还必须提供 Stream Manager SDK。完成以下任务,以您选择的语言下载并使用 Stream Manager SDK。

适用于 Java 的 Stream Manager SDK 作为 JAR 文件提供,您可以使用该文件来编译组件。然后,您可以创建包含 Stream Manager SDK 的应用程序 JAR,将应用程序 JAR 定义为组件工件,然后在组件生命周期中运行应用程序 JAR。

使用适用于 Java 的流管理器 SDK

  1. 下载适用于 Java JAR 文件的流管理器 SDK.

  2. 执行以下操作之一,从 Java 应用程序和 Stream Manager SDK JAR 文件创建组件工件:

    • 将应用程序构建为包含 Stream Manager SDK JAR 的 JAR 文件,然后在组件配方中运行此 JAR 文件。

    • 将 Stream Manager SDK JAR 定义为组件工件。在组件配方中运行应用程序时,将该工件添加到类路径中。

    您的组件配方可能与以下示例类似。此组件运行的修改版本StreamManagerS3.java例如,在哪里StreamManagerS3.jar包括流管理器 SDK JAR。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3Java", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/StreamManagerS3.jar" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3Java ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Lifecycle: Run: java -jar {artifacts:path}/StreamManagerS3.jar Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar

    有关如何开发和测试组件的详细信息,请参阅CreateAmazon IoT Greengrass组件.

适用于 Python 的 Stream Manager SDK 可作为源代码提供,您可以包含在组件中。创建 Stream Manager SDK 的 ZIP 文件,将 ZIP 文件定义为组件工件,然后在组件生命周期中安装 SDK 的要求。

使用适用于 Python 的流管理器 SDK

  1. 克隆或下载aws-绿草流管理器-sdk-python存储库。

    git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-python.git
  2. 创建一个包含stream_manager文件夹,其中包含适用于 Python 的 Stream Manager SDK 的源代码。您可以将此 ZIP 文件作为组件工件提供,Amazon IoT Greengrass核心软件在安装组件时解压。执行以下操作:

    1. 打开包含您在上一步中克隆或下载的存储库的文件夹。

      cd aws-greengrass-stream-manager-sdk-python
    2. 邮政编stream_manager文件夹到名为的 ZIP 文件中stream_manager_sdk.zip.

      Linux or Unix
      zip -rv stream_manager_sdk.zip stream_manager
      Windows Command Prompt (CMD)
      tar -acvf stream_manager_sdk.zip stream_manager
      PowerShell
      Compress-Archive stream_manager stream_manager_sdk.zip
    3. 确认stream_manager_sdk.zip此文件包含stream_managerfolder 及其内容。运行以下命令以列出 ZIP 文件的内容。

      Linux or Unix
      unzip -l stream_manager_sdk.zip
      Windows Command Prompt (CMD)
      tar -tf stream_manager_sdk.zip

      该输出值应该类似于以下内容。

      Archive: aws-greengrass-stream-manager-sdk-python/stream_manager.zip Length Date Time Name --------- ---------- ----- ---- 0 02-24-2021 20:45 stream_manager/ 913 02-24-2021 20:45 stream_manager/__init__.py 9719 02-24-2021 20:45 stream_manager/utilinternal.py 1412 02-24-2021 20:45 stream_manager/exceptions.py 1004 02-24-2021 20:45 stream_manager/util.py 0 02-24-2021 20:45 stream_manager/data/ 254463 02-24-2021 20:45 stream_manager/data/__init__.py 26515 02-24-2021 20:45 stream_manager/streammanagerclient.py --------- ------- 294026 8 files
  3. 将 Stream Manager SDK 工件复制到组件的工件文件夹中。除了 Stream Manager SDK ZIP 文件之外,您的组件还使用 SDK 的requirements.txt文件来安装 Stream Manager SDK 的依赖关系。Replace~/ 绿草组件使用您用于本地开发的文件夹的路径。

    Linux or Unix
    cp {stream_manager_sdk.zip,requirements.txt} ~/greengrass-components/artifacts/com.example.StreamManagerS3Python/1.0.0/
    Windows Command Prompt (CMD)
    robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0 stream_manager_sdk.zip robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0 requirements.txt
    PowerShell
    cp .\stream_manager_sdk.zip,.\requirements.txt ~\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0\
  4. 创建你的组件配方。在配方中,请执行以下操作:

    1. 定义stream_manager_sdk.ziprequirements.txt作为构件。

    2. 将你的 Python 应用程序定义为工件。

    3. 在安装生命周期中,从中安装 Stream Manager SDK 要求requirements.txt.

    4. 在运行生命周期中,将 Stream Manager SDK 附加到PYTHONPATH,然后运行你的 Python 应用程序。

    您的组件配方可能与以下示例类似。此组件运行stream_manager_s3.py示例。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3Python", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "Install": "pip3 install --user -r {artifacts:path}/requirements.txt", "Run": "export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk; python3 {artifacts:path}/stream_manager_s3.py" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt" } ] }, { "Platform": { "os": "windows" }, "Lifecycle": { "Install": "pip3 install --user -r {artifacts:path}/requirements.txt", "Run": "set \"PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk\" & py -3 {artifacts:path}/stream_manager_s3.py" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3Python ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Platform: os: linux Lifecycle: Install: pip3 install --user -r {artifacts:path}/requirements.txt Run: | export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk python3 {artifacts:path}/stream_manager_s3.py Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt - Platform: os: windows Lifecycle: Install: pip3 install --user -r {artifacts:path}/requirements.txt Run: | set "PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk" py -3 {artifacts:path}/stream_manager_s3.py Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt

    有关如何开发和测试组件的详细信息,请参阅CreateAmazon IoT Greengrass组件.

适用于的直播管理器 SDK JavaScript 可以包括在组件中的源代码。创建 Stream Manager SDK 的 ZIP 文件,将 ZIP 文件定义为组件工件,然后在组件生命周期中安装 SDK。

使用适用于 JavaScript 的直播管理器 SDK

  1. 克隆或下载aws-Greengrass 流管理器-sdk-js存储库。

    git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-js.git
  2. 创建一个包含aws-greengrass-stream-manager-sdk文件夹,其中包含适用于 JavaScript 的流管理器 SDK 的源代码。您可以将此 ZIP 文件作为组件工件提供,Amazon IoT Greengrass核心软件在安装组件时解压。执行以下操作:

    1. 打开包含您在上一步中克隆或下载的存储库的文件夹。

      cd aws-greengrass-stream-manager-sdk-js
    2. 邮政编aws-greengrass-stream-manager-sdk文件夹到名为的 ZIP 文件中stream-manager-sdk.zip.

      Linux or Unix
      zip -rv stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
      Windows Command Prompt (CMD)
      tar -acvf stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
      PowerShell
      Compress-Archive aws-greengrass-stream-manager-sdk stream-manager-sdk.zip
    3. 确认stream-manager-sdk.zip此文件包含aws-greengrass-stream-manager-sdkfolder 及其内容。运行以下命令以列出 ZIP 文件的内容。

      Linux or Unix
      unzip -l stream-manager-sdk.zip
      Windows Command Prompt (CMD)
      tar -tf stream-manager-sdk.zip

      该输出值应该类似于以下内容。

      Archive: stream-manager-sdk.zip Length Date Time Name --------- ---------- ----- ---- 0 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/ 369 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/package.json 1017 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/util.js 8374 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/utilInternal.js 1937 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/exceptions.js 0 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/data/ 353343 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/data/index.js 22599 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/client.js 216 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/index.js --------- ------- 387855 9 files
  3. 将 Stream Manager SDK 工件复制到组件的工件文件夹中。Replace~/ 绿草组件使用您用于本地开发的文件夹的路径。

    Linux or Unix
    cp stream-manager-sdk.zip ~/greengrass-components/artifacts/com.example.StreamManagerS3JS/1.0.0/
    Windows Command Prompt (CMD)
    robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3JS\1.0.0 stream-manager-sdk.zip
    PowerShell
    cp .\stream-manager-sdk.zip ~\greengrass-components\artifacts\com.example.StreamManagerS3JS\1.0.0\
  4. 创建你的组件配方。在配方中,请执行以下操作:

    1. 定义stream-manager-sdk.zip作为神器。

    2. 定义您的 JavaScript 应用程序作为工件。

    3. 在安装生命周期中,从stream-manager-sdk.zip构件。该npm install创建命令node_modules包含流管理器开发工具包及其依赖项的文件夹。

    4. 在运行生命周期中,追加node_modules到的文件夹NODE_PATH,然后运行您的 JavaScript 应用程序.

    您的组件配方可能与以下示例类似。此组件运行直播管理器 3示例。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3JS", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "Install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk", "Run": "export NODE_PATH=$NODE_PATH:{work:path}/node_modules; node {artifacts:path}/index.js" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js" } ] }, { "Platform": { "os": "windows" }, "Lifecycle": { "Install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk", "Run": "set \"NODE_PATH=%NODE_PATH%;{work:path}/node_modules\" & node {artifacts:path}/index.js" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3JS ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Platform: os: linux Lifecycle: Install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk Run: | export NODE_PATH=$NODE_PATH:{work:path}/node_modules node {artifacts:path}/index.js Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js - Platform: os: windows Lifecycle: Install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk Run: | set "NODE_PATH=%NODE_PATH%;{work:path}/node_modules" node {artifacts:path}/index.js Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js

    有关如何开发和测试组件的详细信息,请参阅CreateAmazon IoT Greengrass组件.

在应用代码中 Connect 到直播管理器

要连接到应用程序中的流管理器,请创建一个实例StreamManagerClient来自直播管理器 SDK。此客户端连接到默认端口 8088 或您指定的端口上的流管理器组件。有关如何使用的详细信息StreamManagerClient创建实例后,请参阅使用 StreamManagerClient 处理流.

例如:使用默认端口 Connect 到流管理器

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; public class MyStreamManagerComponent { void connectToStreamManagerWithDefaultPort() { StreamManagerClient client = StreamManagerClientFactory.standard().build(); // Use the client. } }
Python
from stream_manager import ( StreamManagerClient ) def connect_to_stream_manager_with_default_port(): client = StreamManagerClient() # Use the client.
JavaScript
const { StreamManagerClient } = require('aws-greengrass-stream-manager-sdk'); function connectToStreamManagerWithDefaultPort() { const client = new StreamManagerClient(); // Use the client. }

例如:使用非默认端口 Connect 到直播管理器

如果您使用默认端口配置流管理器,则必须使用进程间通信从组件配置中检索端口。

注意

这些区域有:port配置参数包含您在中指定的值STREAM_MANAGER_SERVER_PORT当您部署流管理器时。

Java
void connectToStreamManagerWithCustomPort() { EventStreamRPCConnection eventStreamRpcConnection = IPCUtils.getEventStreamRpcConnection(); GreengrassCoreIPCClient greengrassCoreIPCClient = new GreengrassCoreIPCClient(eventStreamRpcConnection); List<String> keyPath = new ArrayList<>(); keyPath.add("port"); GetConfigurationRequest request = new GetConfigurationRequest(); request.setComponentName("aws.greengrass.StreamManager"); request.setKeyPath(keyPath); GetConfigurationResponse response = greengrassCoreIPCClient.getConfiguration(request, Optional.empty()).getResponse().get(); String port = response.getValue().get("port").toString(); System.out.print("Stream Manager is running on port: " + port); final StreamManagerClientConfig config = StreamManagerClientConfig.builder() .serverInfo(StreamManagerServerInfo.builder().port(Integer.parseInt(port)).build()).build(); StreamManagerClient client = StreamManagerClientFactory.standard().withClientConfig(config).build(); // Use the client. }
Python
import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( GetConfigurationRequest ) from stream_manager import ( StreamManagerClient ) TIMEOUT = 10 def connect_to_stream_manager_with_custom_port(): # Use IPC to get the port from the stream manager component configuration. ipc_client = awsiot.greengrasscoreipc.connect() request = GetConfigurationRequest() request.component_name = "aws.greengrass.StreamManager" request.key_path = ["port"] operation = ipc_client.new_get_configuration() operation.activate(request) futureResponse = operation.get_response() response = futureResponse.result(TIMEOUT) stream_manager_port = str(response.value["port"]) # Use port to create a stream manager client. stream_client = StreamManagerClient(port=stream_manager_port) # Use the client.