发布/订阅本地消息 - Amazon IoT Greengrass
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

发布/订阅本地消息

发布/订阅 (pubsub) 消息收发允许您向主题发送和接收消息。组件可以向主题发布消息,以将消息发送给其他组件。然后,订阅该主题的组件可以对它们收到的消息进行操作。

注意

您不能使用此发布/订阅 IPC 服务来发布或订阅 Amazon IoT Core MQTT。有关如何使用 Amazon IoT Core MQTT 交换消息的更多信息,请参阅 发布/订阅 Amazon IoT Core MQTT 消息

最低 SDK 版本

下表列出了在向本地主题发布和订阅消息时必须使用的最低 Amazon IoT Device SDK 版本。

授权

要在自定义组件中使用本地发布/订阅消息收发,必须定义允许您的组件向主题发送和接收消息的授权策略。有关定义授权策略的信息,请参阅授权组件执行 IPC 操作

发布/订阅消息收发的授权策略具有以下属性。

IPC 服务标识符:aws.greengrass.ipc.pubsub

操作 描述 资源

aws.greengrass#PublishToTopic

允许组件向您指定的主题发布消息。

主题字符串,例如 test/topic* 使用匹配主题中的任意字符组合。

此主题字符串不支持 MQTT 主题通配符(#+)。

aws.greengrass#SubscribeToTopic

允许组件订阅您指定主题的消息。

主题字符串,例如 test/topic* 使用匹配主题中的任意字符组合。

Greengrass Nucleus v2.6.0 及更高版本中,您可以订阅包含 MQTT 主题通配符(#+)的主题。此主题字符串支持 MQTT 主题通配符作为文字字符。例如,如果组件的授权策略授予访问 test/topic/# 的权限,则该组件可以订阅 test/topic/#,但无法订阅 test/topic/filter

*

允许组件为您指定的主题发布和订阅消息。

主题字符串,例如 test/topic* 使用匹配主题中的任意字符组合。

Greengrass Nucleus v2.6.0 及更高版本中,您可以订阅包含 MQTT 主题通配符(#+)的主题。此主题字符串支持 MQTT 主题通配符作为文字字符。例如,如果组件的授权策略授予访问 test/topic/# 的权限,则该组件可以订阅 test/topic/#,但无法订阅 test/topic/filter

授权策略示例

您可以参考以下授权策略示例,帮助您为组件配置授权策略。

例 示例授权策略

以下示例授权策略允许组件发布和订阅所有主题。

{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.MyLocalPubSubComponent:pubsub:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToTopic", "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } }

PublishToTopic

向主题发布消息。

请求

此操作的请求包含以下参数:

topic

要向其发布消息的主题。

publishMessage (Python: publish_message)

待发布的消息。该对象 PublishMessage 包含以下信息。您必须指定 jsonMessagebinaryMessage 中的一个。

jsonMessage (Python: json_message)

(可选)一条 JSON 消息。该对象 JsonMessage 包含以下信息:

message

作为对象的 JSON 消息。

context

消息的上下文,例如消息发布的主题。

此功能适用于 Greengrass Nucleus 组件的 v2.6.0 及更高版本。下表列出了访问消息上下文时必须使用的最低 Amazon IoT Device SDK 版本。

注意

Amazon IoT Greengrass Core 软件在 PublishToTopicSubscribeToTopic 操作中使用相同的消息对象。订阅后,Amazon IoT Greengrass Core 软件会在消息中设置此上下文对象,并在您发布的消息中忽略此上下文对象。

该对象 MessageContext 包含以下信息:

topic

消息发布的主题。

binaryMessage (Python: binary_message)

(可选)二进制消息。该对象 BinaryMessage 包含以下信息:

message

以 blob 形式呈现的二进制消息。

context

消息的上下文,例如消息发布的主题。

此功能适用于 Greengrass Nucleus 组件的 v2.6.0 及更高版本。下表列出了访问消息上下文时必须使用的最低 Amazon IoT Device SDK 版本。

注意

Amazon IoT Greengrass Core 软件在 PublishToTopicSubscribeToTopic 操作中使用相同的消息对象。订阅后,Amazon IoT Greengrass Core 软件会在消息中设置此上下文对象,并在您发布的消息中忽略此上下文对象。

该对象 MessageContext 包含以下信息:

topic

消息发布的主题。

响应

此操作在其响应中未提供任何信息。

示例

以下示例演示了如何在自定义组件代码中调用该操作。

Java (IPC client V2)
例 示例:发布二进制消息
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.model.BinaryMessage; import software.amazon.awssdk.aws.greengrass.model.PublishMessage; import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.PublishToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import java.nio.charset.StandardCharsets; public class PublishToTopicV2 { public static void main(String[] args) { String topic = args[0]; String message = args[1]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { PublishToTopicV2.publishBinaryMessageToTopic(ipcClient, topic, message); System.out.println("Successfully published to topic: " + topic); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static PublishToTopicResponse publishBinaryMessageToTopic( GreengrassCoreIPCClientV2 ipcClient, String topic, String message) throws InterruptedException { BinaryMessage binaryMessage = new BinaryMessage().withMessage(message.getBytes(StandardCharsets.UTF_8)); PublishMessage publishMessage = new PublishMessage().withBinaryMessage(binaryMessage); PublishToTopicRequest publishToTopicRequest = new PublishToTopicRequest().withTopic(topic).withPublishMessage(publishMessage); return ipcClient.publishToTopic(publishToTopicRequest); } }
Python (IPC client V2)
例 示例:发布二进制消息
import sys import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( PublishMessage, BinaryMessage ) def main(): args = sys.argv[1:] topic = args[0] message = args[1] try: ipc_client = GreengrassCoreIPCClientV2() publish_binary_message_to_topic(ipc_client, topic, message) print('Successfully published to topic: ' + topic) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def publish_binary_message_to_topic(ipc_client, topic, message): binary_message = BinaryMessage(message=bytes(message, 'utf-8')) publish_message = PublishMessage(binary_message=binary_message) return ipc_client.publish_to_topic(topic=topic, publish_message=publish_message) if __name__ == '__main__': main()
C++
例 示例:发布二进制消息
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); String message("Hello, World!"); int timeout = 10; PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } } return 0; }
JavaScript
例 示例:发布二进制消息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {BinaryMessage, PublishMessage, PublishToTopicRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; class PublishToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; private readonly messageString : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.messageString = "<define_your_message_string>"; this.publishToTopic().then(r => console.log("Started workflow")); } private async publishToTopic() { try { this.ipcClient = await getIpcClient(); const binaryMessage : BinaryMessage = { message: this.messageString } const publishMessage : PublishMessage = { binaryMessage: binaryMessage } const request : PublishToTopicRequest = { topic: this.topic, publishMessage: publishMessage } this.ipcClient.publishToTopic(request).finally(() => console.log(`Published message ${publishMessage.binaryMessage?.message} to topic`)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const publishToTopic = new PublishToTopic();

SubscribeToTopic

订阅有关某个主题的消息。

此操作是一种订阅操作,您可以在其中订阅事件消息流。要使用此操作,请定义一个流响应处理程序,其中包含处理事件消息、错误和流关闭的函数。有关更多信息,请参阅 订阅 IPC 事件流

事件消息类型:SubscriptionResponseMessage

请求

此操作的请求包含以下参数:

topic

要订阅的主题。

注意

Greengrass Nucleus v2.6.0 及更高版本中,本主题支持 MQTT 主题通配符(#+)。

receiveMode (Python: receive_mode)

(可选)指定组件是否从自身接收消息的行为。您可以更改此行为以允许组件根据自己的消息进行操作。默认行为取决于主题是否包含 MQTT 通配符。从以下选项中进行选择:

  • RECEIVE_ALL_MESSAGES – 接收与该主题匹配的所有消息,包括来自订阅组件的消息。

    当您订阅不包含 MQTT 通配符的主题时,此模式为默认选项。

  • RECEIVE_MESSAGES_FROM_OTHERS – 接收与该主题匹配的所有消息,不包括来自订阅组件的消息。

    当您订阅包含 MQTT 通配符的主题时,此模式为默认选项。

此功能适用于 Greengrass Nucleus 组件的 v2.6.0 及更高版本。下表列出了设置接收模式时必须使用的最低 Amazon IoT Device SDK 版本。

响应

此操作的响应包含以下信息:

messages

消息流。该对象 SubscriptionResponseMessage 包含以下信息。每条消息都包含 jsonMessagebinaryMessage

jsonMessage (Python: json_message)

(可选)一条 JSON 消息。该对象 JsonMessage 包含以下信息:

message

作为对象的 JSON 消息。

context

消息的上下文,例如消息发布的主题。

此功能适用于 Greengrass Nucleus 组件的 v2.6.0 及更高版本。下表列出了访问消息上下文时必须使用的最低 Amazon IoT Device SDK 版本。

注意

Amazon IoT Greengrass Core 软件在 PublishToTopicSubscribeToTopic 操作中使用相同的消息对象。订阅后,Amazon IoT Greengrass Core 软件会在消息中设置此上下文对象,并在您发布的消息中忽略此上下文对象。

该对象 MessageContext 包含以下信息:

topic

消息发布的主题。

binaryMessage (Python: binary_message)

(可选)二进制消息。该对象 BinaryMessage 包含以下信息:

message

以 blob 形式呈现的二进制消息。

context

消息的上下文,例如消息发布的主题。

此功能适用于 Greengrass Nucleus 组件的 v2.6.0 及更高版本。下表列出了访问消息上下文时必须使用的最低 Amazon IoT Device SDK 版本。

注意

Amazon IoT Greengrass Core 软件在 PublishToTopicSubscribeToTopic 操作中使用相同的消息对象。订阅后,Amazon IoT Greengrass Core 软件会在消息中设置此上下文对象,并在您发布的消息中忽略此上下文对象。

该对象 MessageContext 包含以下信息:

topic

消息发布的主题。

topicName (Python: topic_name)

消息被发布到的主题。

注意

目前尚未使用该属性。在 Greengrass Nucleus v2.6.0 及更高版本中,您可以从 SubscriptionResponseMessage 中获取 (jsonMessage|binaryMessage).context.topic 值以获取消息发布的主题。

示例

以下示例演示了如何在自定义组件代码中调用该操作。

Java (IPC client V2)
例 示例:订阅本地发布/订阅消息
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
例 示例:订阅本地发布/订阅消息
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
例 示例:订阅本地发布/订阅消息
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
例 示例:订阅本地发布/订阅消息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

示例

使用以下示例了解如何在组件中使用发布/订阅 IPC 服务。

以下示例配方允许该组件发布至所有主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherJava", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherJava:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/PubSubPublisher.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherJava ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: 'com.example.PubSubPublisherJava:pubsub:1': policyDescription: Allows access to publish to all topics. operations: - 'aws.greengrass#PublishToTopic' resources: - '*' Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/PubSubPublisher.jar

以下示例 Java 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件发布消息。

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubPublisher { public static void main(String[] args) { String message = "Hello from the pub/sub publisher (Java)."; String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); while (true) { PublishToTopicRequest publishRequest = new PublishToTopicRequest(); PublishMessage publishMessage = new PublishMessage(); BinaryMessage binaryMessage = new BinaryMessage(); binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8)); publishMessage.setBinaryMessage(binaryMessage); publishRequest.setPublishMessage(publishMessage); publishRequest.setTopic(topic); CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient .publishToTopic(publishRequest, Optional.empty()).getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Execution exception while publishing to topic: " + topic); } throw e; } Thread.sleep(5000); } } catch (InterruptedException e) { System.out.println("Publisher interrupted."); } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } }

以下示例配方允许该组件订阅所有主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberJava", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberJava:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/PubSubSubscriber.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberJava ComponentVersion: '1.0.0' ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: 'com.example.PubSubSubscriberJava:pubsub:1': policyDescription: Allows access to subscribe to all topics. operations: - 'aws.greengrass#SubscribeToTopic' resources: - '*' Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/PubSubSubscriber.jar

以下示例 Java 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件订阅消息。

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubSubscriber { public static void main(String[] args) { String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest(); subscribeRequest.setTopic(topic); SubscribeToTopicResponseHandler operationResponseHandler = ipcClient .subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler())); CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); throw e; } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { System.err.println("Execution exception while subscribing to topic: " + topic); } throw e; } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); System.out.println("Received new message: " + message); } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }

