发布/订阅本地消息 - AWS IoT Greengrass
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

发布/订阅本地消息

发布/订阅 (pubsub) 消息收发功能使您能够向 主题发送和接收消息。组件可以将消息发布到 主题以将消息发送到其他组件。然后,订阅该主题的组件可以对其收到的消息执行操作。

注意

您不能使用此发布/订阅 IPC 服务来发布或订阅 AWS IoT IoT Core MQTT。有关如何与 AWS IoT IoT Core MQTT 交换消息的更多信息,请参阅 发布/订阅 AWS IoT Core MQTT 消息

Authorization

要在自定义组件中使用发布/订阅消息收发 IPC 服务,您必须定义授权策略,以允许组件向主题发送和接收消息。有关定义授权策略的信息,请参阅授权组件执行 IPC 操作

发布/订阅消息收发的授权策略具有以下属性。

IPC 服务标识符: aws.greengrass.ipc.pubsub

操作 描述 资源

aws.greengrass#PublishToTopic

允许组件向您指定的主题发布消息。

主题字符串,例如 或 test/topic*以允许访问所有主题。本主题字符串不支持 MQTT 主题通配符(#+)。

aws.greengrass#SubscribeToTopic

允许组件订阅您指定的主题的消息。

主题字符串,例如 或 test/topic*以允许访问所有主题。本主题字符串不支持 MQTT 主题通配符(#+)。

*

允许组件为您指定的主题发布和订阅消息。

主题字符串,例如 或 test/topic*以允许访问所有主题。本主题字符串不支持 MQTT 主题通配符(#+)。

例 授权策略示例

以下示例授权策略允许组件发布和订阅所有主题。

{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.MyLocalPubSubComponent:pubsub:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToTopic", "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } }

Operations

对发布/订阅消息收发使用以下操作。

PublishToTopic

将消息发布到主题。

Request

此操作的请求具有以下参数:

topic

要将消息发布到的主题。

publishMessage

要发布的消息。此对象 PublishMessage包含以下信息。您必须指定 jsonMessage 和 之一binaryMessage

jsonMessage

(可选)JSON 消息。此对象 JsonMessage包含以下信息:

message

作为对象的 JSON 消息。

binaryMessage

(可选)二进制消息。此对象 BinaryMessage包含以下信息:

message

作为 blob 的二进制消息。

此操作不会在其响应中提供任何信息。

Examples

以下示例演示如何在自定义组件代码中调用此操作。

Java

例 示例:发布二进制消息

String topic = "my/topic"; String message = "Hello, World!"; PublishToTopicRequest publishToTopicRequest = new PublishToTopicRequest(); PublishMessage publishMessage = new PublishMessage(); BinaryMessage binaryMessage = new BinaryMessage(); binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8)); publishMessage.setBinaryMessage(binaryMessage); publishToTopicRequest.setPublishMessage(publishMessage); publishToTopicRequest.setTopic(topic); greengrassCoreIPCClient.publishToTopic(publishToTopicRequest, Optional.empty()).getResponse().get();
Python

例 示例:发布二进制消息

注意

此示例假定您使用的是适用于 Python v2 的 AWS IoT IoT 设备开发工具包版本 1.5.4 或更高版本。如果您使用的是开发工具包的版本 1.5.3,请参阅使用适用于 Python 的 AWS IoT 设备开发工具包 v2,以了解有关连接到 AWS AWS IoT Greengrass 核心 IPC 服务的信息。

import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, BinaryMessage ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() topic = "my/topic" message = "Hello, World!" request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.binary_message = BinaryMessage() publish_message.binary_message.message = bytes(message, "utf-8") request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future = operation.get_response() future.result(TIMEOUT)

SubscribeToTopic

订阅主题中的消息。

此操作是订阅事件消息流的订阅操作。要使用此操作,请使用处理事件消息、错误和流结束的函数定义流响应处理程序。有关更多信息,请参阅订阅 IPC 事件流

事件消息类型: SubscriptionResponseMessage

Request

此操作的请求具有以下参数:

topic

要订阅的主题。

Response

此操作的响应包含以下信息:

topicName

消息发布到的主题。

messages

消息流。此对象 SubscriptionResponseMessage包含以下信息。每条消息都包含 jsonMessagebinaryMessage

jsonMessage

(可选)JSON 消息。此对象 JsonMessage包含以下信息:

message

作为对象的 JSON 消息。

binaryMessage

(可选)二进制消息。此对象 BinaryMessage包含以下信息:

message

作为 blob 的二进制消息。

Examples

以下示例演示如何在自定义组件代码中调用此操作。

Java

例 示例:订阅本地发布/订阅消息

String topic = "my/topic"; SubscribeToTopicRequest subscribeToTopicRequest = new SubscribeToTopicRequest(); subscribeToTopicRequest.setTopic(topic); StreamResponseHandler<SubscriptionResponseMessage> streamResponseHandler = new StreamResponseHandler<SubscriptionResponseMessage>() { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); // Handle message. } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { // Handle error. return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { // Handle close. } }; SubscribeToTopicResponseHandler operationResponseHandler = greengrassCoreIPCClient .subscribeToTopic(subscribeToTopicRequest, Optional.of(streamResponseHandler)); operationResponseHandler.getResponse().get(); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. operationResponseHandler.closeStream();
Python

