创建使用流管理器的自定义组件 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

创建使用流管理器的自定义组件

使用自定义 Greengrass 组件中的流管理器存储、处理和导出 IoT 设备数据。使用本节中的过程和示例创建与流管理器配合使用的组件配方、工件和应用程序。有关如何开发和测试组件的详细信息,请参阅创建自定义Amazon IoT Greengrass组件.

定义使用流管理器的组件配方

要在自定义组件中使用流管理器,必须定义aws.greengrass.StreamManager组件作为依赖关系。还必须提供流管理器 SDK。完成以下任务,以您选择的语言下载并使用流管理器 SDK。

适用于 Java 的流管理器软件开发工具包作为 JAR 文件提供,您可以使用该文件编译组件。然后,您可以创建包含流管理器 SDK 的应用程序 JAR,将应用程序 JAR 定义为组件对象,并在组件生命周期中运行应用程序 JAR。

使用适用于 Java 的流管理器软件开发工具包

  1. 下载适用于 Java JAR 文件的流管理器开发工具包.

  2. 执行以下操作之一,从 Java 应用程序和流管理器 SDK JAR 文件创建组件工件:

    • 将应用程序构建为包含流管理器 SDK JAR 的 JAR 文件,然后在组件配方中运行此 JAR 文件。

    • 将流管理器 SDK JAR 定义为组件对象。在组件配方中运行应用程序时,将该工件添加到类路径中。

    您的组件配方可能与以下示例类似。此组件运行StreamManagerS3.java示例,其中StreamManagerS3.jar包含流管理器软件开发工具包 JAR。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3Java", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/StreamManagerS3.jar" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3Java ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Lifecycle: Run: java -jar {artifacts:path}/StreamManagerS3.jar Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar

    有关如何开发和测试组件的详细信息,请参阅创建自定义Amazon IoT Greengrass组件.

适用于 Python 的流管理器 SDK 可作为源代码提供,您可以包含在组件中。创建流管理器 SDK 的 ZIP 文件,将 ZIP 文件定义为组件对象,并在组件生命周期中安装 SDK 的要求。

使用适用于 Python 的流管理器软件开发工具包

  1. 克隆或下载Amazon 绿色草流管理器-软件开发-Python存储库。

    git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-python.git
  2. 创建一个包含stream_manager文件夹,其中包含适用于 Python 的流管理器开发工具包的源代码。您可以将此 ZIP 文件作为组件工件提供,Amazon IoT Greengrass核心软件在安装组件时解压缩。执行以下操作:

    1. 打开包含您在上一步中克隆或下载的存储库的文件夹。

      cd aws-greengrass-stream-manager-sdk-python
    2. 邮政编码stream_manager文件夹添加到名为stream_manager_sdk.zip.

      zip -r stream_manager_sdk.zip stream_manager
    3. 验证stream_manager_sdk.zip文件包含stream_manager文件夹以及其中的内容。运行以下命令,列出 ZIP 文件的内容。

      unzip -l stream_manager_sdk.zip

      该输出值应该类似于以下内容。

      Archive: aws-greengrass-stream-manager-sdk-python/stream_manager.zip Length Date Time Name --------- ---------- ----- ---- 0 02-24-2021 20:45 stream_manager/ 913 02-24-2021 20:45 stream_manager/__init__.py 9719 02-24-2021 20:45 stream_manager/utilinternal.py 1412 02-24-2021 20:45 stream_manager/exceptions.py 1004 02-24-2021 20:45 stream_manager/util.py 0 02-24-2021 20:45 stream_manager/data/ 254463 02-24-2021 20:45 stream_manager/data/__init__.py 26515 02-24-2021 20:45 stream_manager/streammanagerclient.py --------- ------- 294026 8 files
  3. 将流管理器 SDK 对象复制到组件的工件文件夹中。除了流管理器 SDK ZIP 文件外,您的组件还使用 SDK 的requirements.txt文件来安装流管理器 SDK 的依赖关系。Replace〜/绿草组件使用您用于本地开发的文件夹的路径。

    cp {stream_manager_sdk.zip,requirements.txt} ~/greengrass-components/artifacts/com.example.StreamManagerS3Python/1.0.0/
  4. 创建您的组件配方。在配方中,执行以下操作:

    1. DEFIDEDstream_manager_sdk.ziprequirements.txt作为构件。

    2. 将 Python 应用程序定义为工件。

    3. 在安装生命周期中,从requirements.txt.

    4. 在运行生命周期中,将流管理器 SDK 附加到PYTHONPATH,然后运行您的 Python 应用程序。

    您的组件配方可能与以下示例类似。此组件运行stream_manager_s3.py示例。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3Python", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "Install": "pip3 install --user -r {artifacts:path}/requirements.txt", "Run": "export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk; python3 {artifacts:path}/stream_manager_s3.py" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3Python ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Platform: os: linux Lifecycle: Install: pip3 install --user -r {artifacts:path}/requirements.txt Run: | export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk python3 {artifacts:path}/stream_manager_s3.py Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt

    有关如何开发和测试组件的详细信息,请参阅创建自定义Amazon IoT Greengrass组件.

