使用Amazon IoT Device SDK用于进程间通信 (IPC) - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用Amazon IoT Device SDK用于进程间通信 (IPC)

核心设备上运行的组件可以使用Amazon IoT Greengrass核心进程间通信 (IPC) 库Amazon IoT Device SDK与其他Amazon IoT Greengrass组件和进程. 要开发和运行使用 IPC 的自定义组件,必须使用Amazon IoT Device SDK将连接到Amazon IoT Greengrass核心 IPC 服务和执行 IPC 操作。

IPC 接口支持两种操作:

  • 请求/响应/响应

    组件向 IPC 服务发送请求并接收包含请求结果的响应。

  • 订阅

    组件向 IPC 服务发送订阅请求,并期望响应事件消息流。组件提供了一个预订处理程序,用于处理事件消息、错误和流关闭。这些区域有:Amazon IoT Device SDK包含一个处理程序接口,其中包含针对每个 IPC 操作的正确响应和事件类型。有关更多信息,请参阅订阅 IPC 事件流

进程间通信的受支持软件开发工具包

这些区域有:Amazon IoT Greengrass核心 IPC 库包含在以下Amazon IoT Device SDK版本。

Connect 到Amazon IoT GreengrassCore IPC 服务

若要在自定义组件中使用进程间通信,必须创建一个连接到 IPC 服务器套接字Amazon IoT Greengrass核心软件运行。完成以下任务,下载和使用Amazon IoT Device SDK使用您选择的语言。

使用Amazon IoT Device SDK适用于 Java v2

  1. 下载Amazon IoT Device SDK适用于 Java v2(版本 1.2.10 或更高版本)。

  2. 执行以下操作之一,在组件中运行自定义代码:

    • 将组件构建为包含Amazon IoT Device SDK,然后在组件配方中运行此 JAR 文件。

    • 定义Amazon IoT Device SDKJAR 作为组件工件,并在组件配方中运行应用程序时将该工件添加到类路径中。

  3. 创建连接到Amazon IoT Greengrass核心 IPC 服务。IPC 客户端GreengrassCoreIPCClient,则需要EventStreamRPCConnection. 下载以下IPCUtils类为您提供此连接。

    /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ import software.amazon.awssdk.crt.io.ClientBootstrap; import software.amazon.awssdk.crt.io.EventLoopGroup; import software.amazon.awssdk.crt.io.SocketOptions; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnectionConfig; import software.amazon.awssdk.eventstreamrpc.GreengrassConnectMessageSupplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public final class IPCUtils { // Port number is not used in domain sockets. // It is ignored but the field needs to be set when creating socket connection public static final int DEFAULT_PORT_NUMBER = 8033; private static EventStreamRPCConnection clientConnection = null; private IPCUtils() { } public static EventStreamRPCConnection getEventStreamRpcConnection() throws ExecutionException, InterruptedException { String ipcServerSocketPath = System.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT"); String authToken = System.getenv("SVCUID"); SocketOptions socketOptions = IPCUtils.getSocketOptionsForIPC(); if (clientConnection == null) { clientConnection = connectToGGCOverEventStreamIPC(socketOptions, authToken, ipcServerSocketPath); } return clientConnection; } // removed dependency on kernel, as it is only being used to pull ipcServerSocketPath private static EventStreamRPCConnection connectToGGCOverEventStreamIPC(SocketOptions socketOptions, String authToken, String ipcServerSocketPath) throws ExecutionException, InterruptedException { try (EventLoopGroup elGroup = new EventLoopGroup(1); ClientBootstrap clientBootstrap = new ClientBootstrap(elGroup, null)) { final EventStreamRPCConnectionConfig config = new EventStreamRPCConnectionConfig(clientBootstrap, elGroup, socketOptions, null, ipcServerSocketPath, DEFAULT_PORT_NUMBER, GreengrassConnectMessageSupplier.connectMessageSupplier(authToken)); final CompletableFuture<Void> connected = new CompletableFuture<>(); final EventStreamRPCConnection connection = new EventStreamRPCConnection(config); final boolean disconnected[] = {false}; final int disconnectedCode[] = {-1}; //this is a bit cumbersome but does not prevent a convenience wrapper from exposing a sync //connect() or a connect() that returns a CompletableFuture that errors //this could be wrapped by utility methods to provide a more connection.connect(new EventStreamRPCConnection.LifecycleHandler() { //only called on successful connection. // That is full on Connect -> ConnectAck(ConnectionAccepted=true) @Override public void onConnect() { connected.complete(null); } @Override public void onDisconnect(int errorCode) { disconnected[0] = true; disconnectedCode[0] = errorCode; clientConnection = null; } //This on error is for any errors that is connection level, including problems during connect() @Override public boolean onError(Throwable t) { connected.completeExceptionally(t); clientConnection = null; return true; //hints at handler to disconnect due to this error } }); connected.get(); return connection; } } private static SocketOptions getSocketOptionsForIPC() { SocketOptions socketOptions = new SocketOptions(); socketOptions.connectTimeoutMs = 3000; socketOptions.domain = SocketOptions.SocketDomain.LOCAL; socketOptions.type = SocketOptions.SocketType.STREAM; return socketOptions; } }
  4. 使用以下代码创建 IPC 客户端。

    try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }

