使用 Amazon IoT Device SDK 与 Greengrass 原子核、其他组件进行通信 Amazon IoT Core - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon IoT Device SDK 与 Greengrass 原子核、其他组件进行通信 Amazon IoT Core

在核心设备上运行的组件可以使用中的 Amazon IoT Greengrass 核心进程间通信 (IPC) 库与 Amazon IoT Greengrass 核心和 Amazon IoT Device SDK 其他 Greengrass 组件进行通信。要开发和运行使用的自定义组件IPC,必须使用连接 Amazon IoT Device SDK 到 C Amazon IoT Greengrass ore IPC 服务并执行IPC操作。

该IPC接口支持两种类型的操作:

  • 请求/响应

    组件向IPC服务发送请求并收到包含请求结果的响应。

  • 订阅

    组件向IPC服务发送订阅请求,并期望事件消息流作为响应。组件提供了一个订阅处理程序,用于处理事件消息、错误和流关闭。 Amazon IoT Device SDK 包括一个处理程序接口,其中包含每个IPC操作的正确响应和事件类型。有关更多信息,请参阅 订阅IPC活动直播

IPC客户端版本

在更高版本的 Java 和 Python 中SDKs, Amazon IoT Greengrass 提供了IPC客户端的改进版本,称为IPC客户端 V2。IPC客户端 V2:

  • 减少使用IPC操作所需编写的代码量,并有助于避免IPC客户端 V1 可能出现的常见错误。

  • 在单独的线程中调用订阅处理程序回调,因此您现在可以在订阅处理程序回调中运行阻塞代码,包括其他IPC函数调用。IPC客户端 V1 使用同一个线程与IPC服务器通信并调用订阅处理程序回调。

  • 允许您使用 Lambda 表达式 (Java) 或函数 (Python) 调用订阅操作。IPC客户端 V1 要求您定义订阅处理程序类。

  • 提供每个IPC操作的同步和异步版本。IPC客户端 V1 仅提供每个操作的异步版本。

我们建议您使用IPC客户端 V2 来利用这些改进。但是,本文档和一些在线内容中的许多示例仅演示如何使用IPC客户端 V1。您可以使用以下示例和教程来查看使用IPC客户端 V2 的示例组件:

目前, Amazon IoT Device SDK 适用于 C++ 的 v2 仅支持IPC客户端 V1。

SDKs支持进程间通信

Amazon IoT Greengrass 核心IPC库包含在以下 Amazon IoT Device SDK 版本中。

Connect 到 Amazon IoT Greengrass Core IPC 服务

要在自定义组件中使用进程间通信,必须创建与 C Amazon IoT Greengrass ore 软件运行的IPC服务器套接字的连接。完成以下任务,下载并使用您选择 Amazon IoT Device SDK 的语言。

要使用 Amazon IoT Device SDK 适用于 Java v2(IPC客户端 V2)
  1. 下载Amazon IoT Device SDK 适用于 Java 版本 2(1.6.0 或更高版本)的。

  2. 执行以下任一操作以在组件中运行您的自定义代码:

    • 将您的组件构建为包含的JAR文件 Amazon IoT Device SDK,然后在组件配方中运行此JAR文件。

    • 将定义 Amazon IoT Device SDK JAR为组件工件,并在组件配方中运行应用程序时将该构件添加到类路径中。

  3. 使用以下代码创建IPC客户端。

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
要使用 Amazon IoT Device SDK 适用于 Python v2(IPC客户端 V2)
  1. 下载Amazon IoT Device SDK 适用于 Python 的(1.9.0 或更高版本)。

  2. 将SDK的安装步骤添加到组件配方的安装生命周期中。

  3. 创建与 Amazon IoT Greengrass 核心IPC服务的连接。使用以下代码创建IPC客户端。

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

