使用Amazon IoT Device SDK与 Greengrass 核心、其他组件进行沟通Amazon IoT Core - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用Amazon IoT Device SDK与 Greengrass 核心、其他组件进行沟通Amazon IoT Core

在核心设备上运行的组件可以使用Amazon IoT Greengrass中的核心进程间通信 (IPC) 库Amazon IoT Device SDK与通信Amazon IoT Greengrass核心和其他 Greengrass 组件。要开发和运行使用 IPC 的自定义组件,必须使用Amazon IoT Device SDK连接到Amazon IoT Greengrass核心 IPC 服务并执行 IPC 操作。

IPC 接口支持两种类型的操作:

  • 请求/响应

    组件向 IPC 服务发送请求并接收包含请求结果的响应。

  • 订阅

    组件向 IPC 服务发送订阅请求,并期望收到一系列事件消息作为响应。组件提供了一个处理事件消息、错误和直播关闭的订阅处理程序。这些区域有:Amazon IoT Device SDK包括一个处理程序接口,其中包含每个 IPC 操作的正确响应和事件类型。有关更多信息,请参阅 订阅 IPC 事件直播

支持的进程间通信的 SDK

这些区域有:Amazon IoT Greengrass以下内容包括核心 IPC 库Amazon IoT Device SDK版本。

Connect 到Amazon IoT GreengrassIPC 核心服务

要在自定义组件中使用进程间通信,必须创建与 IPC 服务器套接字的连接,Amazon IoT Greengrass运行核心软件。完成以下任务以下任务以下载并使用Amazon IoT Device SDK用你选择的语言。

使用Amazon IoT Device SDK适用于 Java v2

  1. 下载Amazon IoT Device SDK适用于 Java v2(v1.2.10 或更高版本)。

  2. 执行以下操作之一,在组件中运行自定义代码:

    • 将组件构建为包含Amazon IoT Device SDK,然后在组件配方中运行此 JAR 文件。

    • 定义Amazon IoT Device SDKJAR 作为组件工件,并在组件配方中运行应用程序时将该工件添加到类路径中。

  3. 创建到Amazon IoT Greengrass核心 IPC 服务。IPC 客户端,GreengrassCoreIPCClient,需要EventStreamRPCConnection. 下载以下内容IPCUtils为你提供此连接的类。

    /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.greengrass.util; import software.amazon.awssdk.crt.io.ClientBootstrap; import software.amazon.awssdk.crt.io.EventLoopGroup; import software.amazon.awssdk.crt.io.SocketOptions; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnectionConfig; import software.amazon.awssdk.eventstreamrpc.GreengrassConnectMessageSupplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public final class IPCUtils { // Port number is not used in domain sockets. // It is ignored but the field needs to be set when creating socket connection. public static final int DEFAULT_PORT_NUMBER = 8033; private static EventStreamRPCConnection clientConnection = null; private IPCUtils() { } public static EventStreamRPCConnection getEventStreamRpcConnection() throws ExecutionException, InterruptedException { String ipcServerSocketPath = System.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT"); String authToken = System.getenv("SVCUID"); SocketOptions socketOptions = IPCUtils.getSocketOptionsForIPC(); if (clientConnection == null) { clientConnection = connectToGGCOverEventStreamIPC(socketOptions, authToken, ipcServerSocketPath); } return clientConnection; } private static EventStreamRPCConnection connectToGGCOverEventStreamIPC(SocketOptions socketOptions, String authToken, String ipcServerSocketPath) throws ExecutionException, InterruptedException { try (EventLoopGroup elGroup = new EventLoopGroup(1); ClientBootstrap clientBootstrap = new ClientBootstrap(elGroup, null)) { final EventStreamRPCConnectionConfig config = new EventStreamRPCConnectionConfig(clientBootstrap, elGroup, socketOptions, null, ipcServerSocketPath, DEFAULT_PORT_NUMBER, GreengrassConnectMessageSupplier.connectMessageSupplier(authToken)); final CompletableFuture<Void> connected = new CompletableFuture<>(); final EventStreamRPCConnection connection = new EventStreamRPCConnection(config); final boolean[] disconnected = {false}; final int[] disconnectedCode = {-1}; connection.connect(new EventStreamRPCConnection.LifecycleHandler() { // Only called on successful connection. @Override public void onConnect() { connected.complete(null); } @Override public void onDisconnect(int errorCode) { disconnected[0] = true; disconnectedCode[0] = errorCode; clientConnection = null; } // This on error is for any error that is connection level, including problems during connect() @Override public boolean onError(Throwable t) { connected.completeExceptionally(t); clientConnection = null; return true; // True instructs handler to disconnect due to this error. } }); connected.get(); return connection; } } private static SocketOptions getSocketOptionsForIPC() { SocketOptions socketOptions = new SocketOptions(); socketOptions.connectTimeoutMs = 3000; socketOptions.domain = SocketOptions.SocketDomain.LOCAL; socketOptions.type = SocketOptions.SocketType.STREAM; return socketOptions; } }
  4. 使用以下代码创建 IPC 客户端。

    try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }

