Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅
中国的 Amazon Web Services 服务入门
(PDF)。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Amazon IoT Device SDK 与 Greengrass 原子核、其他组件进行通信 Amazon IoT Core
在核心设备上运行的组件可以使用中的 Amazon IoT Greengrass 核心进程间通信 (IPC) 库与 Amazon IoT Greengrass 核心和 Amazon IoT Device SDK 其他 Greengrass 组件进行通信。要开发和运行使用的自定义组件IPC,必须使用连接 Amazon IoT Device SDK 到 C Amazon IoT Greengrass ore IPC 服务并执行IPC操作。
该IPC接口支持两种类型的操作:
IPC客户端版本
在更高版本的 Java 和 Python 中SDKs, Amazon IoT Greengrass 提供了IPC客户端的改进版本,称为IPC客户端 V2。IPC客户端 V2:
-
减少使用IPC操作所需编写的代码量,并有助于避免IPC客户端 V1 可能出现的常见错误。
-
在单独的线程中调用订阅处理程序回调,因此您现在可以在订阅处理程序回调中运行阻塞代码,包括其他IPC函数调用。IPC客户端 V1 使用同一个线程与IPC服务器通信并调用订阅处理程序回调。
-
允许您使用 Lambda 表达式 (Java) 或函数 (Python) 调用订阅操作。IPC客户端 V1 要求您定义订阅处理程序类。
-
提供每个IPC操作的同步和异步版本。IPC客户端 V1 仅提供每个操作的异步版本。
我们建议您使用IPC客户端 V2 来利用这些改进。但是,本文档和一些在线内容中的许多示例仅演示如何使用IPC客户端 V1。您可以使用以下示例和教程来查看使用IPC客户端 V2 的示例组件:
目前, Amazon IoT Device SDK 适用于 C++ 的 v2 仅支持IPC客户端 V1。
SDKs支持进程间通信
Amazon IoT Greengrass 核心IPC库包含在以下 Amazon IoT Device SDK 版本中。
Connect 到 Amazon IoT Greengrass Core IPC 服务
要在自定义组件中使用进程间通信,必须创建与 C Amazon IoT Greengrass ore 软件运行的IPC服务器套接字的连接。完成以下任务,下载并使用您选择 Amazon IoT Device SDK 的语言。
要使用 Amazon IoT Device SDK 适用于 Java v2(IPC客户端 V2)
-
下载Amazon IoT Device SDK 适用于 Java 版本 2(1.6.0 或更高版本)的。
-
执行以下任一操作以在组件中运行您的自定义代码:
-
使用以下代码创建IPC客户端。
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
// Use client.
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e);
System.exit(1);
}
要使用 Amazon IoT Device SDK 适用于 Python v2(IPC客户端 V2)
-
下载Amazon IoT Device SDK 适用于 Python 的(1.9.0 或更高版本)。
-
将SDK的安装步骤添加到组件配方的安装生命周期中。
-
创建与 Amazon IoT Greengrass 核心IPC服务的连接。使用以下代码创建IPC客户端。
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
try:
ipc_client = GreengrassCoreIPCClientV2()
# Use IPC client.
except Exception:
print('Exception occurred when using IPC.', file=sys.stderr)
traceback.print_exc()
exit(1)
要构建 C++ 版 Amazon IoT Device SDK v2,设备必须具有以下工具:
-
C++ 11 或更高版本
-
CMake3.1 或更高版本
-
以下编译器之一:
-
GCC4.8 或更高版本
-
Clang 3.9 或更高版本
-
MSVC2015 年或以后
要使用 Amazon IoT Device SDK 适用于 C++ v2 的
-
下载Amazon IoT Device SDK 适用于 C++ v2(1.17.0 或更高版本)的。
-
按照中的安装说明从源代码构建 C++ v2 的。README Amazon IoT Device SDK
-
在你的 C++ 构建工具中,链接你在上一步中构建的 IPC Greengrass 库AWS::GreengrassIpc-cpp
。以下CMakeLists.txt
示例将 G IPC reengrass 库链接到您用来构建的项目。CMake
cmake_minimum_required(VERSION 3.1)
project (greengrassv2_pubsub_subscriber)
file(GLOB MAIN_SRC
"*.h"
"*.cpp"
)
add_executable(${PROJECT_NAME} ${MAIN_SRC})
set_target_properties(${PROJECT_NAME} PROPERTIES
LINKER_LANGUAGE CXX
CXX_STANDARD 11)
find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build)
find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build)
target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
-
在组件代码中,创建与 Amazon IoT Greengrass 核心IPC服务的连接以创建IPC客户端 (Aws::Greengrass::GreengrassCoreIpcClient
)。您必须定义一个用于处理IPC连接、断开IPC连接和错误事件的连接生命周期处理程序。以下示例创建了一个IPC客户端和一个IPC连接生命周期处理程序,用于在IPC客户端连接、断开连接和遇到错误时进行打印。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
// Create the IPC client.
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
// Use the IPC client to create an operation request.
// Activate the operation request.
auto activate = operation.Activate(request, nullptr);
activate.wait();
// Wait for Greengrass Core to respond to the request.
auto responseFuture = operation.GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
// Check the result of the request.
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
return 0;
}
-
要在组件中运行自定义代码,请将代码构建为二进制构件,然后在组件配方中运行二进制构件。将构件的Execute
权限设置为OWNER
,以使 Amazon IoT Greengrass 核心软件能够运行二进制工件。
您的组件配方Manifests
部分可能与以下示例类似。
- JSON
-
{
...
"Manifests": [
{
"Lifecycle": {
"run": "{artifacts:path}/greengrassv2_pubsub_subscriber"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
...
Manifests:
- Lifecycle:
run: {artifacts:path}/greengrassv2_pubsub_subscriber
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber
Permission:
Execute: OWNER
要编译 Amazon IoT Device SDK 适用于 JavaScript v2 的,以便与 NodeJS 一起使用,设备必须具有以下工具:
-
NodeJS 10.0 或更高版本
-
CMake3.1 或更高版本
要使用 Amazon IoT Device SDK 适用于 JavaScript v2(IPC客户端 V1)
-
下载Amazon IoT Device SDK
适用于 JavaScript v2(v 1.12.10 或更高版本)的。
-
按照中的安装说明从源代码构建 JavaScript v2 版。README Amazon IoT Device SDK
-
创建与 Amazon IoT Greengrass 核心IPC服务的连接。完成以下步骤以创建IPC客户端并建立连接。
-
使用以下代码创建IPC客户端。
import * as greengrascoreipc from 'aws-iot-device-sdk-v2';
let client = greengrascoreipc.createClient();
-
使用以下代码建立从您的组件到 Greengrass 核的连接。
await client.connect();
授权组件执行IPC操作
要允许您的自定义组件使用某些IPC操作,您必须定义允许该组件对某些资源执行操作的授权策略。每个授权策略都定义了该策略允许的操作列表和资源列表。例如,发布/订阅消息IPC服务定义了主题资源的发布和订阅操作。您可以使用*
通配符来允许访问所有操作或所有资源。
您可以使用accessControl
配置参数定义授权策略,可以在组件配方中或部署组件时设置该参数。该accessControl
对象将IPC服务标识符映射到授权策略列表。您可以为每项IPC服务定义多个授权策略来控制访问权限。每个授权策略都有一个策略 ID,它在所有组件中必须是唯一的。
要创建唯一的策略IDs,您可以组合组件名称、IPC服务名称和计数器。例如,名为的组件com.example.HelloWorld
可以定义两个发布/订阅授权策略,如下所示:IDs
授权策略使用以下格式。此对象是accessControl
配置参数。
- JSON
-
{
"IPC service identifier
": {
"policyId
": {
"policyDescription": "description
",
"operations": [
"operation1
",
"operation2
"
],
"resources": [
"resource1
",
"resource2
"
]
}
}
}
- YAML
-
IPC service identifier
:
policyId
:
policyDescription: description
operations:
- operation1
- operation2
resources:
- resource1
- resource2
授权策略中的通配符
您可以在IPC授权策略resources
元素中使用*
通配符来允许访问单个授权策略中的多个资源。
-
在所有版本的 Greengrass nucleus 中,您可以指定*
单个角色作为资源以允许访问所有资源。
-
在 Greengrass nucleus v2.6.0 及更高版本中,您可以在资源中指定字符以匹配任意字符*
组合。例如,您可以指定factory/1/devices/Thermostat*/status
允许访问工厂中所有恒温器设备的状态主题,其中每台设备的名称都以 Thermostat
“开头”。
在为 Amazon IoT Core MQTTIPC服务定义授权策略时,也可以使用MQTT通配符(+
和#
)来匹配多个资源。有关更多信息,请参阅Amazon IoT Core MQTTIPC授权策略中的MQTT通配符。
授权策略中的配方变量
如果你使用 Greengrass nucleus v2.6.0 或更高版本,并且将 Greeng interpolateComponentConfigurationrass nucleus 的配置选项设置为,则可以在授权策略中使用配方变量。true{iot:thingName}当您需要包含核心设备名称的授权策略(例如MQTT主题或设备影子)时,可以使用此配方变量为一组核心设备配置单个授权策略。例如,您可以允许组件访问以下资源以进行影子IPC操作。
$aws/things/{iot:thingName}/shadow/
授权策略中的特殊字符
要在授权策略中指定文字*
或?
字符,必须使用转义序列。以下转义序列指示 Amazon IoT Greengrass Core 软件使用字面值而不是字符的特殊含义。例如,该*
字符是与任意字符组合匹配的通配符。
字面字符 |
转义序列 |
注意 |
*
|
${*}
|
|
?
|
${?}
|
Amazon IoT Greengrass 目前不支持与任何单个字符匹配的? 通配符。
|
$
|
${$}
|
使用此转义序列来匹配包含的资源${ 。例如,要匹配名为的资源${resourceName} ,必须指定${$}{resourceName} 。否则,要匹配包含的资源$ ,您可以使用文字$ ,例如允许访问以开头$aws 的主题。
|
授权策略示例
您可以参考以下授权策略示例,以帮助您为组件配置授权策略。
例 带有授权策略的组件配方示例
以下示例组件配方包括一个定义授权策略的accessControl
对象。此策略授权该com.example.HelloWorld
组件发布到该test/topic
主题。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.HelloWorld",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes messages.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"run": "java -jar {artifacts:path}/HelloWorld.jar"
}
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.HelloWorld
ComponentVersion: '1.0.0'
ComponentDescription: A component that publishes messages.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.pubsub:
"com.example.HelloWorld:pubsub:1":
policyDescription: Allows access to publish to test/topic.
operations:
- "aws.greengrass#PublishToTopic"
resources:
- "test/topic"
Manifests:
- Lifecycle:
run: |-
java -jar {artifacts:path}/HelloWorld.jar
例 使用授权策略更新组件配置示例
以下部署中的示例配置更新指定使用定义授权策略的accessControl
对象来配置组件。此策略授权该com.example.HelloWorld
组件发布到该test/topic
主题。
- Console
-
- 要合并的配置
-
{
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
- Amazon CLI
-
以下命令创建对核心设备的部署。
aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json
该hello-world-deployment.json
文件包含以下JSON文档。
{
"targetArn": "arn:aws:iot:us-west-2:123456789012:thing/MyGreengrassCore",
"deploymentName": "Deployment for MyGreengrassCore",
"components": {
"com.example.HelloWorld": {
"componentVersion": "1.0.0",
"configurationUpdate": {
"merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}"
}
}
}
}
- Greengrass CLI
-
以下 G CLI reengrass 命令在核心设备上创建本地部署。
sudo greengrass-cli deployment create \
--recipeDir recipes \
--artifactDir artifacts \
--merge "com.example.HelloWorld=1.0.0" \
--update-config hello-world-configuration.json
该hello-world-configuration.json
文件包含以下JSON文档。
{
"com.example.HelloWorld": {
"MERGE": {
"accessControl": {
"aws.greengrass.ipc.pubsub": {
"com.example.HelloWorld:pubsub:1": {
"policyDescription": "Allows access to publish to test/topic.",
"operations": [
"aws.greengrass#PublishToTopic"
],
"resources": [
"test/topic"
]
}
}
}
}
}
}
订阅IPC活动直播
您可以使用IPC操作在 Greengrass 核心设备上订阅事件流。要使用订阅操作,请定义订阅处理程序并创建对IPC服务的请求。然后,每次核心设备将事件消息流式传输到您的组件时,IPC客户端都会运行订阅处理程序的函数。
您可以关闭订阅以停止处理事件消息。为此,请在用于打开订阅的订阅操作对象上调用 closeStream()
Close()
(Java)、close()
(Python) 或 (C++)。
Amazon IoT Greengrass 核心IPC服务支持以下订阅操作:
定义订阅处理程序
要定义订阅处理程序,请定义处理事件消息、错误和流关闭的回调函数。如果使用IPC客户端 V1,则必须在类中定义这些函数。如果您使用IPC客户端 V2(在 Java 和 Python 的更高版本中可用)SDKs,则无需创建订阅处理程序类即可定义这些函数。
- Java
-
如果使用IPC客户端 V1,则必须实现通用接software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType
>
口。StreamEventType
是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。
如果您使用IPC客户端 V2,则可以在订阅处理程序类之外定义这些函数或使用 lambda 表达式。
void onStreamEvent(StreamEventType
event)
-
IPC客户端在收到事件消息(例如MQTT消息或组件更新通知)时调用的回调。
boolean onStreamError(Throwable error)
-
发生直播错误时IPC客户端调用的回调。
如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。
void onStreamClosed()
-
直播关闭时IPC客户端调用的回调。
- Python
-
如果您使用IPC客户端 V1,则必须扩展与订阅操作相对应的流响应处理程序类。 Amazon IoT Device SDK 包括每个订阅操作的订阅处理程序类。StreamEventType
是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。
如果您使用IPC客户端 V2,则可以在订阅处理程序类之外定义这些函数或使用 lambda 表达式。
def on_stream_event(self, event:
StreamEventType
) -> None
-
IPC客户端在收到事件消息(例如MQTT消息或组件更新通知)时调用的回调。
def on_stream_error(self, error: Exception) -> bool
-
发生直播错误时IPC客户端调用的回调。
如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。
def on_stream_closed(self) -> None
-
直播关闭时IPC客户端调用的回调。
- C++
-
实现一个从与订阅操作对应的流响应处理程序类派生的类。 Amazon IoT Device SDK 包括每个订阅操作的订阅处理程序基类。StreamEventType
是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。
void OnStreamEvent(StreamEventType
*event)
-
IPC客户端在收到事件消息(例如MQTT消息或组件更新通知)时调用的回调。
bool OnStreamError(OperationError *error)
-
发生直播错误时IPC客户端调用的回调。
如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。
void OnStreamClosed()
-
直播关闭时IPC客户端调用的回调。
- JavaScript
-
实现一个从与订阅操作对应的流响应处理程序类派生的类。 Amazon IoT Device SDK 包括每个订阅操作的订阅处理程序基类。StreamEventType
是订阅操作的事件消息的类型。定义以下函数来处理事件消息、错误和流关闭。
on(event: 'ended', listener: StreamingOperationEndedListener)
-
直播关闭时IPC客户端调用的回调。
on(event: 'streamError', listener: StreamingRpcErrorListener)
-
发生直播错误时IPC客户端调用的回调。
如果出现错误,则返回 true 则关闭订阅流,或返回 false 以保持直播的打开状态。
on(event: 'message', listener: (message: InboundMessageType) => void)
-
IPC客户端在收到事件消息(例如MQTT消息或组件更新通知)时调用的回调。
订阅处理程序示例
以下示例演示如何使用SubscribeToTopic操作和订阅处理程序来订阅本地发布/订阅消息。
- Java (IPC client V2)
-
例 示例:订阅本地发布/订阅消息
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
public class SubscribeToTopicV2 {
public static void main(String[] args) {
String topic = args[0];
try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse,
SubscribeToTopicResponseHandler> response =
ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent,
Optional.of(SubscribeToTopicV2::onStreamError),
Optional.of(SubscribeToTopicV2::onStreamClosed));
SubscribeToTopicResponseHandler responseHandler = response.getHandler();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (Exception e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
System.err.println("Exception occurred when using IPC.");
}
e.printStackTrace();
System.exit(1);
}
}
public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) {
try {
BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage();
String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8);
String topic = binaryMessage.getContext().getTopic();
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
public static boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false; // Return true to close stream, false to keep stream open.
}
public static void onStreamClosed() {
System.out.println("Subscribe to topic stream closed.");
}
}
- Python (IPC client V2)
-
例 示例:订阅本地发布/订阅消息
import sys
import time
import traceback
from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
SubscriptionResponseMessage,
UnauthorizedError
)
def main():
args = sys.argv[1:]
topic = args[0]
try:
ipc_client = GreengrassCoreIPCClientV2()
# Subscription operations return a tuple with the response and the operation.
_, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
print('Successfully subscribed to topic: ' + topic)
# Keep the main thread alive, or the process will exit.
try:
while True:
time.sleep(10)
except InterruptedError:
print('Subscribe interrupted.')
# To stop subscribing, close the stream.
operation.close()
except UnauthorizedError:
print('Unauthorized error while subscribing to topic: ' +
topic, file=sys.stderr)
traceback.print_exc()
exit(1)
except Exception:
print('Exception occurred', file=sys.stderr)
traceback.print_exc()
exit(1)
def on_stream_event(event: SubscriptionResponseMessage) -> None:
try:
message = str(event.binary_message.message, 'utf-8')
topic = event.binary_message.context.topic
print('Received new message on topic %s: %s' % (topic, message))
except:
traceback.print_exc()
def on_stream_error(error: Exception) -> bool:
print('Received a stream error.', file=sys.stderr)
traceback.print_exc()
return False # Return True to close stream, False to keep stream open.
def on_stream_closed() -> None:
print('Subscribe to topic stream closed.')
if __name__ == '__main__':
main()
- C++
-
例 示例:订阅本地发布/订阅消息
#include <iostream>
#include </crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class SubscribeResponseHandler : public SubscribeToTopicStreamHandler {
public:
virtual ~SubscribeResponseHandler() {}
private:
void OnStreamEvent(SubscriptionResponseMessage *response) override {
auto jsonMessage = response->GetJsonMessage();
if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) {
auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable();
// Handle JSON message.
} else {
auto binaryMessage = response->GetBinaryMessage();
if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) {
auto messageBytes = binaryMessage.value().GetMessage().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
// Handle binary message.
}
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
int timeout = 10;
SubscribeToTopicRequest request;
request.SetTopic(topic);
//SubscribeResponseHandler streamHandler;
auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToTopic(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
- JavaScript
-
例 示例:订阅本地发布/订阅消息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
class SubscribeToTopic {
private ipcClient : greengrasscoreipc.Client
private readonly topic : string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.subscribeToTopic().then(r => console.log("Started workflow"));
}
private async subscribeToTopic() {
try {
this.ipcClient = await getIpcClient();
const subscribeToTopicRequest : SubscribeToTopicRequest = {
topic: this.topic,
}
const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options
streamingOperation.on("message", (message: SubscriptionResponseMessage) => {
// parse the message depending on your use cases, e.g.
if(message.binaryMessage && message.binaryMessage.message) {
const receivedMessage = message.binaryMessage?.message.toString();
}
});
streamingOperation.on("streamError", (error : RpcError) => {
// define your own error handling logic
})
streamingOperation.on("ended", () => {
// define your own logic
})
await streamingOperation.activate();
// Keep the main thread alive, or the process will exit.
await new Promise((resolve) => setTimeout(resolve, 10000))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const subscribeToTopic = new SubscribeToTopic();
IPC最佳实践
IPC在自定义组件中使用的最佳实践在IPC客户端 V1 和IPC客户端 V2 之间有所不同。请遵循所用IPC客户端版本的最佳实践。
- IPC client V2
-
IPC客户端 V2 在单独的线程中运行回调函数,因此与IPC客户端 V1 相比,在使用IPC和编写订阅处理函数时,需要遵循的准则较少。
- IPC client V1
-
IPC客户端 V1 使用与IPC服务器通信并调用订阅处理程序的单线程。在编写订阅处理函数时,必须考虑这种同步行为。
-
重复使用一个IPC客户端
创建IPC客户端后,请将其保持打开状态并在所有IPC操作中重复使用。创建多个客户端会消耗额外的资源,并可能导致资源泄漏。
-
异步运行阻塞代码
当线程被阻塞时,IPC客户端 V1 无法发送新请求或处理新的事件消息。你应该在从处理函数运行的单独线程中运行阻塞代码。阻塞代码包括sleep
调用、持续运行的循环以及需要一段时间才能完成的同步 I/O 请求。
-
异步发送新IPC请求
IPC客户端 V1 无法从订阅处理函数中发送新请求,因为如果您等待响应,该请求会阻塞处理器函数。您应该在从处理函数运行的单独线程中发送IPC请求。
-
处理异常
IPC客户端 V1 不处理订阅处理函数中未捕获的异常。如果您的处理函数抛出异常,则订阅将关闭,并且该异常不会出现在您的组件日志中。您应该在处理程序函数中捕获异常,以保持订阅处于打开状态,并记录代码中发生的错误。