例 示例:订阅本地发布/订阅消息

注意

此示例假定您使用的是适用于 Python v2 的 AWS IoT IoT 设备开发工具包版本 1.5.4 或更高版本。如果您使用的是开发工具包的版本 1.5.3,请参阅使用适用于 Python 的 AWS IoT 设备开发工具包 v2以了解有关连接到 AWS AWS IoT Greengrass 核心 IPC 服务的信息。

import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message_string = str(event.binary_message.message, "utf-8") # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) future = operation.activate(request) future.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()

Examples

使用以下示例了解如何在您的组件中使用发布/订阅 IPC 服务。

以下示例配方允许 组件发布到所有主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherJava", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherJava:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/PubSubPublisher.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherJava ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: 'com.example.PubSubPublisherJava:pubsub:1': policyDescription: Allows access to publish to all topics. operations: - 'aws.greengrass#PublishToTopic' resources: - '*' Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/PubSubPublisher.jar

以下示例 Java 应用程序演示如何使用 发布/订阅 IPC 服务将消息发布到其他组件。

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubPublisher { public static void main(String[] args) { String message = "Hello from the pub/sub publisher (Java)."; String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); while (true) { PublishToTopicRequest publishRequest = new PublishToTopicRequest(); PublishMessage publishMessage = new PublishMessage(); BinaryMessage binaryMessage = new BinaryMessage(); binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8)); publishMessage.setBinaryMessage(binaryMessage); publishRequest.setPublishMessage(publishMessage); publishRequest.setTopic(topic); CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient .publishToTopic(publishRequest, Optional.empty()).getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Execution exception while publishing to topic: " + topic); } throw e; } Thread.sleep(5000); } } catch (InterruptedException e) { System.out.println("Publisher interrupted."); } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } }

以下示例配方允许 组件订阅所有主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberJava", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberJava:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/PubSubSubscriber.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberJava ComponentVersion: '1.0.0' ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: 'com.example.PubSubSubscriberJava:pubsub:1': policyDescription: Allows access to subscribe to all topics. operations: - 'aws.greengrass#SubscribeToTopic' resources: - '*' Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/PubSubSubscriber.jar

以下示例 Java 应用程序演示如何使用 发布/订阅 IPC 服务订阅其他组件的消息。

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubSubscriber { public static void main(String[] args) { String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest(); subscribeRequest.setTopic(topic); SubscribeToTopicResponseHandler operationResponseHandler = ipcClient .subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler())); CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); throw e; } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { System.err.println("Execution exception while subscribing to topic: " + topic); } throw e; } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); System.out.println("Received new message: " + message); } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }

以下示例配方允许 组件发布到所有主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherPython", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherPython:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "python3 -u {artifacts:path}/pubsub_publisher.py" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherPython ComponentVersion: 1.0.0 ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubPublisherPython:pubsub:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToTopic resources: - "*" Manifests: - Lifecycle: Run: python3 -u {artifacts:path}/pubsub_publisher.py

以下示例 Python 应用程序演示如何使用 发布/订阅 IPC 服务将消息发布到其他组件。

import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, BinaryMessage, UnauthorizedError ) topic = "test/topic/python" message = "Hello from the pub/sub publisher (Python)." TIMEOUT = 10 try: ipc_client = awsiot.greengrasscoreipc.connect() while True: request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.binary_message = BinaryMessage() publish_message.binary_message.message = bytes(message, "utf-8") request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) futureResponse = operation.get_response() try: futureResponse.result(TIMEOUT) print('Successfully published to topic: ' + topic) except concurrent.futures.TimeoutError: print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr) except UnauthorizedError as e: print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while publishing to topic: ' + topic, file=sys.stderr) raise e time.sleep(5) except InterruptedError: print('Publisher interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

以下示例配方允许 组件订阅所有主题。

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberPython", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberPython:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "python3 -u {artifacts:path}/pubsub_subscriber.py" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberPython ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubSubscriberPython:pubsub:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToTopic resources: - "*" Manifests: - Lifecycle: Run: python3 -u {artifacts:path}/pubsub_subscriber.py

以下示例 Python 应用程序演示如何使用 发布/订阅 IPC 服务订阅其他组件的消息。

import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage, UnauthorizedError ) topic = "test/topic/python" TIMEOUT = 10 class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, "utf-8") print("Received new message: " + message) except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: print("Received a stream error.", file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: print('Subscribe to topic stream closed.') try: ipc_client = awsiot.greengrasscoreipc.connect() request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) future = operation.activate(request) try: future.result(TIMEOUT) print('Successfully subscribed to topic: ' + topic) except concurrent.futures.TimeoutError as e: print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr) raise e except UnauthorizedError as e: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while subscribing to topic: ' + topic, file=sys.stderr) raise e # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)