Use the Amazon IoT Device SDK to communicate with the Greengrass nucleus, other components, and Amazon IoT Core - Amazon IoT Greengrass
Services or capabilities described in Amazon Web Services documentation might vary by Region. To see the differences applicable to the China Regions, see Getting Started with Amazon Web Services in China (PDF).

Use the Amazon IoT Device SDK to communicate with the Greengrass nucleus, other components, and Amazon IoT Core

Components running on your core device can use the Amazon IoT Greengrass Core interprocess communication (IPC) library in the Amazon IoT Device SDK to communicate with the Amazon IoT Greengrass nucleus and other Greengrass components. To develop and run custom components that use IPC, you must use the Amazon IoT Device SDK to connect to the Amazon IoT Greengrass Core IPC service and perform IPC operations.

The IPC interface supports two types of operations:

  • Request/response

    Components send a request to the IPC service and receive a response that contains the result of the request.

  • Subscription

    Components send a subscription request to the IPC service and expect a stream of event messages in response. Components provide a subscription handler that handles event messages, errors, and stream closure. The Amazon IoT Device SDK includes a handler interface with the correct response and event types for each IPC operation. For more information, see Subscribe to IPC event streams.

IPC client versions

In later versions of the Java and Python SDKs, Amazon IoT Greengrass provides an improved version of the IPC client, called IPC client V2. IPC client V2:

  • Reduces the amount of code that you need to write to use IPC operations and helps avoid common errors that can occur with IPC client V1.

  • Calls subscription handler callbacks in a separate thread, so you can now run blocking code, including additional IPC function calls, in subscription handler callbacks. IPC client V1 uses the same thread to communicate with the IPC server and call subscription handler callbacks.

  • Lets you call subscription operations using Lambda expressions (Java) or functions (Python). IPC client V1 requires you to define subscription handler classes.

  • Provides synchronous and asynchronous versions of each IPC operation. IPC client V1 provides only asynchronous versions of each operation.

We recommend that you use IPC client V2 to take advantage of these improvements. However, many examples in this documentation and in some online content demonstrate only how to use IPC client V1. You can use the following examples and tutorials to see sample components that use IPC client V2:

Currently, the Amazon IoT Device SDK for C++ v2 supports only IPC client V1.

Supported SDKs for interprocess communication

The Amazon IoT Greengrass Core IPC libraries are included in the following Amazon IoT Device SDK versions.

Connect to the Amazon IoT Greengrass Core IPC service

To use interprocess communication in your custom component, you must create a connection to an IPC server socket that the Amazon IoT Greengrass Core software runs. Complete the following tasks to download and use the Amazon IoT Device SDK in the language of your choice.

