Publish/subscribe local messages
Publish/subscribe (pubsub) messaging enables you to send and receive messages to topics. Components can publish messages to topics to send messages to other components. Then, components that are subscribed to that topic can act on the messages that they receive.
Note
You can't use this publish/subscribe IPC service to publish or subscribe to Amazon IoT Core MQTT. For more information about how to exchange messages with Amazon IoT Core MQTT, see Publish/subscribe Amazon IoT Core MQTT messages.
Minimum SDK versions
The following table lists the minimum versions of the Amazon IoT Device SDK that you must use to publish and subscribe to messages to and from local topics.
SDK | Minimum version |
---|---|
v1.2.10 |
|
v1.5.3 |
|
v1.17.0 |
|
v1.12.0 |
Authorization
To use local publish/subscribe messaging in a custom component, you must define authorization policies that allow your component to send and receive messages to topics. For information about defining authorization policies, see Authorize components to perform IPC operations.
Authorization policies for publish/subscribe messaging have the following properties.
IPC service identifier:
aws.greengrass.ipc.pubsub
Operation | Description | Resources |
---|---|---|
|
Allows a component to publish messages to the topics that you specify. |
A topic string, such as This topic string doesn't support MQTT topic wildcards ( |
|
Allows a component to subscribe to messages for the topics that you specify. |
A topic string, such as In Greengrass nucleus v2.6.0 and
later, you can subscribe to topics that contain MQTT topic wildcards ( |
|
Allows a component to publish and subscribe to messages for the topics that you specify. |
A topic string, such as In Greengrass nucleus v2.6.0 and
later, you can subscribe to topics that contain MQTT topic wildcards ( |
Authorization policy examples
You can reference the following authorization policy example to help you configure authorization policies for your components.
Example authorization policy
The following example authorization policy allows a component to publish and subscribe to all topics.
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "
com.example.MyLocalPubSubComponent
:pubsub:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToTopic", "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } }
PublishToTopic
Publish a message to a topic.
Request
This operation's request has the following parameters:
topic
-
The topic to which to publish the message.
publishMessage
(Python:publish_message
)-
The message to publish. This object,
PublishMessage
, contains the following information. You must specify one ofjsonMessage
andbinaryMessage
.jsonMessage
(Python:json_message
)-
(Optional) A JSON message. This object,
JsonMessage
, contains the following information:message
-
The JSON message as an object.
context
-
The context of the message, such as the topic where the message was published.
This feature is available for v2.6.0 and later of the Greengrass nucleus component. The following table lists the minimum versions of the Amazon IoT Device SDK that you must use to access the message context.
SDK Minimum version v1.9.3
v1.11.3
v1.18.4
v1.12.0
Note
The Amazon IoT Greengrass Core software uses the same message objects in the
PublishToTopic
andSubscribeToTopic
operations. The Amazon IoT Greengrass Core software sets this context object in messages when you subscribe, and ignores this context object in messages that you publish.This object,
MessageContext
, contains the following information:topic
-
The topic where the message was published.
binaryMessage
(Python:binary_message
)-
(Optional) A binary message. This object,
BinaryMessage
, contains the following information:message
-
The binary message as a blob.
context
-
The context of the message, such as the topic where the message was published.
This feature is available for v2.6.0 and later of the Greengrass nucleus component. The following table lists the minimum versions of the Amazon IoT Device SDK that you must use to access the message context.
SDK Minimum version v1.9.3
v1.11.3
v1.18.4
v1.12.0
Note
The Amazon IoT Greengrass Core software uses the same message objects in the
PublishToTopic
andSubscribeToTopic
operations. The Amazon IoT Greengrass Core software sets this context object in messages when you subscribe, and ignores this context object in messages that you publish.This object,
MessageContext
, contains the following information:topic
-
The topic where the message was published.
Response
This operation doesn't provide any information in its response.
Examples
The following examples demonstrate how to call this operation in custom component code.
SubscribeToTopic
Subscribe to messages on a topic.
This operation is a subscription operation where you subscribe to a stream of event messages. To use this operation, define a stream response handler with functions that handle event messages, errors, and stream closure. For more information, see Subscribe to IPC event streams.
Event message type:
SubscriptionResponseMessage
Request
This operation's request has the following parameters:
topic
-
The topic to which to subscribe.
Note
In Greengrass nucleus v2.6.0 and later, this topic supports MQTT topic wildcards (
#
and+
). receiveMode
(Python:receive_mode
)-
(Optional) The behavior that specifies whether the component receives messages from itself. You can change this behavior to allow a component to act on its own messages. The default behavior depends on whether the topic contains an MQTT wildcard. Choose from the following options:
-
RECEIVE_ALL_MESSAGES
– Receive all messages that match the topic, including messages from the component that subscribes.This mode is the default option when you subscribe to a topic that doesn't contain an MQTT wildcard.
-
RECEIVE_MESSAGES_FROM_OTHERS
– Receive all messages that match the topic, except messages from the component that subscribes.This mode is the default option when you subscribe to a topic that contains an MQTT wildcard.
This feature is available for v2.6.0 and later of the Greengrass nucleus component. The following table lists the minimum versions of the Amazon IoT Device SDK that you must use to set the receive mode.
SDK Minimum version v1.9.3
v1.11.3
v1.18.4
v1.12.0
-
Response
This operation's response has the following information:
messages
-
The stream of messages. This object,
SubscriptionResponseMessage
, contains the following information. Each message containsjsonMessage
orbinaryMessage
.jsonMessage
(Python:json_message
)-
(Optional) A JSON message. This object,
JsonMessage
, contains the following information:message
-
The JSON message as an object.
context
-
The context of the message, such as the topic where the message was published.
This feature is available for v2.6.0 and later of the Greengrass nucleus component. The following table lists the minimum versions of the Amazon IoT Device SDK that you must use to access the message context.
SDK Minimum version v1.9.3
v1.11.3
v1.18.4
v1.12.0
Note
The Amazon IoT Greengrass Core software uses the same message objects in the
PublishToTopic
andSubscribeToTopic
operations. The Amazon IoT Greengrass Core software sets this context object in messages when you subscribe, and ignores this context object in messages that you publish.This object,
MessageContext
, contains the following information:topic
-
The topic where the message was published.
binaryMessage
(Python:binary_message
)-
(Optional) A binary message. This object,
BinaryMessage
, contains the following information:message
-
The binary message as a blob.
context
-
The context of the message, such as the topic where the message was published.
This feature is available for v2.6.0 and later of the Greengrass nucleus component. The following table lists the minimum versions of the Amazon IoT Device SDK that you must use to access the message context.
SDK Minimum version v1.9.3
v1.11.3
v1.18.4
v1.12.0
Note
The Amazon IoT Greengrass Core software uses the same message objects in the
PublishToTopic
andSubscribeToTopic
operations. The Amazon IoT Greengrass Core software sets this context object in messages when you subscribe, and ignores this context object in messages that you publish.This object,
MessageContext
, contains the following information:topic
-
The topic where the message was published.
topicName
(Python:topic_name
)-
The topic to which the message was published.
Note
This property isn't currently used. In Greengrass nucleus v2.6.0 and later, you can get the
(jsonMessage|binaryMessage).context.topic
value from aSubscriptionResponseMessage
to get the topic where the message was published.
Examples
The following examples demonstrate how to call this operation in custom component code.
Examples
Use the following examples to learn how to use the publish/subscribe IPC service in your components.
The following example recipe allows the component to publish to all topics.
The following example Java application demonstrates how to use the publish/subscribe IPC service to publish messages to other components.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubPublisher { public static void main(String[] args) { String message = "Hello from the pub/sub publisher (Java)."; String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); while (true) { PublishToTopicRequest publishRequest = new PublishToTopicRequest(); PublishMessage publishMessage = new PublishMessage(); BinaryMessage binaryMessage = new BinaryMessage(); binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8)); publishMessage.setBinaryMessage(binaryMessage); publishRequest.setPublishMessage(publishMessage); publishRequest.setTopic(topic); CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient .publishToTopic(publishRequest, Optional.empty()).getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Execution exception while publishing to topic: " + topic); } throw e; } Thread.sleep(5000); } } catch (InterruptedException e) { System.out.println("Publisher interrupted."); } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } }
The following example recipe allows the component to subscribe to all topics.
The following example Java application demonstrates how to use the publish/subscribe IPC service to subscribe to messages to other components.
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubSubscriber { public static void main(String[] args) { String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest(); subscribeRequest.setTopic(topic); SubscribeToTopicResponseHandler operationResponseHandler = ipcClient .subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler())); CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); throw e; } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { System.err.println("Execution exception while subscribing to topic: " + topic); } throw e; } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); System.out.println("Received new message: " + message); } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }
The following example recipe allows the component to publish to all topics.
The following example Python application demonstrates how to use the publish/subscribe IPC service to publish messages to other components.
import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, BinaryMessage, UnauthorizedError ) topic = "test/topic/python" message = "Hello from the pub/sub publisher (Python)." TIMEOUT = 10 try: ipc_client = awsiot.greengrasscoreipc.connect() while True: request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.binary_message = BinaryMessage() publish_message.binary_message.message = bytes(message, "utf-8") request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully published to topic: ' + topic) except concurrent.futures.TimeoutError: print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr) except UnauthorizedError as e: print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while publishing to topic: ' + topic, file=sys.stderr) raise e time.sleep(5) except InterruptedError: print('Publisher interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
The following example recipe allows the component to subscribe to all topics.
The following example Python application demonstrates how to use the publish/subscribe IPC service to subscribe to messages to other components.
import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage, UnauthorizedError ) topic = "test/topic/python" TIMEOUT = 10 class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, "utf-8") print("Received new message: " + message) except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: print("Received a stream error.", file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: print('Subscribe to topic stream closed.') try: ipc_client = awsiot.greengrasscoreipc.connect() request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully subscribed to topic: ' + topic) except concurrent.futures.TimeoutError as e: print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr) raise e except UnauthorizedError as e: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while subscribing to topic: ' + topic, file=sys.stderr) raise e # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)
The following example recipe allows the component to publish to all topics.
The following example C++ application demonstrates how to use the publish/subscribe IPC service to publish messages to other components.
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the pub/sub publisher (C++)."); String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }
The following example recipe allows the component to subscribe to all topics.
The following example C++ application demonstrates how to use the publish/subscribe IPC service to subscribe to messages to other components.
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); std::cout << "Received new message: " << messageString << std::endl; } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::cout << "Received new message: " << messageString << std::endl; } } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to topic stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToTopicRequest request; request.SetTopic(topic); auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }