Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门。本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
发布/订阅Amazon IoT Core MQTT 消息
MAmazon IoT Core QTT 消息 IPC 服务允许您发送和接收 MQTT 消息Amazon IoT Core。组件可以向主题发布消息Amazon IoT Core和订阅主题,以对来自其他来源的 MQTT 消息采取行动。有关 MQTTAmazon IoT Core 实现的更多信息,请参阅Amazon IoT Core开发者指南中的 MQTT。
此 MQTT 消息 IPC 服务允许您与交换消息Amazon IoT Core。有关如何在组件之间交换消息的更多信息,请参阅发布/订阅本地消息。
最低 SDK 版本
下表列出了发布和订阅来往Amazon IoT Device SDK的 MQTT 消息所必须使用的最低版本Amazon IoT Core。
Authorization
要在自定义组件中使用Amazon IoT Core MQTT 消息,您必须定义授权策略,允许您的组件发送和接收有关主题的消息。有关定义授权策略的信息,请参阅授权组件执行 IPC 操作。
Amazon IoT CoreMQTT 消息的授权策略具有以下属性。
IPC 服务标识符:aws.greengrass.ipc.mqttproxy
运算 |
描述 |
资源 |
aws.greengrass#PublishToIoTCore
|
允许组件向您指定Amazon IoT Core的 MQTT 主题发布消息。
|
主题字符串,例如或test/topic ,* 用于允许访问所有主题。您可以使用 MQTT 主题通配符(# 和+ )来匹配多个资源。
|
aws.greengrass#SubscribeToIoTCore
|
允许组件订阅来自您指定Amazon IoT Core主题的消息。
|
主题字符串,例如或test/topic ,* 用于允许访问所有主题。您可以使用 MQTT 主题通配符(# 和+ )来匹配多个资源。
|
*
|
允许组件发布和订阅您指定的主题的Amazon IoT Core MQTT 消息。
|
主题字符串,例如或test/topic ,* 用于允许访问所有主题。您可以使用 MQTT 主题通配符(# 和+ )来匹配多个资源。
|
MQTT 授权策略中的Amazon IoT Core MQTT 通配符
您可以在 MQTT IPC 授权策略中使用Amazon IoT Core MQTT 通配符。组件可以发布和订阅与您在授权策略中允许的主题过滤器相匹配的主题。例如,如果组件的授权策略授予访问权限test/topic/#
,则该组件可以订阅test/topic/#
,也可以发布和订阅test/topic/filter
。
Amazon IoT CoreMQTT 授权策略中的配方变量
如果您使用 Greengrass 核心的 v2.6.0 或更高版本,则可以在授权策略中使用{iot:thingName}
配方变量。此功能允许您为一组核心设备配置单一授权策略,其中每台核心设备只能访问包含自己名称的主题。例如,您可以允许组件访问以下主题资源。
devices/{iot:thingName}/messages
有关更多信息,请参阅 配方变量 和 在合并更新中使用配方变量。
授权策略示例
您可以参考以下授权策略示例来帮助您为组件配置授权策略。
例 具有无限制访问权限的授权策略示例
以下示例授权策略允许组件发布和订阅所有主题。
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
例 具有有限访问权限的授权策略示例
以下示例授权策略允许组件发布和订阅两个名为factory/1/events
和的主题factory/1/actions
。
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to factory 1 topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/actions",
"factory/1/events"
]
}
}
}
}
例 一组核心设备的授权策略示例
以下示例授权策略允许组件发布和订阅包含运行该组件的核心设备名称的主题。
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent
:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/devices/{iot:thingName}/controls"
]
}
}
}
}
PublishToIotCore
向Amazon IoT Core某个主题发布 MQTT 消息。
请求
该操作的请求具有以下参数:
topicName
(Python:topic_name
)
-
发布消息的主题。
qos
-
要使用的 MQTT QoS。此枚举具有以下值:QOS
payload
-
(可选)消息负载为 blob。
响应
此操作在其响应中未提供任何信息。
示例
以下示例演示了如何使用自定义组件代码调用此操作。
- Java (IPC client V1)
-
例 示例:发布消息
package com.aws.greengrass.docs.samples.ipc;
import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.PublishToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreResponse;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PublishToIoTCore {
public static final int TIMEOUT_SECONDS = 10;
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
QOS qos = QOS.get(args[2]);
try (EventStreamRPCConnection eventStreamRPCConnection =
IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient =
new GreengrassCoreIPCClient(eventStreamRPCConnection);
PublishToIoTCoreResponseHandler responseHandler =
PublishToIoTCore.publishBinaryMessageToTopic(ipcClient, topic, message, qos);
CompletableFuture<PublishToIoTCoreResponse> futureResponse =
responseHandler.getResponse();
try {
futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
System.out.println("Successfully published to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while publishing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
throw e;
}
}
} catch (InterruptedException e) {
System.out.println("IPC interrupted.");
} catch (ExecutionException e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
public static PublishToIoTCoreResponseHandler publishBinaryMessageToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, String message, QOS qos) {
PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest();
publishToIoTCoreRequest.setTopicName(topic);
publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8));
publishToIoTCoreRequest.setQos(qos);
return greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty());
}
}
- Python (IPC client V1)
-
例 示例:发布消息
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
QOS,
PublishToIoTCoreRequest
)
TIMEOUT = 10
ipc_client = awsiot.greengrasscoreipc.connect()
topic = "my/topic"
message = "Hello, World"
qos = QOS.AT_LEAST_ONCE
request = PublishToIoTCoreRequest()
request.topic_name = topic
request.payload = bytes(message, "utf-8")
request.qos = qos
operation = ipc_client.new_publish_to_iot_core()
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
- C++
-
例 示例:发布消息
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String message("Hello, World!");
String topic("my/topic");
QOS qos = QOS_AT_MOST_ONCE;
int timeout = 10;
PublishToIoTCoreRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
request.SetTopicName(topic);
request.SetPayload(messageData);
request.SetQos(qos);
auto operation = ipcClient.NewPublishToIoTCore();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
}
return 0;
}
SubscribeToIotCore
通过主题或主题过滤器订阅 MQTT 消息。Amazon IoT Core当组件生命周期结束时,Amazon IoT GreengrassCore 软件会删除订阅。
此操作是一种订阅操作,您可以在其中订阅事件消息流。要使用此操作,请定义一个流响应处理程序,其中包含处理事件消息、错误和流关闭的函数。有关更多信息,请参阅订阅 IPC 事件直播:
事件消息类型:IoTCoreMessage
请求
该操作的请求具有以下参数:
topicName
(Python:topic_name
)
-
要订阅的主题。您可以使用 MQTT 主题通配符(#
和+
)订阅多个主题。
qos
-
要使用的 MQTT QoS。此枚举具有以下值:QOS
响应
此操作的响应包含以下信息:
messages
-
MQTT 消息流。该对象包含以下信息:IoTCoreMessage
message
-
MQTT 消息。该对象包含以下信息:MQTTMessage
topicName
(Python:topic_name
)
-
发布消息的主题。
payload
-
(可选)消息负载为 blob。
示例
以下示例演示了如何使用自定义组件代码调用此操作。
- Java (IPC client V1)
-
例 示例:订阅消息
package com.aws.greengrass.docs.samples.ipc;
import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeToIoTCore {
public static final int TIMEOUT_SECONDS = 10;
public static void main(String[] args) {
String topic = args[0];
QOS qos = QOS.get(args[1]);
try (EventStreamRPCConnection eventStreamRPCConnection =
IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient =
new GreengrassCoreIPCClient(eventStreamRPCConnection);
StreamResponseHandler<IoTCoreMessage> streamResponseHandler =
new SubscriptionResponseHandler();
SubscribeToIoTCoreResponseHandler responseHandler =
SubscribeToIoTCore.subscribeToIoTCore(ipcClient, topic, qos,
streamResponseHandler);
CompletableFuture<SubscribeToIoTCoreResponse> futureResponse =
responseHandler.getResponse();
try {
futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
System.out.println("Successfully subscribed to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while subscribing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while subscribing to topic: " + topic);
} else {
throw e;
}
}
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (InterruptedException e) {
System.out.println("IPC interrupted.");
} catch (ExecutionException e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
public static SubscribeToIoTCoreResponseHandler subscribeToIoTCore(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, QOS qos, StreamResponseHandler<IoTCoreMessage> streamResponseHandler) {
SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest();
subscribeToIoTCoreRequest.setTopicName(topic);
subscribeToIoTCoreRequest.setQos(qos);
return greengrassCoreIPCClient.subscribeToIoTCore(subscribeToIoTCoreRequest,
Optional.of(streamResponseHandler));
}
public static class SubscriptionResponseHandler implements StreamResponseHandler<IoTCoreMessage> {
@Override
public void onStreamEvent(IoTCoreMessage ioTCoreMessage) {
try {
String topic = ioTCoreMessage.getMessage().getTopicName();
String message = new String(ioTCoreMessage.getMessage().getPayload(),
StandardCharsets.UTF_8);
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
@Override
public boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false;
}
@Override
public void onStreamClosed() {
System.out.println("Subscribe to IoT Core stream closed.");
}
}
}
- Python (IPC client V1)
-
例 示例:订阅消息
import time
import traceback
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
IoTCoreMessage,
QOS,
SubscribeToIoTCoreRequest
)
TIMEOUT = 10
ipc_client = awsiot.greengrasscoreipc.connect()
class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: IoTCoreMessage) -> None:
try:
message = str(event.message.payload, "utf-8")
topic_name = event.message.topic_name
# Handle message.
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
# Handle error.
return True # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
# Handle close.
pass
topic = "my/topic"
qos = QOS.AT_MOST_ONCE
request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
# Keep the main thread alive, or the process will exit.
while True:
time.sleep(10)
# To stop subscribing, close the operation stream.
operation.close()
- C++
-
例 示例:订阅消息
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
virtual ~IoTCoreResponseHandler() {}
private:
void OnStreamEvent(IoTCoreMessage *response) override {
auto message = response->GetMessage();
if (message.has_value() && message.value().GetPayload().has_value()) {
auto messageBytes = message.value().GetPayload().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::string topicName = message.value().GetTopicName().value().c_str();
// Handle message.
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
QOS qos = QOS_AT_MOST_ONCE;
int timeout = 10;
SubscribeToIoTCoreRequest request;
request.SetTopicName(topic);
request.SetQos(qos);
auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
示例
使用以下示例学习如何在组件中使用Amazon IoT Core MQTT IPC 服务。
以下示例配方允许组件发布到所有主题。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.IoTCorePublisherCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes MQTT messages to IoT Core.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.IoTCorePublisherCpp:mqttproxy:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"Run": "{artifacts:path}/greengrassv2_iotcore_publisher"
},
"Artifacts": [
{
"URI": "s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.IoTCorePublisherCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes MQTT messages to IoT Core.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.IoTCorePublisherCpp:mqttproxy:1:
policyDescription: Allows access to publish to all topics.
operations:
- aws.greengrass#PublishToIoTCore
resources:
- "*"
Manifests:
- Lifecycle:
Run: "{artifacts:path}/greengrassv2_iotcore_publisher"
Artifacts:
- URI: s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher
Permission:
Execute: OWNER
以下示例 C++ 应用程序演示了如何使用Amazon IoT Core MQTT IPC 服务向发布消息Amazon IoT Core。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the Greengrass IPC MQTT publisher (C++).");
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToIoTCoreRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
request.SetTopicName(topic);
request.SetPayload(messageData);
request.SetQos(qos);
auto operation = ipcClient.NewPublishToIoTCore();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
以下示例配方允许组件订阅所有主题。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.IoTCoreSubscriberCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.IoTCoreSubscriberCpp:mqttproxy:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"Run": "{artifacts:path}/greengrassv2_iotcore_subscriber"
},
"Artifacts": [
{
"URI": "s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.IoTCoreSubscriberCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to MQTT messages from IoT Core.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.IoTCoreSubscriberCpp:mqttproxy:1:
policyDescription: Allows access to subscribe to all topics.
operations:
- aws.greengrass#SubscribeToIoTCore
resources:
- "*"
Manifests:
- Lifecycle:
Run: "{artifacts:path}/greengrassv2_iotcore_subscriber"
Artifacts:
- URI: s3://DOC-EXAMPLE-BUCKET
/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber
Permission:
Execute: OWNER
以下示例 C++ 应用程序演示如何使用Amazon IoT Core MQTT IPC 服务订阅来自的消息Amazon IoT Core。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
virtual ~IoTCoreResponseHandler() {}
private:
void OnStreamEvent(IoTCoreMessage *response) override {
auto message = response->GetMessage();
if (message.has_value() && message.value().GetPayload().has_value()) {
auto messageBytes = message.value().GetPayload().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::string messageTopic = message.value().GetTopicName().value().c_str();
std::cout << "Received new message on topic: " << messageTopic << std::endl;
std::cout << "Message: " << messageString << std::endl;
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to IoT Core stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToIoTCoreRequest request;
request.SetTopicName(topic);
request.SetQos(qos);
auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}