To use the Amazon IoT Device SDK for Java v2 (IPC client V2)
  1. Download the Amazon IoT Device SDK for Java v2 (v1.6.0 or later).

  2. Do one of the following to run your custom code in your component:

    • Build your component as a JAR file that includes the Amazon IoT Device SDK, and run this JAR file in your component recipe.

    • Define the Amazon IoT Device SDK JAR as a component artifact, and add that artifact to the classpath when you run your application in your component recipe.

  3. Use the following code to create the IPC client.

    try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { // Use client. } catch (Exception e) { LOGGER.log(Level.SEVERE, "Exception occurred when using IPC.", e); System.exit(1); }
To use the Amazon IoT Device SDK for Python v2 (IPC client V2)
  1. Download the Amazon IoT Device SDK for Python (v1.9.0 or later).

  2. Add the SDK's installation steps to the install lifecycle in your component's recipe.

  3. Create a connection to the Amazon IoT Greengrass Core IPC service. Use the following code to create the IPC client.

    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 try: ipc_client = GreengrassCoreIPCClientV2() # Use IPC client. except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

To build the Amazon IoT Device SDK v2 for C++, a device must have the following tools:

  • C++ 11 or later

  • CMake 3.1 or later

  • One of the following compilers:

    • GCC 4.8 or later

    • Clang 3.9 or later

    • MSVC 2015 or later

To use the Amazon IoT Device SDK for C++ v2
  1. Download the Amazon IoT Device SDK for C++ v2 (v1.17.0 or later).

  2. Follow the installation instructions in the README to build the Amazon IoT Device SDK for C++ v2 from source.

  3. In your C++ build tool, link the Greengrass IPC library, AWS::GreengrassIpc-cpp, that you built in the previous step. The following CMakeLists.txt example links the Greengrass IPC library to a project that you build with CMake.

    cmake_minimum_required(VERSION 3.1) project (greengrassv2_pubsub_subscriber) file(GLOB MAIN_SRC "*.h" "*.cpp" ) add_executable(${PROJECT_NAME} ${MAIN_SRC}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX CXX_STANDARD 11) find_package(aws-crt-cpp PATHS ~/sdk-cpp-workspace/build) find_package(EventstreamRpc-cpp PATHS ~/sdk-cpp-workspace/build) find_package(GreengrassIpc-cpp PATHS ~/sdk-cpp-workspace/build) target_link_libraries(${PROJECT_NAME} AWS::GreengrassIpc-cpp)
  4. In your component code, create a connection to the Amazon IoT Greengrass Core IPC service to create an IPC client (Aws::Greengrass::GreengrassCoreIpcClient). You must define an IPC connection lifecycle handler that handles IPC connection, disconnection, and error events. The following example creates an IPC client and an IPC connection lifecycle handler that prints when the IPC client connects, disconnects, and encounters errors.

    #include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { // Create the IPC client. ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } // Use the IPC client to create an operation request. // Activate the operation request. auto activate = operation.Activate(request, nullptr); activate.wait(); // Wait for Greengrass Core to respond to the request. auto responseFuture = operation.GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } // Check the result of the request. auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } return 0; }
  5. To run your custom code in your component, build your code as a binary artifact, and run the binary artifact in your component recipe. Set the artifact's Execute permission to OWNER to enable the Amazon IoT Greengrass Core software to run the binary artifact.

    Your component recipe's Manifests section might look similar to the following example.

    JSON
    { ... "Manifests": [ { "Lifecycle": { "run": "{artifacts:path}/greengrassv2_pubsub_subscriber" }, "Artifacts": [ { "URI": "s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
    YAML
    ... Manifests: - Lifecycle: run: {artifacts:path}/greengrassv2_pubsub_subscriber Artifacts: - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pubsub_subscriber Permission: Execute: OWNER

To build the Amazon IoT Device SDK for JavaScript v2 for use with NodeJS, a device must have the following tools:

  • NodeJS 10.0 or later

    • Run node -v to check the Node version.

  • CMake 3.1 or later

To use the Amazon IoT Device SDK for JavaScript v2 (IPC client V1)
  1. Download the Amazon IoT Device SDK for JavaScript v2 (v1.12.10 or later).

  2. Follow the installation instructions in the README to build the Amazon IoT Device SDK for JavaScript v2 from source.

  3. Create a connection to the Amazon IoT Greengrass Core IPC service. Complete the following steps to create the IPC client and establish a connection.

  4. Use the following code to create the IPC client.

    import * as greengrascoreipc from 'aws-iot-device-sdk-v2'; let client = greengrascoreipc.createClient();
  5. Use the following code to establish a connection from your component to the Greengrass nucleus.

    await client.connect();

Authorize components to perform IPC operations

To allow your custom components to use some IPC operations, you must define authorization policies that allow the component to perform the operation on certain resources. Each authorization policy defines a list of operations and a list of resources that the policy allows. For example, the publish/subscribe messaging IPC service defines publish and subscribe operations for topic resources. You can use the * wildcard to allow access to all operations or all resources.

You define authorization policies with the accessControl configuration parameter, which you can set in the component recipe or when you deploy the component. The accessControl object maps IPC service identifiers to lists of authorization policies. You can define multiple authorization policies for each IPC service to control access. Each authorization policy has a policy ID, which must be unique among all components.

Tip

To create unique policy IDs, you can combine the component name, IPC service name, and a counter. For example, a component named com.example.HelloWorld might define two publish/subscribe authorization policies with the following IDs:

  • com.example.HelloWorld:pubsub:1

  • com.example.HelloWorld:pubsub:2

Authorization policies use the following format. This object is the accessControl configuration parameter.

JSON
{ "IPC service identifier": { "policyId": { "policyDescription": "description", "operations": [ "operation1", "operation2" ], "resources": [ "resource1", "resource2" ] } } }
YAML
IPC service identifier: policyId: policyDescription: description operations: - operation1 - operation2 resources: - resource1 - resource2

Wildcards in authorization policies

You can use the * wildcard in the resources element of IPC authorization policies to allow access to multiple resources in a single authorization policy.

  • In all versions of the Greengrass nucleus, you can specify a single * character as a resource to allow access to all resources.

  • In Greengrass nucleus v2.6.0 and later, you can specify the * character in a resource to match any combination of characters. For example, you can specify factory/1/devices/Thermostat*/status to allow access to a status topic for all thermostat devices in a factory, where each device's name begins with Thermostat.

When you define authorization policies for the Amazon IoT Core MQTT IPC service, you can also use MQTT wildcards (+ and #) to match multiple resources. For more information, see MQTT wildcards in Amazon IoT Core MQTT IPC authorization policies.

Recipe variables in authorization policies

If you use Greengrass nucleus v2.6.0 or later, and you set the Greengrass nucleus' interpolateComponentConfiguration configuration option to true, you can use the {iot:thingName} recipe variable in authorization policies. When you need an authorization policy that includes the core device's name, such as for MQTT topics or device shadows, you can use this recipe variable to configure a single authorization policy for a group of core devices. For example, you can allow a component access to the following resource for shadow IPC operations.

$aws/things/{iot:thingName}/shadow/

Special characters in authorization policies

To specify a literal * or ? character in an authorization policy, you must use an escape sequence. The following escape sequences instruct the Amazon IoT Greengrass Core software to use the literal value instead of the character's special meaning. For example, the * character is a wildcard that matches any combination of characters.

Literal character Escape sequence Notes

*

${*}

?

${?}

Amazon IoT Greengrass doesn't currently support the ? wildcard, which matches any single character.

$

${$}

Use this escape sequence to match a resource that contains ${. For example, to match a resource named ${resourceName}, you must specify ${$}{resourceName}. Otherwise, to match a resource that contains $, you can use a literal $, such as to allow access to a topic that begins with $aws.

Authorization policy examples

You can reference the following authorization policy examples to help you configure authorization policies for your components.

Example component recipe with an authorization policy

The following example component recipe includes an accessControl object defines an authorization policy. This policy authorizes the com.example.HelloWorld component to publish to the test/topic topic.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.HelloWorld", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "run": "java -jar {artifacts:path}/HelloWorld.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.HelloWorld ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: "com.example.HelloWorld:pubsub:1": policyDescription: Allows access to publish to test/topic. operations: - "aws.greengrass#PublishToTopic" resources: - "test/topic" Manifests: - Lifecycle: run: |- java -jar {artifacts:path}/HelloWorld.jar
Example component configuration update with an authorization policy

The following example configuration update in a deployment specifies to configure a component with an accessControl object that defines an authorization policy. This policy authorizes the com.example.HelloWorld component to publish to the test/topic topic.

Console
Configuration to merge
{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } }
Amazon CLI

The following command creates a deployment to a core device.

aws greengrassv2 create-deployment --cli-input-json file://hello-world-deployment.json

The hello-world-deployment.json file contains the following JSON document.

{ "targetArn": "arn:aws-cn:iot:us-west-2:123456789012:thing/MyGreengrassCore", "deploymentName": "Deployment for MyGreengrassCore", "components": { "com.example.HelloWorld": { "componentVersion": "1.0.0", "configurationUpdate": { "merge": "{\"accessControl\":{\"aws.greengrass.ipc.pubsub\":{\"com.example.HelloWorld:pubsub:1\":{\"policyDescription\":\"Allows access to publish to test/topic.\",\"operations\":[\"aws.greengrass#PublishToTopic\"],\"resources\":[\"test/topic\"]}}}}" } } } }
Greengrass CLI

The following Greengrass CLI command creates a local deployment on a core device.

sudo greengrass-cli deployment create \ --recipeDir recipes \ --artifactDir artifacts \ --merge "com.example.HelloWorld=1.0.0" \ --update-config hello-world-configuration.json

The hello-world-configuration.json file contains the following JSON document.

{ "com.example.HelloWorld": { "MERGE": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.HelloWorld:pubsub:1": { "policyDescription": "Allows access to publish to test/topic.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "test/topic" ] } } } } } }

Subscribe to IPC event streams

You can use IPC operations to subscribe to streams of events on a Greengrass core device. To use a subscribe operation, define a subscription handler and create a request to the IPC service. Then, the IPC client runs the subscription handler's functions each time that the core device streams an event message to your component.

You can close a subscription to stop processing event messages. To do so, call closeStream() (Java), close() (Python), or Close() (C++) on the subscription operation object that you used to open the subscription.

The Amazon IoT Greengrass Core IPC service supports the following subscribe operations:

Define subscription handlers

To define a subscription handler, define callback functions that handle event messages, errors, and stream closure. If you use IPC client V1, you must define these functions in a class. If you use IPC client V2, which is available in later versions of the Java and Python SDKs, you can define these functions without creating a subscription handler class.

Java

If you use IPC client V1, you must implement the generic software.amazon.awssdk.eventstreamrpc.StreamResponseHandler<StreamEventType> interface. StreamEventType is the type of event message for the subscription operation. Define the following functions to handle event messages, errors, and stream closure.

If you use IPC client V2, you can define these functions outside of a subscription handler class or use lambda expressions.

void onStreamEvent(StreamEventType event)

The callback that the IPC client calls when it receives an event message, such as an MQTT message or a component update notification.

boolean onStreamError(Throwable error)

The callback that the IPC client calls when a stream error occurs.

Return true to close the subscription stream as a result of the error, or return false to keep the stream open.

void onStreamClosed()

The callback that the IPC client calls when the stream closes.

Python

If you use IPC client V1, you must extend the stream response handler class that corresponds to the subscription operation. The Amazon IoT Device SDK includes a subscription handler class for each subscription operation. StreamEventType is the type of event message for the subscription operation. Define the following functions to handle event messages, errors, and stream closure.

If you use IPC client V2, you can define these functions outside of a subscription handler class or use lambda expressions.

def on_stream_event(self, event: StreamEventType) -> None

The callback that the IPC client calls when it receives an event message, such as an MQTT message or a component update notification.

def on_stream_error(self, error: Exception) -> bool

The callback that the IPC client calls when a stream error occurs.

Return true to close the subscription stream as a result of the error, or return false to keep the stream open.

def on_stream_closed(self) -> None

The callback that the IPC client calls when the stream closes.

C++

Implement a class that derives from the stream response handler class that corresponds to the subscription operation. The Amazon IoT Device SDK includes a subscription handler base class for each subscription operation. StreamEventType is the type of event message for the subscription operation. Define the following functions to handle event messages, errors, and stream closure.

void OnStreamEvent(StreamEventType *event)

The callback that the IPC client calls when it receives an event message, such as an MQTT message or a component update notification.

bool OnStreamError(OperationError *error)

The callback that the IPC client calls when a stream error occurs.

Return true to close the subscription stream as a result of the error, or return false to keep the stream open.

void OnStreamClosed()

The callback that the IPC client calls when the stream closes.

JavaScript

Implement a class that derives from the stream response handler class that corresponds to the subscription operation. The Amazon IoT Device SDK includes a subscription handler base class for each subscription operation. StreamEventType is the type of event message for the subscription operation. Define the following functions to handle event messages, errors, and stream closure.

on(event: 'ended', listener: StreamingOperationEndedListener)

The callback that the IPC client calls when the stream closes.

on(event: 'streamError', listener: StreamingRpcErrorListener)

The callback that the IPC client calls when a stream error occurs.

Return true to close the subscription stream as a result of the error, or return false to keep the stream open.

on(event: 'message', listener: (message: InboundMessageType) => void)

The callback that the IPC client calls when it receives an event message, such as an MQTT message or a component update notification.

Example subscription handlers

The following example demonstrates how to use the SubscribeToTopic operation and a subscription handler to subscribe to local publish/subscribe messages.

Java (IPC client V2)
Example: Subscribe to local publish/subscribe messages
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
Example: Subscribe to local publish/subscribe messages
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
Example: Subscribe to local publish/subscribe messages
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
Example: Subscribe to local publish/subscribe messages
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

IPC best practices

The best practices for using IPC in custom components differ between IPC client V1 and IPC client V2. Follow the best practices for the IPC client version that you use.

IPC client V2

The IPC client V2 runs callback functions in a separate thread, so compared to IPC client V1, there are fewer guidelines for you to follow when you use IPC and write subscription handler functions.

  • Reuse one IPC client

    After you create an IPC client, keep it open and reuse it for all IPC operations. Creating multiple clients uses extra resources and can result in resource leaks.

  • Handle exceptions

    The IPC client V2 logs uncaught exceptions in subscription handler functions. You should catch exceptions in your handler functions to handle errors that occur in your code.

IPC client V1

The IPC client V1 uses a single thread that communicates with the IPC server and calls subscription handlers. You must consider this synchronous behavior when you write subscription handler functions.

  • Reuse one IPC client

    After you create an IPC client, keep it open and reuse it for all IPC operations. Creating multiple clients uses extra resources and can result in resource leaks.

  • Run blocking code asynchronously

    The IPC client V1 can't send new requests or process new event messages while the thread is blocked. You should run blocking code in a separate thread that you run from the handler function. Blocking code includes sleep calls, loops that continuously run, and synchronous I/O requests that take time to complete.

  • Send new IPC requests asynchronously

    The IPC client V1 can't send a new request from within subscription handler functions, because the request blocks the handler function if you wait for a response. You should send IPC requests in a separate thread that you run from the handler function.

  • Handle exceptions

    The IPC client V1 doesn't handle uncaught exceptions in subscription handler functions. If your handler function throws an exception, the subscription closes, and the exception doesn't appear in your component logs. You should catch exceptions in your handler functions to keep the subscription open and log errors that occur in your code.