使用Amazon IoT Device SDK适用于 Python v2

  1. 下载Amazon IoT Device SDK适用于 Python 的(版本 1.5.3 或更高版本)。

  2. 添加软件开发工具包安装步骤添加到组件配方中的安装生命周期。

  3. 创建连接到Amazon IoT Greengrass核心 IPC 服务。完成以下步骤以创建 IPC 客户端和建立连接。

    SDK v1.5.4 or later

    使用以下代码创建 IPC 客户端。

    import awsiot.greengrasscoreipc ipc_client = awsiot.greengrasscoreipc.connect()
    SDK v1.5.3
    1. 下载以下IPCUtils类,该类为您提供 IPC 服务器连接。

      # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import os from awscrt.io import ( ClientBootstrap, DefaultHostResolver, EventLoopGroup, SocketDomain, SocketOptions, ) from awsiot.eventstreamrpc import Connection, LifecycleHandler, MessageAmendment TIMEOUT = 10 class IPCUtils: def connect(self): elg = EventLoopGroup() resolver = DefaultHostResolver(elg) bootstrap = ClientBootstrap(elg, resolver) socket_options = SocketOptions() socket_options.domain = SocketDomain.Local amender = MessageAmendment.create_static_authtoken_amender(os.getenv("SVCUID")) hostname = os.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT") connection = Connection( host_name=hostname, port=8033, bootstrap=bootstrap, socket_options=socket_options, connect_message_amender=amender, ) self.lifecycle_handler = LifecycleHandler() connect_future = connection.connect(self.lifecycle_handler) connect_future.result(TIMEOUT) return connection
    2. 使用以下代码创建 IPC 客户端。

      import awsiot.greengrasscoreipc.client as client ipc_utils = IPCUtils() connection = ipc_utils.connect() ipc_client = client.GreengrassCoreIPCClient(connection)
重要

构建Amazon IoT Device SDKv2 对于 C ++,设备必须具有以下工具:

  • C++ 11 或更高版本

  • CMake 3.1 或更高版本

  • 以下编译器之一:

    • GCC 4.8 或更高版本

    • Clanang 3.9 或更高版本

    • 2015 年或更高版本