以下示例配方允许该组件发布至所有主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherPython", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherPython:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "Run": "python3 -u {artifacts:path}/pubsub_publisher.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "Run": "py -3 -u {artifacts:path}/pubsub_publisher.py" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherPython ComponentVersion: 1.0.0 ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubPublisherPython:pubsub:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToTopic resources: - "*" Manifests: - Platform: os: linux Lifecycle: install: python3 -m pip install --user awsiotsdk Run: python3 -u {artifacts:path}/pubsub_publisher.py - Platform: os: windows Lifecycle: install: py -3 -m pip install --user awsiotsdk Run: py -3 -u {artifacts:path}/pubsub_publisher.py

以下示例 Python 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件发布消息。

import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, BinaryMessage, UnauthorizedError ) topic = "test/topic/python" message = "Hello from the pub/sub publisher (Python)." TIMEOUT = 10 try: ipc_client = awsiot.greengrasscoreipc.connect() while True: request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.binary_message = BinaryMessage() publish_message.binary_message.message = bytes(message, "utf-8") request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully published to topic: ' + topic) except concurrent.futures.TimeoutError: print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr) except UnauthorizedError as e: print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while publishing to topic: ' + topic, file=sys.stderr) raise e time.sleep(5) except InterruptedError: print('Publisher interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

以下示例配方允许该组件订阅所有主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberPython", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberPython:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "Run": "python3 -u {artifacts:path}/pubsub_subscriber.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "Run": "py -3 -u {artifacts:path}/pubsub_subscriber.py" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberPython ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubSubscriberPython:pubsub:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToTopic resources: - "*" Manifests: - Platform: os: linux Lifecycle: install: python3 -m pip install --user awsiotsdk Run: python3 -u {artifacts:path}/pubsub_subscriber.py - Platform: os: windows Lifecycle: install: py -3 -m pip install --user awsiotsdk Run: py -3 -u {artifacts:path}/pubsub_subscriber.py

以下示例 Python 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件订阅消息。

import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage, UnauthorizedError ) topic = "test/topic/python" TIMEOUT = 10 class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, "utf-8") print("Received new message: " + message) except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: print("Received a stream error.", file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: print('Subscribe to topic stream closed.') try: ipc_client = awsiot.greengrasscoreipc.connect() request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully subscribed to topic: ' + topic) except concurrent.futures.TimeoutError as e: print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr) raise e except UnauthorizedError as e: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while subscribing to topic: ' + topic, file=sys.stderr) raise e # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

以下示例配方允许该组件发布至所有主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherCpp:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pubsub_publisher" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherCpp ComponentVersion: 1.0.0 ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubPublisherCpp:pubsub:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToTopic resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_pubsub_publisher" Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher Permission: Execute: OWNER

以下示例 C++ 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件发布消息。

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the pub/sub publisher (C++)."); String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }

以下示例配方允许该组件订阅所有主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberCpp:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pub_sub_subscriber" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberCpp ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubSubscriberCpp:pubsub:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToTopic resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_pub_sub_subscriber" Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber Permission: Execute: OWNER

以下示例 C++ 应用程序演示了如何使用发布/订阅 IPC 服务向其他组件订阅消息。

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); std::cout << "Received new message: " << messageString << std::endl; } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::cout << "Received new message: " << messageString << std::endl; } } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to topic stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToTopicRequest request; request.SetTopic(topic); auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }