使用Amazon IoT Device SDK与 Greengrass 核、其他组件进行通信,以及Amazon IoT Core - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用Amazon IoT Device SDK与 Greengrass 核、其他组件进行通信,以及Amazon IoT Core

在核心设备上运行的组件可以使用中的Amazon IoT Greengrass核心进程间通信 (IPC) 库与 nAmazon IoT Greengrass ucleus 和其他 Greengrass 组件进行通信。Amazon IoT Device SDK要开发和运行使用 IPC 的自定义组件,必须使用连接Amazon IoT Device SDK到 CAmazon IoT Greengrass ore IPC 服务并执行 IPC 操作。

IPC 接口支持两种类型的操作:

  • 请求/响应

    组件向 IPC 服务发送请求并收到包含请求结果的响应。

  • 订阅

    组件向 IPC 服务发送订阅请求,并期望收到事件消息流作为响应。组件提供订阅处理程序,用于处理事件消息、错误和流关闭。Amazon IoT Device SDK包括一个处理器接口,其中包含每个 IPC 操作的正确响应和事件类型。有关更多信息,请参阅订阅 IPC 事件直播

IPC 客户端版本

在更高版本的 Java 和 Python 软件开发工具包中,Amazon IoT Greengrass提供了 IPC 客户端的改进版本,称为 IPC 客户端 V2。IPC 客户端 V2:

  • 减少了使用 IPC 操作所需的代码量,有助于避免 IPC 客户端 V1 可能出现的常见错误。

  • 在单独的线程中调用订阅处理程序回调,因此您现在可以在订阅处理器回调中运行阻塞代码,包括其他 IPC 函数调用。IPC 客户端 V1 使用相同的线程与 IPC 服务器通信并调用订阅处理程序回调。

  • 允许您使用 Lambda 表达式 (Java) 或函数 (Python) 调用订阅操作。IPC 客户端 V1 要求您定义订阅处理程序类。

  • 提供每个 IPC 操作的同步和异步版本。IPC 客户端 V1 仅提供每个操作的异步版本。

建议您使用 IPC Client V2 以利用这些改进。但是,本文档和某些在线内容中的许多示例仅演示了如何使用 IPC 客户端 V1。您可以使用以下示例和教程来查看使用 IPC 客户端 V2 的示例组件:

目前,fAmazon IoT Device SDK or C++ v2 仅支持 IPC 客户端 V1。

支持的进程间通信软件开发工具包

CAmazon IoT Greengrass ore IPC 库包含在以下Amazon IoT Device SDK版本中。

ConnectAmazon IoT Greengrass 核心 IPC 服务

要在自定义组件中使用进程间通信,必须创建与Amazon IoT Greengrass Core 软件运行的 IPC 服务器套接字的连接。完成以下任务,以您选择Amazon IoT Device SDK的语言下载和使用。

