使用Amazon IoT Device SDK与 Greengrass 原子核、其他组件进行通信 Amazon IoT Core - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用Amazon IoT Device SDK与 Greengrass 原子核、其他组件进行通信 Amazon IoT Core

在核心设备上运行的组件可以使用中的Amazon IoT Greengrass核心进程间通信 (IPC) 库与Amazon IoT Greengrass核心和其他 Gre Amazon IoT Device SDK engrass 组件进行通信。要开发和运行使用 IPC 的自定义组件,必须使用连接Amazon IoT Device SDK到 C Amazon IoT Greengrass ore IPC 服务并执行 IPC 操作。

IPC 接口支持两种类型的操作:

  • 请求/响应

    组件向 IPC 服务发送请求并收到包含请求结果的响应。

  • 订阅

    组件向 IPC 服务发送订阅请求,并期望事件消息流作为响应。组件提供了一个订阅处理程序,用于处理事件消息、错误和流关闭。Amazon IoT Device SDK包括一个处理程序接口,其中包含每个 IPC 操作的正确响应和事件类型。有关更多信息,请参阅 订阅 IPC 事件直播

IPC 客户端版本

在更高版本的 Java 和 Python 软件开发工具包中,Amazon IoT Greengrass提供了 IPC 客户端的改进版本,名为 IPC 客户端 V2。IPC 客户端 V2:

  • 减少使用 IPC 操作所需编写的代码量,并有助于避免 IPC 客户端 V1 可能出现的常见错误。

  • 在单独的线程中调用订阅处理程序回调,因此您现在可以在订阅处理程序回调中运行阻塞代码,包括其他 IPC 函数调用。IPC 客户端 V1 使用相同的线程与 IPC 服务器通信并调用订阅处理程序回调。

  • 允许您使用 Lambda 表达式 (Java) 或函数 (Python) 调用订阅操作。IPC 客户端 V1 要求您定义订阅处理程序类。

  • 提供每个 IPC 操作的同步和异步版本。IPC 客户端 V1 仅提供每个操作的异步版本。

我们建议您使用 IPC 客户端 V2 来利用这些改进。但是,本文档和一些在线内容中的许多示例仅演示如何使用 IPC 客户端 V1。您可以使用以下示例和教程来查看使用 IPC 客户端 V2 的示例组件:

目前,Amazon IoT Device SDK适用于 C++ 的 v2 仅支持 IPC 客户端 V1。

支持用于进程间通信的 SDK

C Amazon IoT Greengrass ore IPC 库包含在以下Amazon IoT Device SDK版本中。

Connect 到 C Amazon IoT Greengrass ore IPC 服务

要在自定义组件中使用进程间通信,必须创建与 C Amazon IoT Greengrass ore 软件运行的 IPC 服务器套接字的连接。完成以下任务,下载并使用您选择Amazon IoT Device SDK的语言。

要使用Amazon IoT Device SDK适用于 Java v2(IPC 客户端 V2)
  1. 下载Amazon IoT Device SDK适用于 Java 版本 2(1.6.0 或更高版本)的。

  2. 执行以下任一操作以在组件中运行您的自定义代码:

    • 将您的组件构建为包含的 JAR 文件Amazon IoT Device SDK,然后在组件配方中运行此 JAR 文件。

    • 将 Amazon IoT Device SDK JAR 定义为组件工件,并在组件配方中运行应用程序时将该构件添加到类路径中。

  3. 使用以下代码创建 IPC 客户端。

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
要使用Amazon IoT Device SDK适用于 Python v2(IPC 客户端 V2)
  1. 下载Amazon IoT Device SDK适用于 Python 的(1.9.0 或更高版本)。

  2. 将 SDK 的安装步骤添加到组件配方中的安装生命周期中。

  3. 创建与Amazon IoT Greengrass核心 IPC 服务的连接。使用以下代码创建 IPC 客户端。

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

