本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
发布/订阅 Amazon IoT Core MQTT 消息
Amazon IoT Core MQTT 消息传递 IPC 服务允许您发送和接收 MQTT 消息。 Amazon IoT Core组件可以向其他来源发布消息 Amazon IoT Core 和订阅主题,以处理来自其他来源的 MQTT 消息。有关 MQTT Amazon IoT Core 实现的更多信息,请参阅《Amazon IoT Core 开发人员指南》中的 MQTT。
注意
此 MQTT 消息 IPC 服务允许您与交换消息。 Amazon IoT Core有关如何在组件之间交换消息的更多信息,请参阅发布/订阅本地消息。
最低 SDK 版本
下表列出了在发布和订阅 MQTT 消息时必须使用的最低版本。 Amazon IoT Device SDK Amazon IoT Core
| SDK | 最低版本 |
|---|---|
|
v1.2.10 |
|
|
v1.5.3 |
|
|
v1.17.0 |
|
|
v1.12.0 |
Authorization
要在自定义组件中使用 Amazon IoT Core MQTT 消息传递,您必须定义授权策略,允许您的组件发送和接收有关主题的消息。有关定义授权策略的信息,请参阅授权组件执行 IPC 操作。
Amazon IoT Core MQTT 消息传递的授权策略具有以下属性。
IPC 服务标识符:aws.greengrass.ipc.mqttproxy
| 操作 | 说明 | 资源 |
|---|---|---|
|
|
允许组件在您指定的 MQTT 主题 Amazon IoT Core 上向其发布消息。 |
允许访问所有主题的主题字符串(例如 |
|
|
允许组件订阅来自您指定 Amazon IoT Core 主题的消息。 |
允许访问所有主题的主题字符串(例如 |
|
|
允许组件发布和订阅您指定的主题的 Amazon IoT Core MQTT 消息。 |
允许访问所有主题的主题字符串(例如 |
MQTT 授权策略中的 M Amazon IoT Core QTT 通配符
您可以在 MQTT IPC 授权策略中使用 Amazon IoT Core MQTT 通配符。组件可以发布和订阅与您在授权策略中允许的主题筛选条件相匹配的主题。例如,如果组件的授权策略授予 test/topic/# 访问权限,则该组件可以订阅 test/topic/#,也可以发布和订阅 test/topic/filter。
Amazon IoT Core MQTT 授权策略中的配方变量
如果您使用的是 Greengrass Nucleus v2.6.0 或更高版本,则可以在授权策略中使用 {iot:thingName} 配方变量。此功能使您可以为一组核心设备配置单个授权策略,其中每台核心设备只能访问包含自己名称的主题。例如,您可以允许组件访问以下主题资源。
devices/{iot:thingName}/messages
有关更多信息,请参阅配方变量和在合并更新中使用配方变量。
授权策略示例
您可以参考以下授权策略示例,帮助您为组件配置授权策略。
例具有无限制访问权限的示例授权策略
以下示例授权策略允许组件发布和订阅所有主题。
例具有有限访问权限的示例授权策略
以下示例授权策略允许组件发布和订阅名为 factory/1/events 和 factory/1/actions 的两个主题。
例核心设备组的示例授权策略
重要
此示例使用了 Greengrass nucleus 组件的 v2.6.0 及更高版本中提供的功能。Greengrass Nucleus v2.6.0 在组件配置中添加了对大多数配方变量(例如 {iot:thingName})的支持。
以下示例授权策略允许组件发布和订阅包含运行该组件的核心设备名称的主题。
PublishToIoTCore
向 Amazon IoT Core 发布有关某个主题的 MQTT 消息。
当您向发布 MQTT 消息时 Amazon IoT Core,有每秒 100 个交易的配额。如果超出此配额,消息将在 Greengrass 设备上排队等候处理。还有每秒 512 Kb 的数据配额,整个账户的配额为每秒 20,000 次发布(有些 Amazon Web Services 区域有 2,000 次)。有关 Amazon IoT Core中 MQTT 消息代理限制的更多信息,请参阅 Amazon IoT Core 消息代理和协议限制以及配额。
如果您超过这些配额,Greengrass 设备会将发布消息限制为。 Amazon IoT Core消息存储在内存中的后台处理程序中。默认情况下,分配给后台处理程序的内存为 2.5 Mb。如果后台处理程序已满,则新消息将被拒绝。您可以增大后台处理程序的大小。有关更多信息,请参阅 Greengrass Nucleus文档中的 配置。为避免填满后台处理程序并且需要增加分配的内存,请将发布请求限制为每秒不超过 100 个请求。
当应用程序需要以更高的速率发送消息或发送更大的消息时,可以考虑使用 流管理器 向 Kinesis Data Streams 发送消息。流管理器组件旨在将大量数据传输到 Amazon Web Services 云。有关更多信息,请参阅 管理 Greengrass 核心设备上的数据流。
请求
此操作的请求包含以下参数:
topicName(Python:topic_name)-
要向其发布消息的主题。
qos-
要使用的 MQTT QoS。此枚举
QOS包含以下值:-
AT_MOST_ONCE– QoS 0。MQTT 消息至多传送一次。 -
AT_LEAST_ONCE– QoS 1。MQTT 消息至少传送一次。
-
payload-
(可选)以 Blob 形式显示的消息有效载荷。
使用 MQTT 5 时,以下功能适用于 Greengrass Nucleus v2.10.0 及更高版本。如果使用的是 MQTT 3.1.1,则会忽略这些功能。下表列出了访问这些功能必须使用的 Amazon IoT 设备 SDK 的最低版本。
| SDK | 最低版本 |
|---|---|
| Amazon IoT Device SDK for Python |
v1.15.0 |
| Amazon IoT Device SDK for Java |
v1.13.0 |
| Amazon IoT Device SDK for C++ |
v1.24.0 |
| Amazon IoT Device SDK for JavaScript |
v1.13.0 |
payloadFormat-
(可选)消息有效载荷的格式。如果您未设置
payloadFormat,则假定类型为BYTES。枚举包含以下值:-
BYTES– 有效载荷的内容是二进制 Blob。 -
UTF8— 有效载荷的内容是一 UTF8 串字符。
-
retain-
(可选)指示是否在发布时将 MQTT 保留选项设置为
true。 userProperties-
(可选)要发送的应用程序特定
UserProperty对象的列表。UserProperty对象定义如下:UserProperty: key: string value: string messageExpiryIntervalSeconds-
(可选)消息过期并被服务器删除前等待的秒数。如果您未设置此值,消息不会过期。
correlationData-
(可选)已添加到请求中的信息,可用于将请求与响应相关联。
responseTopic-
(可选)应用于响应消息的主题。
contentType-
(可选)消息内容类型的应用程序特定标识符。
响应
此操作在其响应中未提供任何信息。
示例
以下示例演示了如何在自定义组件代码中调用该操作。
SubscribeToIoTCore
通过主题或主题筛选器订阅 MQTT 消息。 Amazon IoT Core 当组件的生命周期结束时,C Amazon IoT Greengrass ore 软件会删除订阅。
此操作是一种订阅操作,您可以在其中订阅事件消息流。要使用此操作,请定义一个流响应处理程序,其中包含处理事件消息、错误和流关闭的函数。有关更多信息,请参阅 订阅 IPC 事件流。
事件消息类型:IoTCoreMessage
请求
此操作的请求包含以下参数:
topicName(Python:topic_name)-
要订阅的主题。您可以使用 MQTT 主题通配符(
#和+)来订阅多个主题。 qos-
要使用的 MQTT QoS。此枚举
QOS包含以下值:-
AT_MOST_ONCE– QoS 0。MQTT 消息至多传送一次。 -
AT_LEAST_ONCE– QoS 1。MQTT 消息至少传送一次。
-
响应
此操作的响应包含以下信息:
messages-
MQTT 消息流。此对象
IoTCoreMessage包含以下信息:message-
MQTT 消息。此对象
MQTTMessage包含以下信息:topicName(Python:topic_name)-
消息被发布到的主题。
payload-
(可选)以 Blob 形式显示的消息有效载荷。
使用 MQTT 5 时,以下功能适用于 Greengrass Nucleus v2.10.0 及更高版本。如果使用的是 MQTT 3.1.1,则会忽略这些功能。下表列出了访问这些功能必须使用的 Amazon IoT 设备 SDK 的最低版本。
SDK 最低版本 Amazon IoT Device SDK for Python v2 v1.15.0 Amazon IoT Device SDK for Java v2 v1.13.0 Amazon IoT Device SDK for C++ v2 v1.24.0 Amazon IoT Device SDK for JavaScript v2 v1.13.0 payloadFormat-
(可选)消息有效载荷的格式。如果您未设置
payloadFormat,则假定类型为BYTES。枚举包含以下值:-
BYTES– 有效载荷的内容是二进制 Blob。 -
UTF8— 有效载荷的内容是一 UTF8串字符。
-
retain-
(可选)指示是否在发布时将 MQTT 保留选项设置为
true。 userProperties-
(可选)要发送的应用程序特定
UserProperty对象的列表。UserProperty对象定义如下:UserProperty: key: string value: string messageExpiryIntervalSeconds-
(可选)消息过期并被服务器删除前等待的秒数。如果您未设置此值,消息不会过期。
correlationData-
(可选)已添加到请求中的信息,可用于将请求与响应相关联。
responseTopic-
(可选)应用于响应消息的主题。
contentType-
(可选)消息内容类型的应用程序特定标识符。
示例
以下示例演示了如何在自定义组件代码中调用该操作。
示例
使用以下示例来学习如何在组件中使用 Amazon IoT Core MQTT IPC 服务。
以下示例配方允许该组件发布至所有主题。
以下示例 C++ 应用程序演示了如何使用 Amazon IoT Core MQTT IPC 服务向发布消息。 Amazon IoT Core
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the Greengrass IPC MQTT publisher (C++)."); String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }
以下示例配方允许该组件订阅所有主题。
以下示例 C++ 应用程序演示了如何使用 Amazon IoT Core MQTT IPC 服务订阅来自的消息。 Amazon IoT Core
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string messageTopic = message.value().GetTopicName().value().c_str(); std::cout << "Received new message on topic: " << messageTopic << std::endl; std::cout << "Message: " << messageString << std::endl; } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to IoT Core stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
以下示例配方允许该组件发布至所有主题。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherRust", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherRust:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/publish_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherRust/1.0.0/publish_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
以下示例 Rust 应用程序演示了如何使用 Amazon IoT Core MQTT IPC 服务向发布消息。 Amazon IoT Core
use gg_sdk::{Qos, Sdk}; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let message = b"Hello, World"; let topic = "my/topic"; let qos = Qos::AtLeastOnce; sdk.publish_to_iot_core(topic, message, qos) .expect("Failed to publish to topic"); println!("Successfully published to topic: {topic}"); }
以下示例配方允许该组件订阅所有主题。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberRust", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberRust:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/subscribe_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberRust/1.0.0/subscribe_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
以下示例 Rust 应用程序演示了如何使用 Amazon IoT Core MQTT IPC 服务订阅来自的消息。 Amazon IoT Core
use gg_sdk::{Qos, Sdk}; use std::{thread, time::Duration}; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let topic = "my/topic"; let qos = Qos::AtLeastOnce; let callback = |topic: &str, payload: &[u8]| { let message = String::from_utf8_lossy(payload); println!("Received new message on topic {topic}: {message}"); }; let _sub = sdk .subscribe_to_iot_core(topic, qos, &callback) .expect("Failed to subscribe to topic"); println!("Successfully subscribed to topic: {topic}"); // Keep the main thread alive, or the process will exit. loop { thread::sleep(Duration::from_secs(10)); } }
以下示例配方允许该组件发布至所有主题。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherC", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherC:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_publish_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherC/1.0.0/sample_publish_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
以下示例 C 应用程序演示如何使用 Amazon IoT Core MQTT IPC 服务向发布消息。 Amazon IoT Core
#include <gg/error.h> #include <gg/ipc/client.h> #include <gg/sdk.h> #include <stdio.h> #include <stdlib.h> int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer message = GG_STR("Hello, World"); GgBuffer topic = GG_STR("my/topic"); uint8_t qos = 1; err = ggipc_publish_to_iot_core(topic, message, qos); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to publish to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully published to topic: %.*s\n", (int) topic.len, topic.data ); }
以下示例配方允许该组件订阅所有主题。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberC", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberC:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_subscribe_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberC/1.0.0/sample_subscribe_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
以下示例 C 应用程序演示如何使用 Amazon IoT Core MQTT IPC 服务订阅来自的消息。 Amazon IoT Core
#include <gg/error.h> #include <gg/ipc/client.h> #include <gg/sdk.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> static void on_subscription_response( void *ctx, GgBuffer topic, GgBuffer payload, GgIpcSubscriptionHandle handle ) { (void) ctx; (void) handle; printf( "Received new message on topic %.*s: %.*s\n", (int) topic.len, topic.data, (int) payload.len, payload.data ); } int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer topic = GG_STR("my/topic"); uint8_t qos = 1; GgIpcSubscriptionHandle handle; err = ggipc_subscribe_to_iot_core( topic, qos, on_subscription_response, NULL, &handle ); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to subscribe to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully subscribed to topic: %.*s\n", (int) topic.len, topic.data ); // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } // To stop subscribing, close the subscription handle. ggipc_close_subscription(handle); }
以下示例配方允许该组件发布至所有主题。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherCpp:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_cpp_publish_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherCpp/1.0.0/sample_cpp_publish_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
以下示例 C++ 应用程序演示了如何使用 Amazon IoT Core MQTT IPC 服务向发布消息。 Amazon IoT Core
#include <gg/ipc/client.hpp> #include <iostream> int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view message = "Hello, World"; std::string_view topic = "my/topic"; uint8_t qos = 1; error = client.publish_to_iot_core(topic, message, qos); if (error) { std::cerr << "Failed to publish to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully published to topic: " << topic << "\n"; }
以下示例配方允许该组件订阅所有主题。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberCpp:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_cpp_subscribe_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/sample_cpp_subscribe_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
以下示例 C++ 应用程序演示了如何使用 Amazon IoT Core MQTT IPC 服务订阅来自的消息。 Amazon IoT Core
#include <gg/ipc/client.hpp> #include <unistd.h> #include <iostream> class ResponseHandler : public gg::ipc::IotTopicCallback { void operator()( std::string_view topic, gg::Buffer payload, gg::ipc::Subscription &handle ) override { (void) handle; std::cout << "Received new message on topic " << topic << ": " << payload << "\n"; } }; int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view topic = "my/topic"; uint8_t qos = 1; static ResponseHandler handler; error = client.subscribe_to_iot_core(topic, qos, handler); if (error) { std::cerr << "Failed to subscribe to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully subscribed to topic: " << topic << "\n"; // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } }