使用 AWS AWS IoT 设备开发工具包进行进程间通信 (IPC) - AWS IoT Greengrass
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 AWS AWS IoT 设备开发工具包进行进程间通信 (IPC)

在核心设备上运行的组件可以使用 AWS AWS IoT IoT 设备软件开发工具包中的 AWS IoT IoT Greengrass 核心进程间通信 (IPC) 库与其他 AWS AWS IoT Greengrass 组件和进程进行通信。要开发和运行使用 IPC 的自定义组件,您必须使用 AWS IoT IoT 设备开发工具包连接到 AWS AWS IoT Greengrass 核心 IPC 服务并执行 IPC 操作。

IPC 接口支持两种操作:

  • 请求/响应

    组件将请求发送到 IPC 服务并接收包含请求结果的响应。

  • 订阅

    组件将订阅请求发送到 IPC 服务,并期望事件消息流进行响应。组件提供处理事件消息、错误和流关闭的订阅处理程序。AWS AWS IoT 设备开发工具包包含一个处理程序接口,其中包含每个 IPC 操作的正确响应和事件类型。有关更多信息,请参阅订阅 IPC 事件流

支持的用于进程间通信的开发SDKs

AWS AWS IoT Greengrass 核心 IPC 库包含在以下 AWS IoT IoT 设备开发工具包版本中。

连接到 AWS AWS IoT Greengrass 核心 IPC 服务

要在自定义组件中使用进程间通信,您必须创建到 AWS AWS IoT Greengrass Core 软件运行的 IPC 服务器套接字的连接。完成以下任务,以所选语言下载并使用 AWS IoT IoT 设备开发工具包。