使用Amazon IoT Device SDK适用于 Python v2

  1. 下载Amazon IoT Device SDK适用于 Python 的(v1.5.3 或更高版本)。

  2. 添加软件开发工具包安装步骤到组件配方中的安装生命周期。

  3. 创建到Amazon IoT Greengrass核心 IPC 服务。完成以下步骤以创建 IPC 客户端并建立连接。

    SDK v1.5.4 or later

    使用以下代码创建 IPC 客户端。

    import awsiot.greengrasscoreipc ipc_client = awsiot.greengrasscoreipc.connect()
    SDK v1.5.3
    1. 下载以下内容IPCUtils为您提供 IPC 服务器连接的类。

      # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import os from awscrt.io import ( ClientBootstrap, DefaultHostResolver, EventLoopGroup, SocketDomain, SocketOptions, ) from awsiot.eventstreamrpc import Connection, LifecycleHandler, MessageAmendment TIMEOUT = 10 class IPCUtils: def connect(self): elg = EventLoopGroup() resolver = DefaultHostResolver(elg) bootstrap = ClientBootstrap(elg, resolver) socket_options = SocketOptions() socket_options.domain = SocketDomain.Local amender = MessageAmendment.create_static_authtoken_amender(os.getenv("SVCUID")) hostname = os.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT") connection = Connection( host_name=hostname, port=8033, bootstrap=bootstrap, socket_options=socket_options, connect_message_amender=amender, ) self.lifecycle_handler = LifecycleHandler() connect_future = connection.connect(self.lifecycle_handler) connect_future.result(TIMEOUT) return connection
    2. 使用以下代码创建 IPC 客户端。

      import awsiot.greengrasscoreipc.client as client ipc_utils = IPCUtils() connection = ipc_utils.connect() ipc_client = client.GreengrassCoreIPCClient(connection)
注意

Amazon IoT Greengrass目前在 Windows 核心设备上不支持此功能。

构建Amazon IoT Device SDKv2 对于 C++,设备必须具有以下工具:

  • C++ 11 或更高版本

  • CMake 3.1 或更高版本

  • 以下编译器之一:

    • GCC 4.8 或更高版本

    • Clang 3.9 或更高版本

    • MSVC 2015 或更高版本