使用Amazon IoT Device SDK适用于 C++ v2

  1. 下载Amazon IoT Device SDK适用于 C++ v2(版本 1.13.0 或更高版本)。

  2. 按照自述文件中的安装说明构建Amazon IoT Device SDK来自源代码的 C ++ v2。

  3. 在你的 C ++ 构建工具中,链接 Greengrass IPC 库AWS::GreengrassIpc-cpp,您在上一步中构建的。以下CMakeLists.txt示例将 Greengrass IPC 库链接到您使用 CMake 构建的项目。

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. 在您的组件代码中,创建一个与Amazon IoT Greengrass用于创建 IPC 客户端的核心 IPC 服务 (Aws::Greengrass::GreengrassCoreIpcClient)。您必须定义用于处理 IPC 连接、断开连接和错误事件的 IPC 连接生命周期处理程序。以下示例创建 IPC 客户端和 IPC 连接生命周期处理程序,该处理程序在 IPC 客户端连接、断开连接和遇到错误时进行打印。

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. 要在组件中运行自定义代码,请将代码构建为二进制工件,然后在组件配方中运行二进制工件。设置工件的Execute权限OWNER以启用Amazon IoT Greengrass用于运行二进制工件的核心软件。

    您的组件配方的Manifests部分可能与以下示例类似。

    JSON
    { ... "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: Run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

授权组件执行 IPC 操作

要允许自定义组件使用某些 IPC 操作,必须定义授权策略,允许组件对某些资源执行操作。每个授权策略都定义了一个操作列表和策略允许的资源列表。例如,发布/订阅消息传递 IPC 服务定义主题资源的发布和订阅操作。您可以使用*通配符以允许访问所有操作或所有资源。

您可以在组件配方中定义授权策略,并使用accessControl配置参数。这些区域有:accessControl对象将 IPC 服务标识符映射到授权策略列表。您可以为每个 IPC 服务定义多个授权策略来控制访问。每个授权策略都有一个策略 ID,该 ID 在所有组件中必须是唯一的。

提示

若要创建唯一的策略 ID,可以组合组件名称、IPC 服务名称和计数器。例如,名为com.example.HelloWorld可能会定义两个具有以下 ID 的发布/订阅授权策略:

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

授权策略使用以下格式。此对象是accessControl配置参数。

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

例 具有授权策略的组件配方示例

以下示例组件配方包含accessControl对象定义了授权策略。此策略授权com.example.HelloWorld组件发布到test/topic主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -Dlog.level=INFO -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: Run: |- java -Dlog.level=INFO -jar {artifacts:path}/HelloWorld.jar

订阅 IPC 事件流

您可以使用 IPC 操作订阅 Greengrass 核心设备上的事件。要使用订阅操作,请定义订阅处理程序并创建对 IPC 服务的请求。然后,每次核心设备将事件消息流式传输到组件时,IPC 客户端都会运行订阅处理程序的函数。

您可以关闭订阅以停止处理事件消息。为此,请调用closeStream()(Java),close()(Python) 或Close()(C++) 在用于打开预订的预订操作对象上。

这些区域有:Amazon IoT Greengrass核心 IPC 服务支持以下订阅操作:

定义订阅处理程序

要定义订阅处理程序,请创建一个包含处理事件消息、错误和流关闭的回调函数的类。

Java

实施通用software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType>接口。流事件类型是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

void onStreamEvent(StreamEventType event)

IPC 客户端在收到事件消息(如 MQTT 消息或组件更新通知)时调用的回调。

boolean onStreamError(Throwable error)

IPC 客户端在出现流错误时调用的回调。

返回 true 以关闭由于错误而导致的订阅流,或返回 false 以保持流处于打开状态。

void onStreamClosed()

流关闭时 IPC 客户端调用的回调函数。

Python

扩展与订阅操作对应的流响应处理程序类。这些区域有:Amazon IoT Device SDK包含每个订阅操作的订阅处理程序类。流事件类型是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

def on_stream_event(self, event: StreamEventType) -> None

IPC 客户端在收到事件消息(如 MQTT 消息或组件更新通知)时调用的回调。

def on_stream_error(self, error: Exception) -> bool

IPC 客户端在出现流错误时调用的回调。

返回 true 以关闭由于错误而导致的订阅流,或返回 false 以保持流处于打开状态。

def on_stream_closed(self) -> None

流关闭时 IPC 客户端调用的回调函数。

C++

实现从对应于订阅操作的流响应处理程序类派生的类。这些区域有:Amazon IoT Device SDK包含每个订阅操作的订阅处理程序基类。流事件类型是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

void OnStreamEvent(StreamEventType *event)

IPC 客户端在收到事件消息(如 MQTT 消息或组件更新通知)时调用的回调。

bool OnStreamError(OperationError *error)

IPC 客户端在出现流错误时调用的回调。

返回 true 以关闭由于错误而导致的订阅流,或返回 false 以保持流处于打开状态。

void OnStreamClosed()

流关闭时 IPC 客户端调用的回调函数。

订阅处理程序的最佳实践

IPC 客户端使用与 IPC 服务器通信并调用订阅处理程序的单个线程。编写订阅处理程序函数时,必须考虑此同步行为。编写订阅处理程序函数时,请遵循以下准则。

  • 异步运行阻塞代码

    在线程被阻止时,IPC 客户端无法发送新请求或处理新事件消息。您可以在从处理程序函数运行的单独线程中运行阻塞代码。阻止代码包括sleep调用、连续运行的循环以及需要时间才能完成的同步 I/O 请求。

  • 异步发送新的 IPC 请求

    IPC 客户端无法从订阅处理程序函数中发送新请求,因为如果您等待响应,请求会阻塞处理程序函数。您可以在从处理程序函数运行的单独线程中发送 IPC 请求。

  • 处理异常

    IPC 客户端不处理订阅处理程序函数中的未捕获异常。如果您的处理程序函数抛出异常,则订阅关闭,并且异常不会出现在组件日志中。您可以在处理程序函数中捕获异常以保持订阅打开状态并记录代码中发生的错误。

示例订阅处理程序

以下示例演示如何使用SubscribeToTopic操作和订阅处理程序,以订阅本地发布/订阅消息。

Java

例如:订阅本地发布/订阅消息

String topic = "my/topic"; SubscribeToTopicRequest subscribeToTopicRequest = new SubscribeToTopicRequest(); subscribeToTopicRequest.setTopic(topic); StreamResponseHandler<SubscriptionResponseMessage> streamResponseHandler = new StreamResponseHandler<SubscriptionResponseMessage>() { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); // Handle message. } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { // Handle error. return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { // Handle close. } }; SubscribeToTopicResponseHandler operationResponseHandler = greengrassCoreIPCClient .subscribeToTopic(subscribeToTopicRequest, Optional.of(streamResponseHandler)); operationResponseHandler.getResponse().get(); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. operationResponseHandler.closeStream();
Python

例如:订阅本地发布/订阅消息

注意

此示例假设您使用的是版本 1.5.4 或更高版本。Amazon IoT Device SDK适用于 Python v2。如果使用 SDK 的版本 1.5.3,请参阅。使用Amazon IoT Device SDK适用于 Python v2,了解有关连接到Amazon IoT Greengrass核心 IPC 服务。

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message_string = str(event.binary_message.message, "utf-8") # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) future = operation.activate(request) future.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()
C++

例如:订阅本地发布/订阅消息

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); SubscribeResponseHandler streamHandler; SubscribeToTopicOperation operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation.Activate(request, nullptr); activate.wait(); auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation.Close(); return 0; }