要使用 fAmazon IoT Device SDK or Java v2 (IPC 客户端 V2)
  1. 下载Amazon IoT Device SDK适用于 Java v2(v 1.6.0 或更高版本)的。

  2. 执行以下任一操作以在组件中运行自定义代码:

    • 将您的组件构建为包含的 JAR 文件Amazon IoT Device SDK,然后在组件配方中运行此 JAR 文件。

    • 将Amazon IoT Device SDK JAR 定义为组件工件,并在组件配方中运行应用程序时将该工件添加到类路径中。

  3. 使用以下代码创建 IPC 客户端。

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
要使用 fAmazon IoT Device SDK or Python v2 (IPC 客户端 V2)
  1. 下载Amazon IoT Device SDK适用于 Python 的(v1.9.0 或更高版本)。

  2. 将 SDK 的安装步骤添加到组件配方中的安装生命周期中。

  3. 创建与核Amazon IoT Greengrass心 IPC 服务的连接。使用以下代码创建 IPC 客户端。

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
要使用 fAmazon IoT Device SDK or Java v2 (IPC 客户端 V1)
  1. 下载Amazon IoT Device SDK适用于 Java v2(v 1.2.10 或更高版本)的。

  2. 执行以下任一操作以在组件中运行自定义代码:

    • 将您的组件构建为包含的 JAR 文件Amazon IoT Device SDK,然后在组件配方中运行此 JAR 文件。

    • 将Amazon IoT Device SDK JAR 定义为组件工件,并在组件配方中运行应用程序时将该工件添加到类路径中。

  3. 创建与核Amazon IoT Greengrass心 IPC 服务的连接。IPC 客户端GreengrassCoreIPCClient,需要EventStreamRPCConnection。下载以下为你提供此连接的IPCUtils类。

    /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.greengrass.util; import software.amazon.awssdk.crt.io.ClientBootstrap; import software.amazon.awssdk.crt.io.EventLoopGroup; import software.amazon.awssdk.crt.io.SocketOptions; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnectionConfig; import software.amazon.awssdk.eventstreamrpc.GreengrassConnectMessageSupplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public final class IPCUtils { // Port number is not used in domain sockets. // It is ignored but the field needs to be set when creating socket connection. public static final int DEFAULT_PORT_NUMBER = 8033; private static EventStreamRPCConnection clientConnection = null; private IPCUtils() { } public static EventStreamRPCConnection getEventStreamRpcConnection() throws ExecutionException, InterruptedException { String ipcServerSocketPath = System.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT"); String authToken = System.getenv("SVCUID"); SocketOptions socketOptions = IPCUtils.getSocketOptionsForIPC(); if (clientConnection == null) { clientConnection = connectToGGCOverEventStreamIPC(socketOptions, authToken, ipcServerSocketPath); } return clientConnection; } private static EventStreamRPCConnection connectToGGCOverEventStreamIPC(SocketOptions socketOptions, String authToken, String ipcServerSocketPath) throws ExecutionException, InterruptedException { try (EventLoopGroup elGroup = new EventLoopGroup(1); ClientBootstrap clientBootstrap = new ClientBootstrap(elGroup, null)) { final EventStreamRPCConnectionConfig config = new EventStreamRPCConnectionConfig(clientBootstrap, elGroup, socketOptions, null, ipcServerSocketPath, DEFAULT_PORT_NUMBER, GreengrassConnectMessageSupplier.connectMessageSupplier(authToken)); final CompletableFuture<Void> connected = new CompletableFuture<>(); final EventStreamRPCConnection connection = new EventStreamRPCConnection(config); final boolean[] disconnected = {false}; final int[] disconnectedCode = {-1}; connection.connect(new EventStreamRPCConnection.LifecycleHandler() { // Only called on successful connection. @Override public void onConnect() { connected.complete(null); } @Override public void onDisconnect(int errorCode) { disconnected[0] = true; disconnectedCode[0] = errorCode; clientConnection = null; } // This on error is for any error that is connection level, including problems during connect() @Override public boolean onError(Throwable t) { connected.completeExceptionally(t); clientConnection = null; return true; // True instructs handler to disconnect due to this error. } }); connected.get(); return connection; } } private static SocketOptions getSocketOptionsForIPC() { SocketOptions socketOptions = new SocketOptions(); socketOptions.connectTimeoutMs = 3000; socketOptions.domain = SocketOptions.SocketDomain.LOCAL; socketOptions.type = SocketOptions.SocketType.STREAM; return socketOptions; } }
  4. 使用以下代码创建 IPC 客户端。

    try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
要使用 fAmazon IoT Device SDK or Python v2 (IPC 客户端 V1)
  1. 下载Amazon IoT Device SDK适用于 Python 的(v1.5.3 或更高版本)。

  2. 将 SDK 的安装步骤添加到组件配方中的安装生命周期中。

  3. 创建与核Amazon IoT Greengrass心 IPC 服务的连接。完成以下步骤以创建 IPC 客户端并建立连接。

    SDK v1.5.4 or later

    使用以下代码创建 IPC 客户端。

    import awsiot.greengrasscoreipc try: ipc_client = awsiot.greengrasscoreipc.connect() # Use client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
    SDK v1.5.3
    1. 下载以下为您提供 IPC 服务器连接的IPCUtils类。

      # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import os from awscrt.io import ( ClientBootstrap, DefaultHostResolver, EventLoopGroup, SocketDomain, SocketOptions, ) from awsiot.eventstreamrpc import Connection, LifecycleHandler, MessageAmendment TIMEOUT = 10 class IPCUtils: def connect(self): elg = EventLoopGroup() resolver = DefaultHostResolver(elg) bootstrap = ClientBootstrap(elg, resolver) socket_options = SocketOptions() socket_options.domain = SocketDomain.Local amender = MessageAmendment.create_static_authtoken_amender(os.getenv("SVCUID")) hostname = os.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT") connection = Connection( host_name=hostname, port=8033, bootstrap=bootstrap, socket_options=socket_options, connect_message_amender=amender, ) self.lifecycle_handler = LifecycleHandler() connect_future = connection.connect(self.lifecycle_handler) connect_future.result(TIMEOUT) return connection
    2. 使用以下代码创建 IPC 客户端。

      import awsiot.greengrasscoreipc.client as client try: ipc_utils = IPCUtils() connection = ipc_utils.connect() ipc_client = client.GreengrassCoreIPCClient(connection) # Use client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

要为 C++ 构建Amazon IoT Device SDK v2,设备必须具有以下工具:

  • C++ + + + + + +

  • CMake 3.1 或更高版本

  • 下列编译程序之一:

    • GCC 4.8 或更高版本

    • Clang 3.9 或更高版本

    • MSVC 2015 或更高版本

使用 fAmazon IoT Device SDK or C++ v2
  1. 下载Amazon IoT Device SDK适用于 C++ v2(v 1.17.0 或更高版本)的。

  2. 按照自述文件中的安装说明从源代码构建 fAmazon IoT Device SDK or C++ v2。

  3. 在 C++ 构建工具中,链接您在上一步中构建的 Greengrass IPC 库。AWS::GreengrassIpc-cpp以下CMakeLists.txt示例将 Greengrass IPC 库链接到您使用 CMake 构建的项目。

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. 在您的组件代码中,创建与Amazon IoT Greengrass Core IPC 服务的连接以创建 IPC 客户端 (Aws::Greengrass::GreengrassCoreIpcClient)。您必须定义一个处理 IPC 连接、断开连接和错误事件的 IPC 连接生命周期处理器。以下示例创建了一个 IPC 客户端和一个 IPC 连接生命周期处理器,在 IPC 客户端连接、断开连接和遇到错误时打印该处理程序。

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. 要在组件中运行自定义代码,请将代码构建为二进制工件,然后在组件配方中运行二进制工件。将工件的Execute权限设置为OWNER,以使Amazon IoT Greengrass核心软件能够运行二进制构件。

    您的组件配方Manifests部分可能类似于以下示例。

    JSON
    { ... "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: Run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

授权组件执行 IPC 操作

要允许您的自定义组件使用某些 IPC 操作,您必须定义允许该组件对某些资源执行操作的授权策略。每个授权策略定义了该策略允许的操作列表和资源列表。例如,发布/订阅消息 IPC 服务定义了主题资源的发布和订阅操作。您可以使用*通配符以允许访问所有操作或所有资源。

您可以使用accessControl配置参数定义授权策略,可以在组件配方中设置该配置策略,也可以在部署组件时进行设置。该accessControl对象将 IPC 服务标识符映射到授权策略列表。您可以为每个 IPC 服务定义多个授权策略来控制访问权限。每个授权策略都有一个策略 ID,该策略 ID 在所有组件中必须是唯一的。

提示

要创建唯一的策略 ID,可以组合组件名称、IPC 服务名称和计数器。例如,名为的组件com.example.HelloWorld可能会使用以下 ID 定义两个发布/订阅授权策略:

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

授权策略使用以下格式。此对象是accessControl配置参数。

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

授权策略中的通配符

您可以在 IPC 授权策略的resources元素中使用*通配符,允许在单个授权策略中访问多个资源。

  • Greengrass 核心的所有版本中,您可以将单个*字符指定为资源以允许访问所有资源。

  • Greengrass nucleus v2.6.0 及更高版本中,可以在资源中指定*字符以匹配任意字符组合。例如,您可以指定factory/1/devices/Thermostat*/status允许访问工厂中所有恒温器设备的状态主题,每个设备的名称都以此开头Thermostat

在为Amazon IoT Core MQTT IPC 服务定义授权策略时,也可以使用 MQTT 通配符(+#)来匹配多个资源。有关更多信息,请参阅 MQTT IPC 授权策略中的Amazon IoT Core MQTT 通配符

授权策略中的配方变量

如果您使用 Greengrass nucleus v2.6.0 或更高版本,并将 Greengrass nucleus 的interpolateComponentConfiguration配置选项设置为,则可以在授权策略中使用{iot:thingName}配方变量true当您需要包含核心设备名称的授权策略(例如 MQTT 主题或设备影子)时,可以使用此配方变量为一组核心设备配置单个授权策略。例如,您可以允许组件访问以下资源以进行影子 IPC 操作。

$aws/things/{iot:thingName}/shadow/

授权策略中的特殊字符

要在授权策略中指定文字*或字?符,必须使用转义序列。以下转义序列指示 CAmazon IoT Greengrass ore 软件使用文字值而不是字符的特殊含义。例如,该*字符是匹配任意字符组合的配符。

文本 转义序列 注意

*

${*}

?

${?}

Amazon IoT Greengrass目前不支持与任何单个字符匹配的?通配符。

$

${$}

使用此转义序列来匹配包含的资源${。例如,要匹配名为的资源${resourceName},必须指定${$}{resourceName}。否则,要匹配包含的资源$,您可以使用文字$,例如允许访问以开头的主题$aws

授权策略示例

您可以参考以下授权策略示例来帮助您为组件配置授权策略。

例 带有授权策略的示例组件配方

以下示例组件配方包括一个定义授权策略的accessControl对象。此策略授权com.example.HelloWorld组件向该test/topic主题发布消息。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/HelloWorld.jar
例 使用授权策略更新组件配置示例

以下部署中的配置更新示例指定使用定义授权策略的accessControl对象配置组件。此策略授权com.example.HelloWorld组件向该test/topic主题发布消息。

Console
要合并的配置
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
Amazon CLI

以下命令创建了对核心设备的部署。

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

hello-world-deployment.json文件包含以下 JSON 文档。

{ "targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

以下 Greengrass CLI 命令在核心设备上创建本地部署。

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

hello-world-configuration.json文件包含以下 JSON 文档。

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

订阅 IPC 事件直播

您可以使用 IPC 操作在 Greengrass 核心设备上订阅事件流。要使用订阅操作,请定义订阅处理程序并创建对 IPC 服务的请求。然后,每当核心设备向您的组件传输事件消息时,IPC 客户端都会运行订阅处理程序的函数。

您可以关闭订阅以停止处理事件消息。为此,请在用于打开订阅的订阅操作对象上调用closeStream()Close() (Java)、(Python) 或 (C++)。close()

CAmazon IoT Greengrass ore IPC 服务支持以下订阅操作:

定义订阅处理程序

要定义订阅处理程序,请定义处理事件消息、错误和流关闭的回调函数。如果您使用 IPC 客户端 V1,则必须在类中定义这些函数。如果您使用 IPC 客户端 V2(可在更高版本的 Java 和 Python 软件开发工具包中使用),则无需创建订阅处理程序类即可定义这些函数。

Java

如果您使用 IPC 客户端 V1,则必须实现通用接software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType>口。StreamEventType是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

如果您使用 IPC 客户端 V2,则可以在订阅处理程序类之外定义这些函数或使用 lambda 表达式

void onStreamEvent(StreamEventType event)

IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。

boolean onStreamError(Throwable error)

发生流错误时 IPC 客户端调用的回调。

返回 true 以关闭由于错误而导致的订阅流,或者返回 false 以保持直播处于打开状态。

void onStreamClosed()

当直播关闭时 IPC 客户端调用的回调。

Python

如果您使用 IPC 客户端 V1,则必须扩展与订阅操作对应的流响应处理程序类。Amazon IoT Device SDK包括每个订阅操作的订阅处理程序类。StreamEventType是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

如果您使用 IPC 客户端 V2,则可以在订阅处理程序类之外定义这些函数或使用 lambda 表达式

def on_stream_event(self, event: StreamEventType) -> None

IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。

def on_stream_error(self, error: Exception) -> bool

发生流错误时 IPC 客户端调用的回调。

返回 true 以关闭由于错误而导致的订阅流,或者返回 false 以保持直播处于打开状态。

def on_stream_closed(self) -> None

当直播关闭时 IPC 客户端调用的回调。

C++

实现一个从与订阅操作对应的流响应处理程序类派生的类。Amazon IoT Device SDK包括每个订阅操作的订阅处理程序基类。StreamEventType是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。

void OnStreamEvent(StreamEventType *event)

IPC 客户端在收到事件消息(例如 MQTT 消息或组件更新通知)时调用的回调。

bool OnStreamError(OperationError *error)

发生流错误时 IPC 客户端调用的回调。

返回 true 以关闭由于错误而导致的订阅流,或者返回 false 以保持直播处于打开状态。

void OnStreamClosed()

当直播关闭时 IPC 客户端调用的回调。

示例订阅处理程序

以下示例演示如何使用SubscribeToTopic操作和订阅处理程序订阅本地发布/订阅消息。

Java (IPC client V2)
例 示例:订阅本地发布/订阅消息
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
例 示例:订阅本地发布/订阅消息
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
Java (IPC client V1)
例 示例:订阅本地发布/订阅消息
注意

此示例使用IPCUtils类创建与Amazon IoT Greengrass Core IPC 服务的连接。有关更多信息,请参阅ConnectAmazon IoT Greengrass 核心 IPC 服务

package com.aws.greengrass.docs.samples.ipc; import com.aws.greengrass.docs.samples.ipc.util.IPCUtils; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class SubscribeToTopic { public static final int TIMEOUT_SECONDS = 10; public static void main(String[] args) { String topic = args[0]; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); StreamResponseHandler<SubscriptionResponseMessage> streamResponseHandler = new SubscriptionResponseHandler(topic); SubscribeToTopicResponseHandler responseHandler = SubscribeToTopic.subscribeToTopic(ipcClient, topic, streamResponseHandler); CompletableFuture<SubscribeToTopicResponse> futureResponse = responseHandler.getResponse(); try { futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { throw e; } } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (InterruptedException e) { System.out.println("IPC interrupted."); } catch (ExecutionException e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } public static SubscribeToTopicResponseHandler subscribeToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, StreamResponseHandler<SubscriptionResponseMessage> streamResponseHandler) { SubscribeToTopicRequest subscribeToTopicRequest = new SubscribeToTopicRequest(); subscribeToTopicRequest.setTopic(topic); return greengrassCoreIPCClient.subscribeToTopic(subscribeToTopicRequest, Optional.of(streamResponseHandler)); } public static class SubscriptionResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { private final String topic; public SubscriptionResponseHandler(String topic) { this.topic = topic; } @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage().getMessage(), StandardCharsets.UTF_8); System.out.printf("Received new message on topic %s: %s%n", this.topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }
Python (IPC client V1)
例 示例:订阅本地发布/订阅消息
注意

此示例假设您使用的是 Python v2 的 1.5.4 或更高版本。Amazon IoT Device SDK如果您使用的是 SDK 的 1.5.3 版,有关连接到Amazon IoT Greengrass Core IPC 服务的信息,请参阅用Amazon IoT Device SDK于 Python v2(IPC 客户端 V1)

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message_string = str(event.binary_message.message, "utf-8") # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) operation.activate(request) future_response = operation.get_response() future_response.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()
C++
例 示例:订阅本地发布/订阅消息
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }

IPC 最佳实践

在自定义组件中使用 IPC 的最佳做法在 IPC 客户端 V1 和 IPC 客户端 V2 之间有所不同。遵循您使用的 IPC 客户端版本的最佳实践。

IPC client V2

IPC 客户端 V2 在单独的线程中运行回调函数,因此与 IPC 客户端 V1 相比,使用 IPC 和编写订阅处理函数时需要遵循的准则较少。

  • 重复使用一个 IPC 客户端

    创建 IPC 客户端后,将其保持打开状态,并在所有 IPC 操作中重复使用。创建多个客户端会占用额外资源,并可能导致资源泄漏。

  • 处理异常

    IPC 客户端 V2 在订阅处理函数中记录未捕获的异常。您应该在处理函数中catch 异常,以处理代码中出现的错误。

IPC client V1

IPC 客户端 V1 使用与 IPC 服务器通信并调用订阅处理程序的单线程。在编写订阅处理函数时,必须考虑这种同步行为。

  • 重复使用一个 IPC 客户端

    创建 IPC 客户端后,将其保持打开状态,并在所有 IPC 操作中重复使用。创建多个客户端会占用额外资源,并可能导致资源泄漏。

  • 异步运行阻塞代码

    当线程被阻塞时,IPC 客户端 V1 无法发送新请求或处理新的事件消息。您应该在从处理程序函数运行的单独线程中运行阻塞代码。阻塞代码包括sleep调用、持续运行的循环以及需要时间才能完成的同步 I/O 请求。

  • 异步发送新的 IPC 请求

    IPC 客户端 V1 无法从订阅处理程序函数中发送新请求,因为如果您等待响应,该请求会阻塞处理函数。您应该在从处理程序函数运行的单独线程中发送 IPC 请求。

  • 处理异常

    IPC 客户端 V1 不处理订阅处理函数中未捕获的异常。如果您的处理函数抛出异常,则订阅将关闭,并且该异常不会出现在您的组件日志中。您应该在处理函数中catch 异常,以保持订阅处于打开状态,并记录代码中出现的错误。