使用Amazon IoT Device SDK适用于 C++ v2

  1. 下载Amazon IoT Device SDK适用于 C++ v2(Linux:v1.13.0 或更高版本;视窗:v1.14.6 或更高版本)。

  2. 关注自述文件中的安装说明构建Amazon IoT Device SDK适用于源代码的 C++ v2。

  3. 在你的 C++ 构建工具中,链接 Greengrass IPC 库,AWS::GreengrassIpc-cpp,您在上一步中构建的。以下CMakeLists.txt示例将 Greengrass IPC 库链接到您使用 CMake 构建的项目。

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. 在组件代码中,创建一个连接到Amazon IoT Greengrass用于创建 IPC 客户端的核心 IPC 服务 (Aws::Greengrass::GreengrassCoreIpcClient)。您必须定义一个 IPC 连接生命周期处理程序来处理 IPC 连接、断开连接和错误事件。以下示例创建 IPC 客户端和 IPC 连接生命周期处理程序,该处理程序可在 IPC 客户端连接、断开连接和遇到错误时进行打印。

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. 要在组件中运行自定义代码,请将代码构建为二进制工件,然后在组件配方中运行二进制工件。设置神器的Execute允许OWNER启用Amazon IoT Greengrass用于运行二进制工件的核心软件。

    你的组件配方Manifests部分可能与以下示例类似。

    JSON
    { ... "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: Run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

授权组件执行 IPC 操作

要允许自定义组件使用某些 IPC 操作,必须定义授权策略允许组件对某些资源执行操作。每个授权策略都定义了一个操作列表和策略允许的资源列表。例如,发布/订阅消息传递 IPC 服务定义了主题资源的发布和订阅操作。您可以使用*通配符允许访问所有操作或所有资源。

您可以使用accessControl配置参数,您可以在组件配方中或在部署组件时设置该参数。这些区域有:accessControl对象将 IPC 服务标识符映射到授权策略列表。您可以为每个 IPC 服务定义多个授权策略来控制访问。每个授权策略都有一个策略 ID,该 ID 在所有组件中必须是唯一的。

提示

要创建唯一的策略 ID,可以将组件名称、IPC 服务名称和计数器组合起来。例如,名为的组件com.example.HelloWorld可能会定义两个具有以下 ID 的发布/订阅授权策略:

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

授权策略使用以下格式。这个对象是accessControl配置参数。

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

例 具有授权策略的组件配方示例

以下示例组件配方包括accessControl对象定义了授权策略。此政策授权com.example.HelloWorld要发布到test/topic主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/HelloWorld.jar

例 使用授权策略更新组件配置示例

以下部署中的示例配置更新指定使用accessControl定义授权策略的对象。此政策授权com.example.HelloWorld要发布到test/topic主题。

Console
要合并的配置
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
Amazon CLI

以下命令将创建到核心设备的部署。

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

这些区域有:hello-world-deployment.json文件包含以下 JSON 文档。

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

以下Greengrass CLI命令在核心设备上创建本地部署。

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

这些区域有:hello-world-configuration.json文件包含以下 JSON 文档。

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

订阅 IPC 事件直播

您可以使用 IPC 操作订阅 Greengrass 核心设备上的事件流。要使用订阅操作,请定义订阅处理器然后创建对 IPC 服务的请求。然后,每次核心设备将事件消息传输到组件时,IPC 客户端都会运行订阅处理程序的函数。

您可以关闭订阅以停止处理事件消息。为此,请致电closeStream()(Java),close()(Python),或Close()(C++) 在用于打开订阅的订阅操作对象上。

这些区域有:Amazon IoT Greengrass核心 IPC 服务支持以下订阅操作:

定义订阅处理程序

要定义订阅处理程序,请创建一个带有处理事件消息、错误和直播关闭的回调函数的类。

Java

实现通用software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType>接口。StreamEventType是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和直播关闭。

void onStreamEvent(StreamEventType event)

IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。

boolean onStreamError(Throwable error)

在出现流错误时,IPC 客户端调用的回调。

返回 true 以由于错误关闭订阅流,或返回 false 以保持直播打开状态。

void onStreamClosed()

流关闭时 IPC 客户端调用的回调。

Python

扩展与订阅操作对应的流响应处理程序类。这些区域有:Amazon IoT Device SDK包括每个订阅操作的订阅处理程序类。StreamEventType是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和直播关闭。

def on_stream_event(self, event: StreamEventType) -> None

IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。

def on_stream_error(self, error: Exception) -> bool

在出现流错误时,IPC 客户端调用的回调。

返回 true 以由于错误关闭订阅流,或返回 false 以保持直播打开状态。

def on_stream_closed(self) -> None

流关闭时 IPC 客户端调用的回调。

C++

实现一个从与订阅操作对应的流响应处理程序类派生的类。这些区域有:Amazon IoT Device SDK包括每个订阅操作的订阅处理程序基类。StreamEventType是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和直播关闭。

void OnStreamEvent(StreamEventType *event)

IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。

bool OnStreamError(OperationError *error)

在出现流错误时,IPC 客户端调用的回调。

返回 true 以由于错误关闭订阅流,或返回 false 以保持直播打开状态。

void OnStreamClosed()

流关闭时 IPC 客户端调用的回调。

订阅处理程序的最佳实践

IPC 客户端使用与 IPC 服务器进行通信的单个线程并调用您的订阅处理程序。在编写订阅处理程序函数时,必须考虑这种同步行为。编写订阅处理程序函数时,请遵循这些准则。

  • 异步运行阻止代码

    在线程被阻止时,IPC 客户端无法发送新请求或处理新的事件消息。您可以在从处理函数运行的单独线程中运行阻塞代码。阻止代码包括sleep调用、持续运行的循环以及需要时间才能完成的同步 I/O 请求。

  • 异步发送新的 IPC 请求

    IPC 客户端无法从订阅处理函数中发送新请求,因为如果您等待响应,请求会阻止处理函数。您可以在从处理函数运行的单独线程中发送 IPC 请求。

  • 处理异常

    IPC 客户端不处理订阅处理函数中未捕获的异常。如果你的处理函数抛出异常,订阅将关闭,并且异常不会出现在组件日志中。您可以在处理函数中 catch 异常以保持订阅打开状态并记录代码中发生的错误。

示例订阅处理程序

以下示例说明了如何使用SubscribeToTopic操作和订阅处理程序,用于订阅本地发布/订阅消息。

Java

例如:订阅本地发布/订阅消息

注意

此示例使用IPCUtils类来创建到Amazon IoT Greengrass核心 IPC 服务。有关更多信息,请参阅 Connect 到Amazon IoT GreengrassIPC 核心服务

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class SubscribeToTopic { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); StreamResponseHandler<SubscriptionResponseMessage> streamResponseHandler = new SubscriptionResponseHandler(topic); SubscribeToTopicResponseHandler responseHandler = SubscribeToTopic.subscribeToTopic(ipcClient, topic, streamResponseHandler); CompletableFuture<SubscribeToTopicResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { throw e; } } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static SubscribeToTopicResponseHandler subscribeToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, StreamResponseHandler<SubscriptionResponseMessage> streamResponseHandler) { SubscribeToTopicRequest subscribeToTopicRequest = new SubscribeToTopicRequest(); subscribeToTopicRequest.setTopic(topic); return greengrassCoreIPCClient.subscribeToTopic(subscribeToTopicRequest, Optional.of(streamResponseHandler)); } public static class SubscriptionResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { private final String topic; public SubscriptionResponseHandler(String topic) { this.topic = topic; } @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage().getMessage(), StandardCharsets.UTF_8); System.out.printf("Received new message on topic %s: %s%n", this.topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }
Python

例如:订阅本地发布/订阅消息

注意

此示例假设您正在使用版本 1.5.4 或更高版本。Amazon IoT Device SDK适用于 Python v2。如果您正在使用 SDK 的版本 1.5.3,请参阅使用Amazon IoT Device SDK适用于 Python v2了解有关连接到Amazon IoT Greengrass核心 IPC 服务。

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message_string = str(event.binary_message.message, "utf-8") # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) future = operation.activate(request) future.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()
C++

例如:订阅本地发布/订阅消息

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); SubscribeResponseHandler streamHandler; SubscribeToTopicOperation operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation.Activate(request, nullptr); activate.wait(); auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation.Close(); return 0; }