发布/订阅 AWS IoT Core MQTT 消息 - AWS IoT Greengrass
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

发布/订阅 AWS IoT Core MQTT 消息

AWS IoT IoT Core MQTT 消息收发 IPC 服务,您可以向 AWS IoT IoT Core 发送和接收 MQTT 消息。组件可以将消息发布到 AWS IoT IoT Core 并订阅 主题以处理来自其他来源的 MQTT 消息。有关 MQTT 的 AWS AWS IoT Core 实施的更多信息,请参阅AWS IoT IoT Core 开发人员指南》中的 MQTT。

注意

此 MQTT 消息收发 IPC 服务可让您与 AWS IoT IoT Core 交换消息。有关如何在组件之间交换消息的更多信息,请参阅发布/订阅本地消息

Authorization

要在自定义组件中使用 AWS IoT Core MQTT 消息收发,您必须定义授权策略,以允许组件发送和接收有关 主题的消息。有关定义授权策略的信息,请参阅授权组件执行 IPC 操作

AWS AWS IoT Core MQTT 消息收发的授权策略具有以下属性。

IPC 服务标识符: aws.greengrass.ipc.mqttproxy

操作 描述 资源

aws.greengrass#PublishToIoTCore

允许组件在您指定的 MQTT 主题上将消息发布到 AWS IoT IoT Core。

主题字符串,例如 或 test/topic*以允许访问所有主题。本主题字符串支持 MQTT 主题通配符(#+)。

aws.greengrass#SubscribeToIoTCore

允许组件订阅来自 AWS AWS IoT Core 的有关您指定的主题的消息。

主题字符串,例如 或 test/topic*以允许访问所有主题。本主题字符串支持 MQTT 主题通配符(#+)。

*

允许组件为您指定的主题发布和订阅 AWS AWS IoT Core MQTT 消息。

主题字符串,例如 或 test/topic*以允许访问所有主题。本主题字符串支持 MQTT 主题通配符(#+)。

例 授权策略示例

以下示例授权策略允许组件发布和订阅所有主题。

{ "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.MyIoTCorePubSubComponent:pubsub:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToIoTCore", "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "*" ] } } } }

Operations

对 AWS AWS IoT Core MQTT 消息收发使用以下操作。

PublishToIoTCore

向 AWS AWS IoT Core 发布主题的 MQTT 消息。

Request

此操作的请求具有以下参数:

topicName

要将消息发布到的主题。

qos

要使用的 MQTT QoS。此枚举 QOS具有以下值:

  • AT_MOST_ONCE – QoS 0。MQTT 消息最多传送一次。

  • AT_LEAST_ONCE – QoS 1。MQTT 消息至少传送一次。

payload

(可选)作为 blob 的消息负载。

Response

此操作不会在其响应中提供任何信息。

Examples

以下示例演示如何在自定义组件代码中调用此操作。

Java

例 示例:发布消息

String topic = "my/topic"; String message = "Hello, World!"; QOS qos = QOS.AT_LEAST_ONCE; PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest(); publishToIoTCoreRequest.setTopicName(topic); publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8)); publishToIoTCoreRequest.setQos(qos); greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty()).getResponse().get();
Python

例 示例:发布消息

注意

此示例假定您使用的是适用于 Python v2 的 AWS IoT IoT 设备开发工具包版本 1.5.4 或更高版本。如果您使用的是开发工具包的版本 1.5.3,请参阅适用于 Python 的 AWS IoT 设备软件开发工具包 v2,以了解有关连接到 AWS AWS IoT Greengrass 核心 IPC 服务的信息。

import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( QOS, PublishToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() topic = "my/topic" message = "Hello, World" qos = QOS.AT_LEAST_ONCE request = PublishToIoTCoreRequest() request.topic_name = topic request.payload = bytes(message, "utf-8") request.qos = qos operation = ipc_client.new_publish_to_iot_core() operation.activate(request) future = operation.get_response() future.result(TIMEOUT)

SubscribeToIoTCore

订阅来自 AWS AWS IoT Core 的关于主题或主题筛选条件的 MQTT 消息。当组件生命周期结束时 AWS IoT IoT Greengrass 核心软件将删除订阅。

此操作是订阅事件消息流的订阅操作。要使用此操作,请使用处理事件消息、错误和流结束的函数定义流响应处理程序。有关更多信息,请参阅订阅 IPC 事件流

事件消息类型: IoTCoreMessage

Request

此操作的请求具有以下参数:

topicName

要订阅的主题。您可以使用 MQTT 主题通配符(#+)订阅多个主题。

qos

要使用的 MQTT QoS。此枚举 QOS具有以下值:

  • AT_MOST_ONCE – QoS 0。MQTT 消息最多传送一次。

  • AT_LEAST_ONCE – QoS 1。MQTT 消息至少传送一次。

Response

此操作的响应包含以下信息:

messages

MQTT 消息的流。此对象 IoTCoreMessage包含以下信息:

message

MQTT 消息。此对象 MQTTMessage包含以下信息:

topicName

消息发布到的主题。

payload

(可选)作为 blob 的消息负载。

Examples

以下示例演示如何在自定义组件代码中调用此操作。

Java

例 示例:订阅消息

String topic = "my/topic"; QOS qos = QOS.AT_MOST_ONCE; SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest(); subscribeToIoTCoreRequest.setTopicName(topic); subscribeToIoTCoreRequest.setQos(qos); StreamResponseHandler<IoTCoreMessage> streamResponseHandler = new StreamResponseHandler<IoTCoreMessage>() { @Override public void onStreamEvent(IoTCoreMessage ioTCoreMessage) { try { String message = new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8); // Handle message. } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable throwable) { // Handle error. return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { // Handle close. } }; SubscribeToIoTCoreResponseHandler operationResponseHandler = greengrassCoreIPCClient .subscribeToIoTCore(subscribeToIoTCoreRequest, Optional.of(streamResponseHandler)); operationResponseHandler.getResponse().get(); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. operationResponseHandler.closeStream();
Python

例 示例:订阅消息

注意

此示例假定您使用的是适用于 Python v2 的 AWS IoT IoT 设备开发工具包版本 1.5.4 或更高版本。如果您使用的是开发工具包的版本 1.5.3,请参阅适用于 Python 的 AWS IoT 设备软件开发工具包 v2,以了解有关连接到 AWS AWS IoT Greengrass 核心 IPC 服务的信息。

import time import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( IoTCoreMessage, QOS, SubscribeToIoTCoreRequest ) TIMEOUT = 10 ipc_client = awsiot.greengrasscoreipc.connect() class StreamHandler(client.SubscribeToIoTCoreStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: IoTCoreMessage) -> None: try: message = str(event.message.payload, "utf-8") # Handle message. except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: # Handle error. return True # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: # Handle close. pass topic = "my/topic" qos = QOS.AT_MOST_ONCE request = SubscribeToIoTCoreRequest() request.topic_name = topic request.qos = qos handler = StreamHandler() operation = ipc_client.new_subscribe_to_iot_core(handler) future = operation.activate(request) future.result(TIMEOUT) # Keep the main thread alive, or the process will exit. while True: time.sleep(10) # To stop subscribing, close the operation stream. operation.close()