适用于 JavaScript 的流管理器软件开发工具包作为源代码提供,您可以将其包含在组件中。创建流管理器 SDK 的 ZIP 文件,将 ZIP 文件定义为组件对象,然后在组件生命周期中安装 SDK。

使用流管理器软件开发工具包

  1. 克隆或下载Amazon 绿色草流管理器 SDk-js存储库。

    git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-js.git
  2. 创建一个包含aws-greengrass-stream-manager-sdk文件夹,其中包含适用于 JavaScript 的流管理器软件开发工具包的源代码。您可以将此 ZIP 文件作为组件工件提供,Amazon IoT Greengrass核心软件在安装组件时解压缩。执行以下操作:

    1. 打开包含您在上一步中克隆或下载的存储库的文件夹。

      cd aws-greengrass-stream-manager-sdk-js
    2. 邮政编码aws-greengrass-stream-manager-sdk文件夹添加到名为stream-manager-sdk.zip.

      zip -r stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
    3. 验证stream-manager-sdk.zip文件包含aws-greengrass-stream-manager-sdk文件夹以及其中的内容。运行以下命令,列出 ZIP 文件的内容。

      unzip -l stream-manager-sdk.zip

      该输出值应该类似于以下内容。

      Archive: stream-manager-sdk.zip Length Date Time Name --------- ---------- ----- ---- 0 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/ 369 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/package.json 1017 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/util.js 8374 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/utilInternal.js 1937 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/exceptions.js 0 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/data/ 353343 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/data/index.js 22599 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/client.js 216 02-24-2021 22:36 aws-greengrass-stream-manager-sdk/index.js --------- ------- 387855 9 files
  3. 将流管理器 SDK 对象复制到组件的工件文件夹中。Replace〜/绿草组件使用您用于本地开发的文件夹的路径。

    cp stream-manager-sdk.zip ~/greengrass-components/artifacts/com.example.StreamManagerS3JS/1.0.0/
  4. 创建您的组件配方。在配方中,执行以下操作:

    1. DEFIDEDstream-manager-sdk.zip作为一个神器。

    2. 将您的 JavaScript 应用程序定义为工件。

    3. 在安装生命周期中,从stream-manager-sdk.zip。该npm install命令可以创建node_modules文件夹,包含流管理器开发工具包及其依赖项。

    4. 在运行生命周期中,将node_modules文件夹NODE_PATH,然后运行您的 JavaScript 应用程序。

    您的组件配方可能与以下示例类似。此组件运行流管理器 S3示例。

    JSON
    { "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.StreamManagerS3JS", "ComponentVersion": "1.0.0", "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.", "ComponentPublisher": "Amazon", "ComponentDependencies": { "aws.greengrass.StreamManager": { "VersionRequirement": "^2.0.0" } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "Install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk", "Run": "export NODE_PATH=$NODE_PATH:{work:path}/node_modules; node {artifacts:path}/index.js" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip", "Unarchive": "ZIP" }, { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js" } ] } ] }
    YAML
    --- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.StreamManagerS3JS ComponentVersion: 1.0.0 ComponentDescription: Uses stream manager to upload a file to an S3 bucket. ComponentPublisher: Amazon ComponentDependencies: aws.greengrass.StreamManager: VersionRequirement: "^2.0.0" Manifests: - Platform: os: linux Lifecycle: Install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk Run: | export NODE_PATH=$NODE_PATH:{work:path}/node_modules node {artifacts:path}/index.js Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip Unarchive: ZIP - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js

    有关如何开发和测试组件的详细信息,请参阅创建自定义Amazon IoT Greengrass组件.