要构建 C++ 版 Amazon IoT Device SDK v2,设备必须具有以下工具:

  • C++ 11 或更高版本

  • CMake3.1 或更高版本

  • 以下编译器之一:

    • GCC4.8 或更高版本

    • Clang 3.9 或更高版本

    • MSVC2015 年或以后

要使用 Amazon IoT Device SDK 适用于 C++ v2 的
  1. 下载Amazon IoT Device SDK 适用于 C++ v2(1.17.0 或更高版本)的。

  2. 按照中的安装说明从源代码构建 C++ v2 的。README Amazon IoT Device SDK

  3. 在你的 C++ 构建工具中,链接你在上一步中构建的 IPC Greengrass 库AWS::GreengrassIpc-cpp。以下CMakeLists.txt示例将 G IPC reengrass 库链接到您用来构建的项目。CMake

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. 在组件代码中,创建与 Amazon IoT Greengrass 核心IPC服务的连接以创建IPC客户端 (Aws::Greengrass::GreengrassCoreIpcClient)。您必须定义一个用于处理IPC连接、断开IPC连接和错误事件的连接生命周期处理程序。以下示例创建了一个IPC客户端和一个IPC连接生命周期处理程序,用于在IPC客户端连接、断开连接和遇到错误时进行打印。

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. 要在组件中运行自定义代码,请将代码构建为二进制构件,然后在组件配方中运行二进制构件。将构件的Execute权限设置为OWNER,以使 Amazon IoT Greengrass 核心软件能够运行二进制工件。

    您的组件配方Manifests部分可能与以下示例类似。

    JSON
    { ... "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

要编译 Amazon IoT Device SDK 适用于 JavaScript v2 的,以便与 NodeJS 一起使用,设备必须具有以下工具:

  • NodeJS 10.0 或更高版本

    • 运行node -v以检查 Node 版本。

  • CMake3.1 或更高版本

要使用 Amazon IoT Device SDK 适用于 JavaScript v2(IPC客户端 V1)
  1. 下载Amazon IoT Device SDK 适用于 JavaScript v2(v 1.12.10 或更高版本)的。

  2. 按照中的安装说明从源代码构建 JavaScript v2 版。README Amazon IoT Device SDK

  3. 创建与 Amazon IoT Greengrass 核心IPC服务的连接。完成以下步骤以创建IPC客户端并建立连接。

  4. 使用以下代码创建IPC客户端。

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. 使用以下代码建立从您的组件到 Greengrass 核的连接。

    await client.connect();

授权组件执行IPC操作

要允许您的自定义组件使用某些IPC操作,您必须定义允许该组件对某些资源执行操作的授权策略。每个授权策略都定义了该策略允许的操作列表和资源列表。例如,发布/订阅消息IPC服务定义了主题资源的发布和订阅操作。您可以使用*通配符来允许访问所有操作或所有资源。

您可以使用accessControl配置参数定义授权策略,可以在组件配方中或部署组件时设置该参数。该accessControl对象将IPC服务标识符映射到授权策略列表。您可以为每项IPC服务定义多个授权策略来控制访问权限。每个授权策略都有一个策略 ID,它在所有组件中必须是唯一的。

提示

要创建唯一的策略IDs,您可以组合组件名称、IPC服务名称和计数器。例如,名为的组件com.example.HelloWorld可以定义两个发布/订阅授权策略,如下所示:IDs

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

授权策略使用以下格式。此对象是accessControl配置参数。

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

授权策略中的通配符

您可以在IPC授权策略resources元素中使用*通配符来允许访问单个授权策略中的多个资源。

  • 在所有版本的 Greengrass nucleus 中,您可以指定*单个角色作为资源以允许访问所有资源。

  • Greengrass nucleus v2.6.0 及更高版本中,您可以在资源中指定字符以匹配任意字符*组合。例如,您可以指定factory/1/devices/Thermostat*/status允许访问工厂中所有恒温器设备的状态主题,其中每台设备的名称都以 Thermostat “开头”。

在为 Amazon IoT Core MQTTIPC服务定义授权策略时,也可以使用MQTT通配符(+#)来匹配多个资源。有关更多信息,请参阅Amazon IoT Core MQTTIPC授权策略中的MQTT通配符

授权策略中的配方变量

如果你使用 Greengrass nucleus v2.6.0 或更高版本,并且将 Greeng interpolateComponentConfigurationrass nucleus 的配置选项设置为,则可以在授权策略中使用配方变量。true{iot:thingName}当您需要包含核心设备名称的授权策略(例如MQTT主题或设备影子)时,可以使用此配方变量为一组核心设备配置单个授权策略。例如,您可以允许组件访问以下资源以进行影子IPC操作。

$aws/things/{iot:thingName}/shadow/

授权策略中的特殊字符

要在授权策略中指定文字*?字符,必须使用转义序列。以下转义序列指示 Amazon IoT Greengrass Core 软件使用字面值而不是字符的特殊含义。例如,该*字符是与任意字符组合匹配的配符。

字面字符 转义序列 注意

*

${*}

?

${?}

Amazon IoT Greengrass 目前不支持与任何单个字符匹配的?通配符。

$

${$}

使用此转义序列来匹配包含的资源${。例如,要匹配名为的资源${resourceName},必须指定${$}{resourceName}。否则,要匹配包含的资源$,您可以使用文字$,例如允许访问以开头$aws的主题。

授权策略示例

您可以参考以下授权策略示例,以帮助您为组件配置授权策略。

例 带有授权策略的组件配方示例

以下示例组件配方包括一个定义授权策略的accessControl对象。此策略授权该com.example.HelloWorld组件发布到该test/topic主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/HelloWorld.jar
例 使用授权策略更新组件配置示例

以下部署中的示例配置更新指定使用定义授权策略的accessControl对象来配置组件。此策略授权该com.example.HelloWorld组件发布到该test/topic主题。

Console
要合并的配置
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
Amazon CLI

以下命令创建对核心设备的部署。

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

hello-world-deployment.json文件包含以下JSON文档。

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

以下 G CLI reengrass 命令在核心设备上创建本地部署。

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

hello-world-configuration.json文件包含以下JSON文档。

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

订阅IPC活动直播

您可以使用IPC操作在 Greengrass 核心设备上订阅事件流。要使用订阅操作,请定义订阅处理程序并创建对IPC服务的请求。然后,每次核心设备将事件消息流式传输到您的组件时,IPC客户端都会运行订阅处理程序的函数。

您可以关闭订阅以停止处理事件消息。为此,请在用于打开订阅的订阅操作对象上调用 closeStream() Close() (Java)、close() (Python) 或 (C++)。

Amazon IoT Greengrass 核心IPC服务支持以下订阅操作:

定义订阅处理程序

要定义订阅处理程序,请定义处理事件消息、错误和流关闭的回调函数。如果使用IPC客户端 V1,则必须在类中定义这些函数。如果您使用IPC客户端 V2(在 Java 和 Python 的更高版本中可用)SDKs,则无需创建订阅处理程序类即可定义这些函数。

Java

如果使用IPC客户端 V1,则必须实现通用接software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType>口。StreamEventType 是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

如果您使用IPC客户端 V2,则可以在订阅处理程序类之外定义这些函数或使用 lambda 表达式

void onStreamEvent(StreamEventType event)

IPC客户端在收到事件消息(例如MQTT消息或组件更新通知)时调用的回调。

boolean onStreamError(Throwable error)

发生直播错误时IPC客户端调用的回调。

如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。

void onStreamClosed()

直播关闭时IPC客户端调用的回调。

Python

如果您使用IPC客户端 V1,则必须扩展与订阅操作相对应的流响应处理程序类。 Amazon IoT Device SDK 包括每个订阅操作的订阅处理程序类。StreamEventType 是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

如果您使用IPC客户端 V2,则可以在订阅处理程序类之外定义这些函数或使用 lambda 表达式

def on_stream_event(self, event: StreamEventType) -> None

IPC客户端在收到事件消息(例如MQTT消息或组件更新通知)时调用的回调。

def on_stream_error(self, error: Exception) -> bool

发生直播错误时IPC客户端调用的回调。

如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。

def on_stream_closed(self) -> None

直播关闭时IPC客户端调用的回调。

C++

实现一个从与订阅操作对应的流响应处理程序类派生的类。 Amazon IoT Device SDK 包括每个订阅操作的订阅处理程序基类。StreamEventType 是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

void OnStreamEvent(StreamEventType *event)

IPC客户端在收到事件消息(例如MQTT消息或组件更新通知)时调用的回调。

bool OnStreamError(OperationError *error)

发生直播错误时IPC客户端调用的回调。

如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。

void OnStreamClosed()

直播关闭时IPC客户端调用的回调。

JavaScript

实现一个从与订阅操作对应的流响应处理程序类派生的类。 Amazon IoT Device SDK 包括每个订阅操作的订阅处理程序基类。StreamEventType 是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

on(event: 'ended', listener: StreamingOperationEndedListener)

直播关闭时IPC客户端调用的回调。

on(event: 'streamError', listener: StreamingRpcErrorListener)

发生直播错误时IPC客户端调用的回调。

如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。

on(event: 'message', listener: (message: InboundMessageType) => void)

IPC客户端在收到事件消息(例如MQTT消息或组件更新通知)时调用的回调。

订阅处理程序示例

以下示例演示如何使用SubscribeToTopic操作和订阅处理程序来订阅本地发布/订阅消息。

Java (IPC client V2)
例 示例:订阅本地发布/订阅消息
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
例 示例:订阅本地发布/订阅消息
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
例 示例:订阅本地发布/订阅消息
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
例 示例:订阅本地发布/订阅消息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

IPC最佳实践

IPC在自定义组件中使用的最佳实践在IPC客户端 V1 和IPC客户端 V2 之间有所不同。请遵循所用IPC客户端版本的最佳实践。

IPC client V2

IPC客户端 V2 在单独的线程中运行回调函数,因此与IPC客户端 V1 相比,在使用IPC和编写订阅处理函数时,需要遵循的准则较少。

  • 重复使用一个IPC客户端

    创建IPC客户端后,请将其保持打开状态并在所有IPC操作中重复使用。创建多个客户端会消耗额外的资源,并可能导致资源泄漏。

  • 处理异常

    IPC客户端 V2 在订阅处理函数中记录未捕获的异常。您应该在处理函数中捕获异常,以处理代码中发生的错误。

IPC client V1

IPC客户端 V1 使用与IPC服务器通信并调用订阅处理程序的单线程。在编写订阅处理函数时,必须考虑这种同步行为。

  • 重复使用一个IPC客户端

    创建IPC客户端后,请将其保持打开状态并在所有IPC操作中重复使用。创建多个客户端会消耗额外的资源,并可能导致资源泄漏。

  • 异步运行阻塞代码

    当线程被阻塞时,IPC客户端 V1 无法发送新请求或处理新的事件消息。你应该在从处理函数运行的单独线程中运行阻塞代码。阻塞代码包括sleep调用、持续运行的循环以及需要一段时间才能完成的同步 I/O 请求。

  • 异步发送新IPC请求

    IPC客户端 V1 无法从订阅处理函数中发送新请求,因为如果您等待响应,该请求会阻塞处理器函数。您应该在从处理函数运行的单独线程中发送IPC请求。

  • 处理异常

    IPC客户端 V1 不处理订阅处理函数中未捕获的异常。如果您的处理函数抛出异常,则订阅将关闭,并且该异常不会出现在您的组件日志中。您应该在处理程序函数中捕获异常,以保持订阅处于打开状态,并记录代码中发生的错误。