要构建 C++ 版 Amazon IoT Device SDK v2,设备必须具有以下工具:

  • C++ 11 或更高版本

  • cMake 3.1 或更高版本

  • 以下编译器之一:

    • GCC 4.8 或更高版本

    • Clang 3.9 或更高版本

    • MSVC 2015 或更高版本

要使用Amazon IoT Device SDK适用于 C++ v2 的
  1. 下载Amazon IoT Device SDK适用于 C++ v2(1.17.0 或更高版本)的。

  2. 按照自述文件中的安装说明从源代码构建 C++ v2 版。Amazon IoT Device SDK

  3. 在你的 C++ 构建工具中,链接你在上一步中构建的 Greengrass IPC 库AWS::GreengrassIpc-cpp。以下CMakeLists.txt示例将 Greengrass IPC 库链接到您使用 CMake 构建的项目。

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. 在组件代码中,创建与 C Amazon IoT Greengrass ore IPC 服务的连接以创建 IPC 客户端 () Aws::Greengrass::GreengrassCoreIpcClient。必须定义一个处理 IPC 连接、断开连接和错误事件的 IPC 连接生命周期处理程序。以下示例创建了一个 IPC 客户端和一个 IPC 连接生命周期处理程序,它们在 IPC 客户端连接、断开连接和遇到错误时进行打印。

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. 要在组件中运行自定义代码,请将代码构建为二进制构件,然后在组件配方中运行二进制构件。将构件的Execute权限设置为OWNER,以使Amazon IoT Greengrass核心软件能够运行二进制工件。

    您的组件配方Manifests部分可能与以下示例类似。

    JSON
    { ... "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

要编译Amazon IoT Device SDK适用于 JavaScript v2 的,以便与 NodeJS 一起使用,设备必须具有以下工具:

  • NodeJS 10.0 或更高版本

    • 运行node -v以检查 Node 版本。

  • cMake 3.1 或更高版本

要使用Amazon IoT Device SDK适用于 JavaScript v2 的(IPC 客户端 V1)
  1. 下载Amazon IoT Device SDK适用于 JavaScript v2(v 1.12.10 或更高版本)的。

  2. 按照自述文件中的安装说明从源代码构建 JavaScript v2 版。Amazon IoT Device SDK

  3. 创建与Amazon IoT Greengrass核心 IPC 服务的连接。完成以下步骤创建 IPC 客户端并建立连接。

  4. 使用以下代码创建 IPC 客户端。

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. 使用以下代码建立从您的组件到 Greengrass 核的连接。

    await client.connect();

授权组件执行 IPC 操作

要允许您的自定义组件使用某些 IPC 操作,您必须定义允许该组件对某些资源执行操作的授权策略。每个授权策略都定义了该策略允许的操作列表和资源列表。例如,发布/订阅消息 IPC 服务定义了主题资源的发布和订阅操作。您可以使用*通配符来允许访问所有操作或所有资源。

您可以使用accessControl配置参数定义授权策略,可以在组件配方中或部署组件时设置该参数。该accessControl对象将 IPC 服务标识符映射到授权策略列表。您可以为每个 IPC 服务定义多个授权策略来控制访问权限。每个授权策略都有一个策略 ID,它在所有组件中必须是唯一的。

提示

要创建唯一的策略 ID,可以组合组件名称、IPC 服务名称和计数器。例如,名为的组件com.example.HelloWorld可以定义两个具有以下 ID 的发布/订阅授权策略:

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

授权策略使用以下格式。此对象是accessControl配置参数。

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

授权策略中的通配符

您可以在 IPC 授权策略resources元素中使用*通配符来允许访问单个授权策略中的多个资源。

  • 在所有版本的 Greengrass nucleus 中,您可以指定*单个角色作为资源以允许访问所有资源。

  • Greengrass nucleus v2.6.0 及更高版本中,您可以在资源中指定字符以匹配任意字符*组合。例如,您可以指定factory/1/devices/Thermostat*/status允许访问工厂中所有恒温器设备的状态主题,其中每台设备的名称都以 Thermostat “开头”。

在为 Amazon IoT Core MQTT IPC 服务定义授权策略时,也可以使用 MQTT 通配符(+#)来匹配多个资源。有关更多信息,请参阅 MQTT IPC 授权策略中的 Amazon IoT Core MQTT 通配符

授权策略中的配方变量

如果你使用 Greengrass nucleus v2.6.0 或更高版本,并且将 Greeng interpolateComponentConfigurationrass nucleus 的配置选项设置为,则可以在授权策略中使用配方变量。true{iot:thingName}当您需要包含核心设备名称的授权策略(例如 MQTT 主题或设备影子)时,可以使用此配方变量为一组核心设备配置单个授权策略。例如,您可以允许组件访问以下资源以进行影子 IPC 操作。

$aws/things/{iot:thingName}/shadow/

授权策略中的特殊字符

要在授权策略中指定文字*?字符,必须使用转义序列。以下转义序列指示 Amazon IoT Greengrass Core 软件使用字面值而不是字符的特殊含义。例如,该*字符是与任意字符组合匹配的配符。

字面字符 转义序列 注意事项

*

${*}

?

${?}

Amazon IoT Greengrass目前不支持与任何单个字符匹配的?通配符。

$

${$}

使用此转义序列来匹配包含的资源${。例如,要匹配名为的资源${resourceName},必须指定${$}{resourceName}。否则,要匹配包含的资源$,您可以使用文字$,例如允许访问以开头$aws的主题。

授权策略示例

您可以参考以下授权策略示例,以帮助您为组件配置授权策略。

例 带有授权策略的组件配方示例

以下示例组件配方包括一个定义授权策略的accessControl对象。此策略授权该com.example.HelloWorld组件发布到该test/topic主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/HelloWorld.jar
例 使用授权策略更新组件配置示例

以下部署中的示例配置更新指定使用定义授权策略的accessControl对象来配置组件。此策略授权该com.example.HelloWorld组件发布到该test/topic主题。

Console
要合并的配置
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
Amazon CLI

以下命令创建对核心设备的部署。

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

hello-world-deployment.json文件包含以下 JSON 文档。

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

以下 Greengrass CLI 命令在核心设备上创建本地部署。

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

hello-world-configuration.json文件包含以下 JSON 文档。

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

订阅 IPC 事件直播

您可以使用 IPC 操作在 Greengrass 核心设备上订阅事件流。要使用订阅操作,请定义订阅处理程序并创建对 IPC 服务的请求。然后,每次核心设备将事件消息流式传输到您的组件时,IPC 客户端都会运行订阅处理程序的函数。

您可以关闭订阅以停止处理事件消息。为此,请在用于打开订阅的订阅操作对象上调用 closeStream() Close() (Java)、close() (Python) 或 (C++)。

C Amazon IoT Greengrass ore IPC 服务支持以下订阅操作:

定义订阅处理程序

要定义订阅处理程序,请定义处理事件消息、错误和流关闭的回调函数。如果使用 IPC 客户端 V1,则必须在类中定义这些函数。如果您使用 IPC 客户端 V2(在 Java 和 Python 软件开发工具包的更高版本中可用),则无需创建订阅处理程序类即可定义这些函数。

Java

如果您使用 IPC 客户端 V1,则必须实现通用software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType>接口。StreamEventType是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

如果您使用 IPC 客户端 V2,则可以在订阅处理程序类之外定义这些函数或使用 lambda 表达式。

void onStreamEvent(StreamEventType event)

IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。

boolean onStreamError(Throwable error)

IPC 客户端在直播错误发生时调用的回调。

如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。

void onStreamClosed()

IPC 客户端在直播关闭时调用的回调。

Python

如果您使用 IPC 客户端 V1,则必须扩展与订阅操作相对应的流响应处理程序类。Amazon IoT Device SDK包括每个订阅操作的订阅处理程序类。StreamEventType是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

如果您使用 IPC 客户端 V2,则可以在订阅处理程序类之外定义这些函数或使用 lambda 表达式。

def on_stream_event(self, event: StreamEventType) -> None

IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。

def on_stream_error(self, error: Exception) -> bool

IPC 客户端在直播错误发生时调用的回调。

如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。

def on_stream_closed(self) -> None

IPC 客户端在直播关闭时调用的回调。

C++

实现一个从与订阅操作对应的流响应处理程序类派生的类。Amazon IoT Device SDK包括每个订阅操作的订阅处理程序基类。StreamEventType是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

void OnStreamEvent(StreamEventType *event)

IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。

bool OnStreamError(OperationError *error)

IPC 客户端在直播错误发生时调用的回调。

如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。

void OnStreamClosed()

IPC 客户端在直播关闭时调用的回调。

JavaScript

实现一个从与订阅操作对应的流响应处理程序类派生的类。Amazon IoT Device SDK包括每个订阅操作的订阅处理程序基类。StreamEventType是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

on(event: 'ended', listener: StreamingOperationEndedListener)

IPC 客户端在直播关闭时调用的回调。

on(event: 'streamError', listener: StreamingRpcErrorListener)

IPC 客户端在直播错误发生时调用的回调。

如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。

on(event: 'message', listener: (message: InboundMessageType) => void)

IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。

订阅处理程序示例

以下示例演示如何使用SubscribeToTopic操作和订阅处理程序来订阅本地发布/订阅消息。

Java (IPC client V2)
例 示例:订阅本地发布/订阅消息
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
例 示例:订阅本地发布/订阅消息
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
例 示例:订阅本地发布/订阅消息
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
例 示例:订阅本地发布/订阅消息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

IPC 最佳实践

在自定义组件中使用 IPC 的最佳实践在 IPC 客户端 V1 和 IPC 客户端 V2 之间有所不同。请遵循您所使用的 IPC 客户端版本的最佳实践。

IPC client V2

IPC 客户端 V2 在单独的线程中运行回调函数,因此与 IPC 客户端 V1 相比,在使用 IPC 和编写订阅处理函数时,需要遵循的准则较少。

  • 重复使用一个 IPC 客户端

    创建 IPC 客户端后,请将其保持打开状态并重复用于所有 IPC 操作。创建多个客户端会消耗额外的资源,并可能导致资源泄漏。

  • 处理异常

    IPC 客户端 V2 在订阅处理函数中记录未捕获的异常。您应该在处理函数中捕获异常,以处理代码中发生的错误。

IPC client V1

IPC 客户端 V1 使用与 IPC 服务器通信并调用订阅处理程序的单线程。在编写订阅处理函数时,必须考虑这种同步行为。

  • 重复使用一个 IPC 客户端

    创建 IPC 客户端后,请将其保持打开状态并重复用于所有 IPC 操作。创建多个客户端会消耗额外的资源,并可能导致资源泄漏。

  • 异步运行阻塞代码

    当线程被阻塞时,IPC 客户端 V1 无法发送新请求或处理新的事件消息。你应该在从处理函数运行的单独线程中运行阻塞代码。阻塞代码包括sleep调用、持续运行的循环以及需要一段时间才能完成的同步 I/O 请求。

  • 异步发送新的 IPC 请求

    IPC 客户端 V1 无法从订阅处理函数中发送新请求,因为如果您等待响应,该请求会阻塞处理器函数。您应该在从处理程序函数运行的单独线程中发送 IPC 请求。

  • 处理异常

    IPC 客户端 V1 不处理订阅处理函数中未捕获的异常。如果您的处理函数抛出异常,则订阅将关闭,并且该异常不会出现在您的组件日志中。您应该在处理程序函数中捕获异常,以保持订阅处于打开状态,并记录代码中发生的错误。