在应用程序代码中 Connect 到流管理器

要连接到应用程序中的流管理器,请创建StreamManagerClient从流管理器软件开发工具包中获取。此客户端连接到流管理器组件的默认端口 8088 或您指定的端口。有关如何使用StreamManagerClient创建实例后,请参阅使用 StreamManagerClient 处理流.

例如:使用默认端口 Connect 到流管理器

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; public class MyStreamManagerComponent { void connectToStreamManagerWithDefaultPort() { StreamManagerClient client = StreamManagerClientFactory.standard().build(); // Use the client. } }
Python
from stream_manager import ( StreamManagerClient ) def connect_to_stream_manager_with_default_port(): client = StreamManagerClient() # Use the client.
JavaScript
const { StreamManagerClient } = require('aws-greengrass-stream-manager-sdk'); function connectToStreamManagerWithDefaultPort() { const client = new StreamManagerClient(); // Use the client. }

例如:使用非默认端口 Connect 到流管理器

如果您使用默认端口以外的端口配置流管理器,您必须使用进程间通信从组件配置中检索端口。

注意

这些区域有:port配置参数包含您在STREAM_MANAGER_SERVER_PORT当您部署流管理器时。

Java
void connectToStreamManagerWithCustomPort() { EventStreamRPCConnection eventStreamRpcConnection = IPCUtils.getEventStreamRpcConnection(); GreengrassCoreIPCClient greengrassCoreIPCClient = new GreengrassCoreIPCClient(eventStreamRpcConnection); List<String> keyPath = new ArrayList<>(); keyPath.add("port"); GetConfigurationRequest request = new GetConfigurationRequest(); request.setComponentName("aws.greengrass.StreamManager"); request.setKeyPath(keyPath); GetConfigurationResponse response = greengrassCoreIPCClient.getConfiguration(request, Optional.empty()).getResponse().get(); String port = response.getValue().get("port").toString(); System.out.print("Stream Manager is running on port: " + port); final StreamManagerClientConfig config = StreamManagerClientConfig.builder() .serverInfo(StreamManagerServerInfo.builder().port(Integer.parseInt(port)).build()).build(); StreamManagerClient client = StreamManagerClientFactory.standard().withClientConfig(config).build(); // Use the client. }
Python
import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( GetConfigurationRequest ) from stream_manager import ( StreamManagerClient ) TIMEOUT = 10 def connect_to_stream_manager_with_custom_port(): # Use IPC to get the port from the stream manager component configuration. ipc_client = awsiot.greengrasscoreipc.connect() request = GetConfigurationRequest() request.component_name = "aws.greengrass.StreamManager" request.key_path = ["port"] operation = ipc_client.new_get_configuration() operation.activate(request) futureResponse = operation.get_response() response = futureResponse.result(TIMEOUT) stream_manager_port = str(response.value["port"]) # Use port to create a stream manager client. stream_client = StreamManagerClient(port=stream_manager_port) # Use the client.