使用适用于 Java 的 AWS AWS IoT 设备开发工具包 v2

  1. 下载适用于 JavaAWS AWS IoT 设备开发工具包 (v1 或更高版本)。

  2. 执行以下操作之一以在组件中运行您的自定义代码:

    • 将您的组件构建为包含 AWS AWS IoT 设备开发工具包的 JAR 文件,然后在组件配方中运行此 JAR 文件。

    • 将 AWS AWS IoT 设备开发工具包 JAR 定义为组件构件,并在组件配方中运行应用程序时将该构件添加到类路径。

  3. 创建与 AWS AWS IoT Greengrass 核心 IPC 服务的连接。IPC 客户端 GreengrassCoreIPCClient需要 EventStreamRPCConnection。下载为您提供此连接的以下IPCUtils类。

    /* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ import software.amazon.awssdk.crt.io.ClientBootstrap; import software.amazon.awssdk.crt.io.EventLoopGroup; import software.amazon.awssdk.crt.io.SocketOptions; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnectionConfig; import software.amazon.awssdk.eventstreamrpc.GreengrassConnectMessageSupplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public final class IPCUtils { // Port number is not used in domain sockets. // It is ignored but the field needs to be set when creating socket connection public static final int DEFAULT_PORT_NUMBER = 8033; private static EventStreamRPCConnection clientConnection = null; private IPCUtils() { } public static EventStreamRPCConnection getEventStreamRpcConnection() throws ExecutionException, InterruptedException { String ipcServerSocketPath = System.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT"); String authToken = System.getenv("SVCUID"); SocketOptions socketOptions = IPCUtils.getSocketOptionsForIPC(); if (clientConnection == null) { clientConnection = connectToGGCOverEventStreamIPC(socketOptions, authToken, ipcServerSocketPath); } return clientConnection; } // removed dependency on kernel, as it is only being used to pull ipcServerSocketPath private static EventStreamRPCConnection connectToGGCOverEventStreamIPC(SocketOptions socketOptions, String authToken, String ipcServerSocketPath) throws ExecutionException, InterruptedException { try (EventLoopGroup elGroup = new EventLoopGroup(1); ClientBootstrap clientBootstrap = new ClientBootstrap(elGroup, null)) { final EventStreamRPCConnectionConfig config = new EventStreamRPCConnectionConfig(clientBootstrap, elGroup, socketOptions, null, ipcServerSocketPath, DEFAULT_PORT_NUMBER, GreengrassConnectMessageSupplier.connectMessageSupplier(authToken)); final CompletableFuture<Void> connected = new CompletableFuture<>(); final EventStreamRPCConnection connection = new EventStreamRPCConnection(config); final boolean disconnected[] = {false}; final int disconnectedCode[] = {-1}; //this is a bit cumbersome but does not prevent a convenience wrapper from exposing a sync //connect() or a connect() that returns a CompletableFuture that errors //this could be wrapped by utility methods to provide a more connection.connect(new EventStreamRPCConnection.LifecycleHandler() { //only called on successful connection. // That is full on Connect -> ConnectAck(ConnectionAccepted=true) @Override public void onConnect() { connected.complete(null); } @Override public void onDisconnect(int errorCode) { disconnected[0] = true; disconnectedCode[0] = errorCode; clientConnection = null; } //This on error is for any errors that is connection level, including problems during connect() @Override public boolean onError(Throwable t) { connected.completeExceptionally(t); clientConnection = null; return true; //hints at handler to disconnect due to this error } }); connected.get(); return connection; } } private static SocketOptions getSocketOptionsForIPC() { SocketOptions socketOptions = new SocketOptions(); socketOptions.connectTimeoutMs = 3000; socketOptions.domain = SocketOptions.SocketDomain.LOCAL; socketOptions.type = SocketOptions.SocketType.STREAM; return socketOptions; } }
  4. 使用以下代码创建 IPC 客户端。

    try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }

使用适用于 Python 的 AWS IoT IoT 设备开发工具包 v2

  1. 下载适用于 PythonAWS IoT IoT 设备开发工具包v1 或更高版本)。

  2. 将开发工具包的安装步骤添加到您的组件配方中的安装生命周期中。

  3. 创建与 AWS AWS IoT Greengrass 核心 IPC 服务的连接。完成以下步骤以创建 IPC 客户端并建立连接。

    SDK v1.5.4 or later

    使用以下代码创建 IPC 客户端。

    import awsiot.greengrasscoreipc ipc_client = awsiot.greengrasscoreipc.connect()
    SDK v1.5.3
    1. 下载为您提供 IPC 服务器连接的以下IPCUtils类。

      # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import os from awscrt.io import ( ClientBootstrap, DefaultHostResolver, EventLoopGroup, SocketDomain, SocketOptions, ) from awsiot.eventstreamrpc import Connection, LifecycleHandler, MessageAmendment TIMEOUT = 10 class IPCUtils: def connect(self): elg = EventLoopGroup() resolver = DefaultHostResolver(elg) bootstrap = ClientBootstrap(elg, resolver) socket_options = SocketOptions() socket_options.domain = SocketDomain.Local amender = MessageAmendment.create_static_authtoken_amender(os.getenv("SVCUID")) hostname = os.getenv("AWS_GG_NUCLEUS_DOMAIN_SOCKET_FILEPATH_FOR_COMPONENT") connection = Connection( host_name=hostname, port=8033, bootstrap=bootstrap, socket_options=socket_options, connect_message_amender=amender, ) self.lifecycle_handler = LifecycleHandler() connect_future = connection.connect(self.lifecycle_handler) connect_future.result(TIMEOUT) return connection
    2. 使用以下代码创建 IPC 客户端。

      import awsiot.greengrasscoreipc.client as client ipc_utils = IPCUtils() connection = ipc_utils.connect() ipc_client = client.GreengrassCoreIPCClient(connection)

授权组件执行 IPC 操作

要允许您的自定义组件使用某些 IPC 操作,您必须定义授权策略,以允许组件对特定资源执行该操作。每个授权策略都定义了策略允许的操作列表和资源列表。例如,发布/订阅消息收发 IPC 服务为 主题资源定义发布和订阅操作。您可以使用 * 通配符来允许访问所有 操作或所有资源。

您可以使用 accessControl 配置参数在组件配方中定义授权策略。accessControl 对象将 IPC 服务标识符映射到授权策略列表。您可以为每个 IPC 服务定义多个授权策略来控制访问。每个授权策略都有一个策略 ID,它在所有组件中必须是唯一的。

提示

要创建唯一策略 IDs您可以组合组件名称、IPC 服务名称和计数器。例如,名为 的组件com.example.HelloWorld可能使用以下 IDs 定义两个发布/订阅授权策略:

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

授权策略使用以下格式。此对象是 accessControl 配置参数。

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

例 带授权策略的示例组件配方

以下示例组件配方包含一个 accessControl 对象以定义授权策略。此策略授权 com.example.HelloWorld 组件发布到 test/topic 主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -Dlog.level=INFO -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: Run: |- java -Dlog.level=INFO -jar {artifacts:path}/HelloWorld.jar

订阅 IPC 事件流

您可以使用 IPC 操作订阅 Greengrass 核心设备上的事件流。要使用订阅操作,请定义订阅处理程序并创建对 IPC 服务的请求。然后,每次核心设备将事件消息流式传输到您的组件时,IPC 客户端都会运行订阅处理程序的函数。

您可以关闭订阅以停止处理事件消息。为此,请在用于打开订阅的订阅操作对象上调用 closeStream() (Java) 或 close() (Python)。

AWS AWS IoT Greengrass 核心 IPC 服务支持以下订阅操作:

定义订阅处理程序

要定义订阅处理程序,请创建一个类,该类具有处理事件消息、错误和流结束的回调函数。

Java

实施通用software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType>接口。StreamEventType 是订阅操作的事件消息的类型。定义以下函数以处理事件消息、错误和流结束。

void onStreamEvent(StreamEventType event)

IPC 客户端在收到事件消息(如 MQTT 消息或组件更新通知)时调用的回调。

boolean onStreamError(Throwable error)

发生流错误时 IPC 客户端调用的回调。

返回 true 以由于错误而关闭订阅流,或返回 false 以保持流打开。

void onStreamClosed()

流关闭时 IPC 客户端调用的回调。

Python

扩展与订阅操作对应的流响应处理程序类。AWS AWS IoT 设备开发工具包为每个订阅操作包含一个订阅处理程序类。StreamEventType 是订阅操作的事件消息的类型。定义以下函数以处理事件消息、错误和流结束。

def on_stream_event(self, event: StreamEventType) -> None

IPC 客户端在收到事件消息(如 MQTT 消息或组件更新通知)时调用的回调。

def on_stream_error(self, error: Exception) -> bool

发生流错误时 IPC 客户端调用的回调。

返回 True 以由于错误而关闭订阅流,或返回 False 以保持流打开。

def on_stream_closed(self) -> None

流关闭时 IPC 客户端调用的回调。

订阅处理程序的最佳实践

IPC 客户端使用单个线程,该线程与 IPC 服务器通信并调用您的订阅处理程序。在编写订阅处理程序函数时,您必须考虑此同步行为。在编写订阅处理程序函数时,请遵循以下准则。

  • 异步运行阻止代码

    在阻止线程时,IPC 客户端无法发送新请求或处理新事件消息。您可以在从处理程序函数运行的独立线程中运行阻止代码。阻止代码包括sleep调用、持续运行的循环以及需要时间才能完成的同步 I/O 请求。

  • 异步发送新 IPC 请求

    IPC 客户端无法从订阅处理程序函数中发送新请求,因为如果您等待响应,则请求会阻止处理程序函数。您可以在从处理程序函数运行的独立线程中发送 IPC 请求。

  • 处理异常

    IPC 客户端不处理订阅处理程序函数中未捕获的异常。如果您的处理程序函数引发异常,则订阅将关闭,并且该异常不会显示在您的组件日志中。您可以在处理程序函数中捕获异常,以保持订阅打开和记录代码中发生的错误。

示例订阅处理程序

以下示例演示如何使用 SubscribeToTopic 操作和订阅处理程序来订阅本地发布/订阅消息。

Java

例 示例:订阅本地发布/订阅消息

String topic = "my/topic"; SubscribeToTopicRequest subscribeToTopicRequest = new SubscribeToTopicRequest(); subscribeToTopicRequest.setTopic(topic); StreamResponseHandler<SubscriptionResponseMessage> streamResponseHandler = new StreamResponseHandler<SubscriptionResponseMessage>() { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); // Handle message. } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { // Handle error. return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { // Handle close. } }; SubscribeToTopicResponseHandler operationResponseHandler = greengrassCoreIPCClient .subscribeToTopic(subscribeToTopicRequest, Optional.of(streamResponseHandler)); operationResponseHandler.getResponse().get(); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. operationResponseHandler.closeStream();
Python

例 示例:订阅本地发布/订阅消息

注意

此示例假定您使用的是适用于 Python v2 的 AWS IoT IoT 设备开发工具包版本 1.5.4 或更高版本。如果您使用的是开发工具包的版本 1.5.3,请参阅使用适用于 Python 的 AWS IoT 设备开发工具包 v2,以了解有关连接到 AWS AWS IoT Greengrass 核心 IPC 服务的信息。

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message_string = str(event.binary_message.message, "utf-8") # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) future = operation.activate(request) future.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()