AWS IoT Device Shadow demo application - FreeRTOS


Introduction

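This demo shows how to use the AWS IoT Device Shadow library to connect to the AWS IoT Device Shadow service. It uses the coreMQTT library to establish an MQTT connection over TLS (mutual authentication) to the AWS IoT MQTT broker, and the coreJSON library to parse shadow documents received from the Device Shadow service. The demo shows basic shadow operations, such as how to update a shadow document and how to delete a shadow document. It also shows how to register a callback function with the coreMQTT library to handle messages, such as the shadow /update and /update/delta messages, that are sent from the AWS IoT Device Shadow service.

This demo is intended as a learning exercise only, because the request to update the shadow document (state) and the response to that update are both handled by the same application. In a realistic production scenario, an external application requests an update of the device state remotely, even if the device is not currently connected. The device acknowledges the update request when it connects.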
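The relationship between the desired state, the reported state, and the resulting delta can be sketched with a small in-memory model. This is a simplified illustration, not the service's implementation: the type `ShadowModel_t` and the three functions are hypothetical names, and the model tracks only a single `powerOn` value instead of a full JSON document.

```c
#include <stdbool.h>

/* Hypothetical, simplified model of a shadow that tracks one powerOn value.
 * The real service stores complete JSON documents with metadata. */
typedef struct
{
    bool hasDesired;  /* true once an application has requested a state */
    int desired;      /* requested powerOn value */
    bool hasReported; /* true once the device has reported a state */
    int reported;     /* last powerOn value reported by the device */
} ShadowModel_t;

/* An external application requests a new state. The device may be offline. */
void shadowSetDesired( ShadowModel_t * pShadow, int powerOn )
{
    pShadow->hasDesired = true;
    pShadow->desired = powerOn;
}

/* The device reports its actual state after acting on a request. */
void shadowSetReported( ShadowModel_t * pShadow, int powerOn )
{
    pShadow->hasReported = true;
    pShadow->reported = powerOn;
}

/* Returns true, and writes the pending value, when the desired state
 * differs from the reported state. */
bool shadowGetDelta( const ShadowModel_t * pShadow, int * pPowerOn )
{
    bool hasDelta = pShadow->hasDesired &&
                    ( !pShadow->hasReported ||
                      ( pShadow->desired != pShadow->reported ) );

    if( hasDelta )
    {
        *pPowerOn = pShadow->desired;
    }

    return hasDelta;
}
```

In this model, a call to `shadowSetDesired` while the device is offline leaves a pending delta that the device can pick up later; once the device calls `shadowSetReported` with the same value, the delta disappears.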

Note

To set up and run the FreeRTOS demos, follow the steps in Getting Started with FreeRTOS.

Functionality

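The demo creates a single application task that loops through a set of examples demonstrating the shadow /update and /update/delta callbacks, to simulate toggling the state of a remote device. It sends a shadow update with a new desired state and waits for the device to change its reported state in response. In addition, a shadow /update callback is used to print the changing shadow states. The demo also uses a secure MQTT connection to the AWS IoT MQTT broker, and assumes that a powerOn state exists in the device shadow.

The demo performs the following operations:

  1. Establish an MQTT connection by using the helper functions in shadow_demo_helpers.c.

  2. Assemble the MQTT topic strings for device shadow operations by using macros defined by the AWS IoT Device Shadow library.

  3. Publish to the MQTT topic used for deleting a device shadow, to delete any existing device shadow.

  4. Subscribe to the MQTT topics for /update/delta, /update/accepted, and /update/rejected by using the helper functions in shadow_demo_helpers.c.

  5. Publish a desired state of powerOn by using the helper functions in shadow_demo_helpers.c. This causes an /update/delta message to be sent to the device.

  6. Handle incoming MQTT messages in prvEventCallback, and determine whether a message is related to the device shadow by using a function defined by the AWS IoT Device Shadow library (Shadow_MatchTopic). If the message is a device shadow /update/delta message, the main demo function publishes a second message to update the reported state to powerOn. If an /update/accepted message is received, verify that its clientToken is the same as the one previously published in the update message. This marks the end of the demo.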


[Screenshot: shadow demo terminal output]

The following is the structure of the demo.

    void prvShadowDemoTask( void * pvParameters )
    {
        BaseType_t demoStatus = pdPASS;

        /* A buffer containing the update document. It has static duration to
         * prevent it from being placed on the call stack. */
        static char pcUpdateDocument[ SHADOW_REPORTED_JSON_LENGTH + 1 ] = { 0 };

        demoStatus = xEstablishMqttSession( prvEventCallback );

        if( pdFAIL == demoStatus )
        {
            /* Log error to indicate connection failure. */
            LogError( ( "Failed to connect to MQTT broker." ) );
        }
        else
        {
            /* First of all, try to delete any Shadow document in the cloud. */
            demoStatus = xPublishToTopic( SHADOW_TOPIC_STRING_DELETE( THING_NAME ),
                                          SHADOW_TOPIC_LENGTH_DELETE( THING_NAME_LENGTH ),
                                          pcUpdateDocument,
                                          0U );

            /* Then try to subscribe to the shadow topics. */
            if( demoStatus == pdPASS )
            {
                demoStatus = xSubscribeToTopic( SHADOW_TOPIC_STRING_UPDATE_DELTA( THING_NAME ),
                                                SHADOW_TOPIC_LENGTH_UPDATE_DELTA( THING_NAME_LENGTH ) );
            }

            if( demoStatus == pdPASS )
            {
                demoStatus = xSubscribeToTopic( SHADOW_TOPIC_STRING_UPDATE_ACCEPTED( THING_NAME ),
                                                SHADOW_TOPIC_LENGTH_UPDATE_ACCEPTED( THING_NAME_LENGTH ) );
            }

            if( demoStatus == pdPASS )
            {
                demoStatus = xSubscribeToTopic( SHADOW_TOPIC_STRING_UPDATE_REJECTED( THING_NAME ),
                                                SHADOW_TOPIC_LENGTH_UPDATE_REJECTED( THING_NAME_LENGTH ) );
            }

            /* This demo uses a constant #THING_NAME known at compile time
             * therefore we can use macros to assemble shadow topic strings.
             * If the thing name is known at run time, then we could use the API
             * #Shadow_GetTopicString to assemble shadow topic strings, here is
             * the example for /update/delta:
             *
             * For /update/delta:
             *
             * #define SHADOW_TOPIC_MAX_LENGTH  (256U)
             *
             * ShadowStatus_t shadowStatus = SHADOW_STATUS_SUCCESS;
             * char cTopicBuffer[ SHADOW_TOPIC_MAX_LENGTH ] = { 0 };
             * uint16_t usBufferSize = SHADOW_TOPIC_MAX_LENGTH;
             * uint16_t usOutLength = 0;
             * const char * pcThingName = "TestThingName";
             * uint16_t usThingNameLength = ( uint16_t ) strlen( pcThingName );
             *
             * shadowStatus = Shadow_GetTopicString(
             *                    SHADOW_TOPIC_STRING_TYPE_UPDATE_DELTA,
             *                    pcThingName,
             *                    usThingNameLength,
             *                    & ( cTopicBuffer[ 0 ] ),
             *                    usBufferSize,
             *                    & usOutLength );
             */

            /* Then we publish a desired state to the /update topic. Since we've
             * deleted the device shadow at the beginning of the demo, this will
             * cause a delta message to be published, which we have subscribed to.
             * In many real applications, the desired state is not published by
             * the device itself. But for the purpose of making this demo
             * self-contained, we publish one here so that we can receive a delta
             * message later. */
            if( demoStatus == pdPASS )
            {
                /* Desired power on state. */
                LogInfo( ( "Send desired power state with 1." ) );

                ( void ) memset( pcUpdateDocument,
                                 0x00,
                                 sizeof( pcUpdateDocument ) );

                snprintf( pcUpdateDocument,
                          SHADOW_DESIRED_JSON_LENGTH + 1,
                          SHADOW_DESIRED_JSON,
                          ( int ) 1,
                          ( long unsigned ) ( xTaskGetTickCount() % 1000000 ) );

                demoStatus = xPublishToTopic( SHADOW_TOPIC_STRING_UPDATE( THING_NAME ),
                                              SHADOW_TOPIC_LENGTH_UPDATE( THING_NAME_LENGTH ),
                                              pcUpdateDocument,
                                              ( SHADOW_DESIRED_JSON_LENGTH + 1 ) );
            }

            if( demoStatus == pdPASS )
            {
                /* Note that PublishToTopic already called MQTT_ProcessLoop,
                 * therefore responses may have been received and the
                 * prvEventCallback may have been called, which may have changed
                 * the stateChanged flag. Check if the state change flag has been
                 * modified or not. If it's modified, then we publish reported
                 * state to update topic. */
                if( stateChanged == true )
                {
                    /* Report the latest power state back to device shadow. */
                    LogInfo( ( "Report to the state change: %d", ulCurrentPowerOnState ) );
                    ( void ) memset( pcUpdateDocument,
                                     0x00,
                                     sizeof( pcUpdateDocument ) );

                    /* Keep the client token in a global variable so that it can
                     * be compared with the token in /update/accepted. */
                    ulClientToken = ( xTaskGetTickCount() % 1000000 );

                    snprintf( pcUpdateDocument,
                              SHADOW_REPORTED_JSON_LENGTH + 1,
                              SHADOW_REPORTED_JSON,
                              ( int ) ulCurrentPowerOnState,
                              ( long unsigned ) ulClientToken );

                    demoStatus = xPublishToTopic( SHADOW_TOPIC_STRING_UPDATE( THING_NAME ),
                                                  SHADOW_TOPIC_LENGTH_UPDATE( THING_NAME_LENGTH ),
                                                  pcUpdateDocument,
                                                  ( SHADOW_DESIRED_JSON_LENGTH + 1 ) );
                }
                else
                {
                    LogInfo( ( "No change from /update/delta, unsubscribe all shadow topics and disconnect from MQTT.\r\n" ) );
                }
            }

            if( demoStatus == pdPASS )
            {
                LogInfo( ( "Start to unsubscribe shadow topics and disconnect from MQTT. \r\n" ) );

                demoStatus = xUnsubscribeFromTopic( SHADOW_TOPIC_STRING_UPDATE_DELTA( THING_NAME ),
                                                    SHADOW_TOPIC_LENGTH_UPDATE_DELTA( THING_NAME_LENGTH ) );

                if( demoStatus != pdPASS )
                {
                    LogError( ( "Failed to unsubscribe the topic %s",
                                SHADOW_TOPIC_STRING_UPDATE_DELTA( THING_NAME ) ) );
                }
            }

            if( demoStatus == pdPASS )
            {
                demoStatus = xUnsubscribeFromTopic( SHADOW_TOPIC_STRING_UPDATE_ACCEPTED( THING_NAME ),
                                                    SHADOW_TOPIC_LENGTH_UPDATE_ACCEPTED( THING_NAME_LENGTH ) );

                if( demoStatus != pdPASS )
                {
                    LogError( ( "Failed to unsubscribe the topic %s",
                                SHADOW_TOPIC_STRING_UPDATE_ACCEPTED( THING_NAME ) ) );
                }
            }

            if( demoStatus == pdPASS )
            {
                demoStatus = xUnsubscribeFromTopic( SHADOW_TOPIC_STRING_UPDATE_REJECTED( THING_NAME ),
                                                    SHADOW_TOPIC_LENGTH_UPDATE_REJECTED( THING_NAME_LENGTH ) );

                if( demoStatus != pdPASS )
                {
                    LogError( ( "Failed to unsubscribe the topic %s",
                                SHADOW_TOPIC_STRING_UPDATE_REJECTED( THING_NAME ) ) );
                }
            }

            /* The MQTT session is always disconnected, even if there were
             * prior failures. */
            demoStatus = xDisconnectMqttSession();

            /* This demo performs only Device Shadow operations. If matching the
             * Shadow MQTT topic fails or there is a failure in parsing the
             * received JSON document, then this demo was not successful. */
            if( ( xUpdateAcceptedReturn != pdPASS ) || ( xUpdateDeltaReturn != pdPASS ) )
            {
                LogError( ( "Callback function failed." ) );
            }

            if( demoStatus == pdPASS )
            {
                LogInfo( ( "Demo completed successfully." ) );
            }
            else
            {
                LogError( ( "Shadow Demo failed." ) );
            }
        }

        /* Delete this task. */
        LogInfo( ( "Deleting Shadow Demo task." ) );
        vTaskDelete( NULL );
    }
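The task above checks a flag that the callback sets, rather than publishing from inside the callback. A minimal sketch of that deferral pattern follows. All names here (`onDelta`, `handlePendingStateChange`, `publishReported`, `getPublishCount`) are hypothetical stand-ins, the publish helper only counts calls, and, unlike the demo, this sketch clears the flag once the change has been handled.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for the state shared between callback and task. */
static bool stateChanged = false;
static uint32_t ulCurrentPowerOnState = 0U;
static uint32_t ulPublishCount = 0U;

/* Stand-in for a publish helper; here it only counts how often it is called. */
static void publishReported( uint32_t ulState )
{
    ( void ) ulState;
    ulPublishCount++;
}

/* Runs in callback context: record the new state, but do not publish,
 * so the MQTT library is not re-entered from its own callback. */
void onDelta( uint32_t ulNewState )
{
    if( ulNewState != ulCurrentPowerOnState )
    {
        ulCurrentPowerOnState = ulNewState;
        stateChanged = true;
    }
}

/* Runs in task context after the process loop has returned: publish the
 * reported state if the callback flagged a change. */
bool handlePendingStateChange( void )
{
    bool handled = false;

    if( stateChanged )
    {
        stateChanged = false;
        publishReported( ulCurrentPowerOnState );
        handled = true;
    }

    return handled;
}

/* Accessor used to observe the stand-in publish helper. */
uint32_t getPublishCount( void )
{
    return ulPublishCount;
}
```

Calling `onDelta( 1U )` alone publishes nothing; the publish happens only on the next call to `handlePendingStateChange()`.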

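The following screenshot shows the expected output when the demo succeeds.


[Screenshot: terminal output showing that the shadow demo succeeded]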

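Connect to the AWS IoT MQTT broker

To connect to the AWS IoT MQTT broker, we use the same method as MQTT_Connect() in the coreMQTT mutual authentication demo.

Delete the shadow document

To delete the shadow document, call xPublishToTopic with an empty message, using the macros defined by the AWS IoT Device Shadow library. This uses MQTT_Publish to publish to the /delete topic. The following code section shows how this is done in the function prvShadowDemoTask.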

    /* First of all, try to delete any Shadow document in the cloud. */
    demoStatus = xPublishToTopic( SHADOW_TOPIC_STRING_DELETE( THING_NAME ),
                                  SHADOW_TOPIC_LENGTH_DELETE( THING_NAME_LENGTH ),
                                  pcUpdateDocument,
                                  0U );
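To illustrate how a topic string and its length can both be produced at compile time, here is a minimal sketch. The macros `MY_THING_NAME`, `MY_TOPIC_DELETE`, and `MY_TOPIC_DELETE_LENGTH`, the type `PublishRequest_t`, and the function `makeDeleteRequest` are hypothetical and are not the library's definitions. The only fact assumed about the service is the classic shadow topic layout `$aws/things/<thingName>/shadow/delete`.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical names, chosen so they are not mistaken for the library's macros. */
#define MY_THING_NAME    "testShadow"

/* Adjacent string literals are concatenated by the compiler. */
#define MY_TOPIC_DELETE( thingName )    "$aws/things/" thingName "/shadow/delete"

/* sizeof counts the terminating NUL, so subtract one to get the topic length. */
#define MY_TOPIC_DELETE_LENGTH( thingName ) \
    ( ( uint16_t ) ( sizeof( MY_TOPIC_DELETE( thingName ) ) - 1U ) )

/* Hypothetical description of one publish operation. */
typedef struct
{
    const char * pTopic;
    uint16_t topicLength;
    const char * pPayload;
    size_t payloadLength;
} PublishRequest_t;

/* Builds a delete request: the topic is fixed at compile time and the
 * payload is empty. */
PublishRequest_t makeDeleteRequest( void )
{
    PublishRequest_t request;

    request.pTopic = MY_TOPIC_DELETE( MY_THING_NAME );
    request.topicLength = MY_TOPIC_DELETE_LENGTH( MY_THING_NAME );
    request.pPayload = "";
    request.payloadLength = 0U;

    return request;
}
```

Because the thing name is a string literal, no buffer or run-time formatting is needed; the trade-off is that the name must be known when the firmware is built.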

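Subscribe to shadow topics

Subscribe to the Device Shadow topics to receive notifications from the AWS IoT broker about shadow changes. The Device Shadow topics are assembled by macros defined in the Device Shadow library. The following code section shows how this is done in the prvShadowDemoTask function.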

    /* Then try to subscribe to the shadow topics. */
    if( demoStatus == pdPASS )
    {
        demoStatus = xSubscribeToTopic( SHADOW_TOPIC_STRING_UPDATE_DELTA( THING_NAME ),
                                        SHADOW_TOPIC_LENGTH_UPDATE_DELTA( THING_NAME_LENGTH ) );
    }

    if( demoStatus == pdPASS )
    {
        demoStatus = xSubscribeToTopic( SHADOW_TOPIC_STRING_UPDATE_ACCEPTED( THING_NAME ),
                                        SHADOW_TOPIC_LENGTH_UPDATE_ACCEPTED( THING_NAME_LENGTH ) );
    }

    if( demoStatus == pdPASS )
    {
        demoStatus = xSubscribeToTopic( SHADOW_TOPIC_STRING_UPDATE_REJECTED( THING_NAME ),
                                        SHADOW_TOPIC_LENGTH_UPDATE_REJECTED( THING_NAME_LENGTH ) );
    }
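When the thing name is only known at run time, the topic has to be formatted into a buffer instead. The sketch below shows one way to do that with a bounds check. `buildShadowTopic` is a hypothetical helper written for illustration; it is not the library's Shadow_GetTopicString and makes no claim about how that function is implemented.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical helper: writes "$aws/things/<thingName>/shadow<suffix>" into
 * pBuffer. Returns false if an argument is missing or the buffer is too
 * small to hold the topic plus its terminating NUL. */
bool buildShadowTopic( const char * pThingName,
                       const char * pSuffix, /* for example "/update/delta" */
                       char * pBuffer,
                       uint16_t bufferSize,
                       uint16_t * pOutLength )
{
    int written;
    bool ok = false;

    if( ( pThingName != NULL ) && ( pSuffix != NULL ) &&
        ( pBuffer != NULL ) && ( pOutLength != NULL ) && ( bufferSize > 0U ) )
    {
        written = snprintf( pBuffer, bufferSize,
                            "$aws/things/%s/shadow%s", pThingName, pSuffix );

        /* snprintf returns the length it wanted to write, so a value that
         * reaches bufferSize means the output was truncated. */
        if( ( written > 0 ) && ( written < ( int ) bufferSize ) )
        {
            *pOutLength = ( uint16_t ) written;
            ok = true;
        }
    }

    return ok;
}
```

For a thing named `TestThingName` and the suffix `/update/delta`, the helper produces `$aws/things/TestThingName/shadow/update/delta`. Treating truncation as an error matters here, because subscribing to a cut-off topic would fail silently from the application's point of view.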

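Send shadow updates

To send a shadow update, the demo calls xPublishToTopic with a message in JSON format, using the macros defined by the Device Shadow library. This uses MQTT_Publish to publish to the /update topic. The following code section shows how this is done in the prvShadowDemoTask function.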

    #define SHADOW_REPORTED_JSON    \
        "{"                         \
        "\"state\":{"               \
        "\"reported\":{"            \
        "\"powerOn\":%01d"          \
        "}"                         \
        "},"                        \
        "\"clientToken\":\"%06lu\"" \
        "}"

    snprintf( pcUpdateDocument,
              SHADOW_REPORTED_JSON_LENGTH + 1,
              SHADOW_REPORTED_JSON,
              ( int ) ulCurrentPowerOnState,
              ( long unsigned ) ulClientToken );

    xPublishToTopic( SHADOW_TOPIC_STRING_UPDATE( THING_NAME ),
                     SHADOW_TOPIC_LENGTH_UPDATE( THING_NAME_LENGTH ),
                     pcUpdateDocument,
                     ( SHADOW_DESIRED_JSON_LENGTH + 1 ) );
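As a standalone illustration of what the format string above produces, the following sketch formats a reported-state document and returns its actual length. The names `REPORTED_JSON_FORMAT` and `formatReportedDocument` are hypothetical; the format string is the same as the one shown above, written on one line.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Same layout as the reported-state document shown above. */
#define REPORTED_JSON_FORMAT \
    "{\"state\":{\"reported\":{\"powerOn\":%01d}},\"clientToken\":\"%06lu\"}"

/* Hypothetical helper: formats the document into pBuffer and reports its
 * length without the terminating NUL. Returns false on truncation. */
bool formatReportedDocument( char * pBuffer,
                             size_t bufferSize,
                             int powerOn,
                             unsigned long clientToken,
                             size_t * pOutLength )
{
    int written;
    bool ok = false;

    if( ( pBuffer != NULL ) && ( pOutLength != NULL ) && ( bufferSize > 0U ) )
    {
        written = snprintf( pBuffer, bufferSize, REPORTED_JSON_FORMAT,
                            powerOn, clientToken );

        /* A return value that reaches bufferSize means the document did
         * not fit and was cut off. */
        if( ( written > 0 ) && ( ( size_t ) written < bufferSize ) )
        {
            *pOutLength = ( size_t ) written;
            ok = true;
        }
    }

    return ok;
}
```

With `powerOn` set to 1 and a client token of 22485, the buffer holds `{"state":{"reported":{"powerOn":1}},"clientToken":"022485"}`. The `%06lu` conversion pads the token to six digits, which is why the demo keeps the token below 1000000.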

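Handle shadow delta messages and shadow update messages

The user callback function, which was registered with the coreMQTT client library by using the MQTT_Init function, notifies us about incoming packet events. The following is the callback function.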

    /* This is the callback function invoked by the MQTT stack when it
     * receives incoming messages. This function demonstrates how to use the
     * Shadow_MatchTopic function to determine whether the incoming message is
     * a device shadow message or not. If it is, it handles the message
     * depending on the message type. */
    static void prvEventCallback( MQTTContext_t * pxMqttContext,
                                  MQTTPacketInfo_t * pxPacketInfo,
                                  MQTTDeserializedInfo_t * pxDeserializedInfo )
    {
        ShadowMessageType_t messageType = ShadowMessageTypeMaxNum;
        const char * pcThingName = NULL;
        uint16_t usThingNameLength = 0U;
        uint16_t usPacketIdentifier;

        ( void ) pxMqttContext;

        configASSERT( pxDeserializedInfo != NULL );
        configASSERT( pxMqttContext != NULL );
        configASSERT( pxPacketInfo != NULL );

        usPacketIdentifier = pxDeserializedInfo->packetIdentifier;

        /* Handle incoming publish. The lower 4 bits of the publish packet
         * type is used for the dup, QoS, and retain flags. Hence masking
         * out the lower bits to check if the packet is publish. */
        if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
        {
            configASSERT( pxDeserializedInfo->pPublishInfo != NULL );
            LogInfo( ( "pPublishInfo->pTopicName:%s.",
                       pxDeserializedInfo->pPublishInfo->pTopicName ) );

            /* Let the Device Shadow library tell us whether this is a device
             * shadow message. */
            if( SHADOW_SUCCESS == Shadow_MatchTopic( pxDeserializedInfo->pPublishInfo->pTopicName,
                                                     pxDeserializedInfo->pPublishInfo->topicNameLength,
                                                     &messageType,
                                                     &pcThingName,
                                                     &usThingNameLength ) )
            {
                /* Upon successful return, the messageType has been filled in. */
                if( messageType == ShadowMessageTypeUpdateDelta )
                {
                    /* Handler function to process payload. */
                    prvUpdateDeltaHandler( pxDeserializedInfo->pPublishInfo );
                }
                else if( messageType == ShadowMessageTypeUpdateAccepted )
                {
                    /* Handler function to process payload. */
                    prvUpdateAcceptedHandler( pxDeserializedInfo->pPublishInfo );
                }
                else if( messageType == ShadowMessageTypeUpdateDocuments )
                {
                    LogInfo( ( "/update/documents json payload:%s.",
                               ( const char * ) pxDeserializedInfo->pPublishInfo->pPayload ) );
                }
                else if( messageType == ShadowMessageTypeUpdateRejected )
                {
                    LogInfo( ( "/update/rejected json payload:%s.",
                               ( const char * ) pxDeserializedInfo->pPublishInfo->pPayload ) );
                }
                else
                {
                    LogInfo( ( "Other message type:%d !!", messageType ) );
                }
            }
            else
            {
                LogError( ( "Shadow_MatchTopic parse failed:%s !!",
                            ( const char * ) pxDeserializedInfo->pPublishInfo->pTopicName ) );
            }
        }
        else
        {
            vHandleOtherIncomingPacket( pxPacketInfo, usPacketIdentifier );
        }
    }
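The masking step in the callback follows from the layout of the first byte of an MQTT 3.1.1 fixed header: the upper four bits carry the packet type, and for PUBLISH the lower four bits carry the DUP, QoS, and RETAIN flags. The sketch below decodes that byte in isolation. The function names and the constant `PACKET_TYPE_PUBLISH` are hypothetical and are defined locally instead of coming from coreMQTT.

```c
#include <stdbool.h>
#include <stdint.h>

/* PUBLISH is control packet type 3, stored in the upper nibble. */
#define PACKET_TYPE_PUBLISH    0x30U

/* True for any PUBLISH packet, regardless of its flag bits. */
bool isPublishPacket( uint8_t firstByte )
{
    return ( firstByte & 0xF0U ) == PACKET_TYPE_PUBLISH;
}

/* Bit 3: set when the packet is a re-delivery. */
bool publishDup( uint8_t firstByte )
{
    return ( firstByte & 0x08U ) != 0U;
}

/* Bits 2 and 1: quality of service level. */
uint8_t publishQos( uint8_t firstByte )
{
    return ( uint8_t ) ( ( firstByte >> 1 ) & 0x03U );
}

/* Bit 0: set when the broker delivered a retained message. */
bool publishRetain( uint8_t firstByte )
{
    return ( firstByte & 0x01U ) != 0U;
}
```

Comparing the unmasked byte against `0x30` would only match a QoS 0 publish with no flags set, which is why the upper nibble is isolated first.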

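The callback function confirms that the incoming packet is of type MQTT_PACKET_TYPE_PUBLISH, and uses the Device Shadow library API Shadow_MatchTopic to confirm that the incoming message is a shadow message.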
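To give a feel for what matching a shadow topic involves, here is a simplified sketch that classifies a topic and extracts the thing name. It is not the library's implementation of Shadow_MatchTopic: `classifyShadowTopic` and `TopicKind_t` are hypothetical, only three classic shadow topics are recognized, and named shadows are not handled. The topic is treated as a pointer plus length, on the assumption that it is not NUL-terminated.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef enum
{
    TopicUpdateDelta,
    TopicUpdateAccepted,
    TopicUpdateRejected,
    TopicUnknown
} TopicKind_t;

/* Hypothetical helper: returns true and fills the outputs when pTopic has
 * the form "$aws/things/<thingName>/shadow<suffix>" with a known suffix. */
bool classifyShadowTopic( const char * pTopic,
                          size_t topicLength,
                          TopicKind_t * pKind,
                          const char ** ppThingName,
                          size_t * pThingNameLength )
{
    static const char prefix[] = "$aws/things/";
    static const char middle[] = "/shadow";
    static const char * const suffixes[] =
    {
        "/update/delta", "/update/accepted", "/update/rejected"
    };
    const size_t prefixLength = sizeof( prefix ) - 1U;
    const size_t middleLength = sizeof( middle ) - 1U;
    const char * pName;
    const char * pSlash;
    size_t remaining, nameLength, i;

    if( ( topicLength <= prefixLength ) ||
        ( memcmp( pTopic, prefix, prefixLength ) != 0 ) )
    {
        return false;
    }

    /* The thing name runs from the end of the prefix to the next '/'. */
    pName = pTopic + prefixLength;
    remaining = topicLength - prefixLength;
    pSlash = memchr( pName, '/', remaining );

    if( ( pSlash == NULL ) || ( pSlash == pName ) )
    {
        return false;
    }

    nameLength = ( size_t ) ( pSlash - pName );
    remaining -= nameLength;

    if( ( remaining < middleLength ) ||
        ( memcmp( pSlash, middle, middleLength ) != 0 ) )
    {
        return false;
    }

    pSlash += middleLength;
    remaining -= middleLength;

    /* The rest of the topic must equal one of the known suffixes exactly. */
    for( i = 0U; i < ( sizeof( suffixes ) / sizeof( suffixes[ 0 ] ) ); i++ )
    {
        if( ( remaining == strlen( suffixes[ i ] ) ) &&
            ( memcmp( pSlash, suffixes[ i ], remaining ) == 0 ) )
        {
            *pKind = ( TopicKind_t ) i;
            *ppThingName = pName;
            *pThingNameLength = nameLength;
            return true;
        }
    }

    return false;
}
```

The order of the `suffixes` array matches the order of the first three enumerators, so the loop index can be used directly as the message kind.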

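If the incoming message is a shadow message of type ShadowMessageTypeUpdateDelta, we call prvUpdateDeltaHandler to handle it. The handler prvUpdateDeltaHandler uses the coreJSON library to parse the message, gets the delta value for the powerOn state, and compares it against the current device state maintained locally. If the two differ, the local device state is updated to reflect the new value of the powerOn state from the shadow document.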

    static void prvUpdateDeltaHandler( MQTTPublishInfo_t * pxPublishInfo )
    {
        static uint32_t ulCurrentVersion = 0; /* Remember the latestVersion # we've ever received */
        uint32_t ulVersion = 0U;
        uint32_t ulNewState = 0U;
        char * pcOutValue = NULL;
        uint32_t ulOutValueLength = 0U;
        JSONStatus_t result = JSONSuccess;

        configASSERT( pxPublishInfo != NULL );
        configASSERT( pxPublishInfo->pPayload != NULL );

        LogInfo( ( "/update/delta json payload:%s.", ( const char * ) pxPublishInfo->pPayload ) );

        /* The payload will look similar to this:
         * {
         *      "version": 12,
         *      "timestamp": 1595437367,
         *      "state": {
         *          "powerOn": 1
         *      },
         *      "metadata": {
         *          "powerOn": {
         *              "timestamp": 1595437367
         *          }
         *      },
         *      "clientToken": "388062"
         *  }
         */

        /* Make sure the payload is a valid json document. */
        result = JSON_Validate( pxPublishInfo->pPayload,
                                pxPublishInfo->payloadLength );

        if( result == JSONSuccess )
        {
            /* Then we start to get the version value by JSON keyword "version". */
            result = JSON_Search( ( char * ) pxPublishInfo->pPayload,
                                  pxPublishInfo->payloadLength,
                                  "version",
                                  sizeof( "version" ) - 1,
                                  '.',
                                  &pcOutValue,
                                  ( size_t * ) &ulOutValueLength );
        }
        else
        {
            LogError( ( "The json document is invalid!!" ) );
        }

        if( result == JSONSuccess )
        {
            LogInfo( ( "version: %.*s",
                       ulOutValueLength,
                       pcOutValue ) );

            /* Convert the extracted value to an unsigned integer value. */
            ulVersion = ( uint32_t ) strtoul( pcOutValue, NULL, 10 );
        }
        else
        {
            LogError( ( "No version in json document!!" ) );
        }

        LogInfo( ( "version:%d, ulCurrentVersion:%d \r\n", ulVersion, ulCurrentVersion ) );

        /* When the version is newer than the one we retained, that means
         * the powerOn state is valid for us. */
        if( ulVersion > ulCurrentVersion )
        {
            /* Set to received version as the current version. */
            ulCurrentVersion = ulVersion;

            /* Get powerOn state from json documents. */
            result = JSON_Search( ( char * ) pxPublishInfo->pPayload,
                                  pxPublishInfo->payloadLength,
                                  "state.powerOn",
                                  sizeof( "state.powerOn" ) - 1,
                                  '.',
                                  &pcOutValue,
                                  ( size_t * ) &ulOutValueLength );
        }
        else
        {
            /* In this demo, we discard the incoming message
             * if the version number is not newer than the latest
             * that we've received before. Your application may use a
             * different approach. */
            LogWarn( ( "The received version is smaller than current one!!" ) );
        }

        if( result == JSONSuccess )
        {
            /* Convert the powerOn state value to an unsigned integer value. */
            ulNewState = ( uint32_t ) strtoul( pcOutValue, NULL, 10 );

            LogInfo( ( "The new power on state newState:%d, ulCurrentPowerOnState:%d \r\n",
                       ulNewState, ulCurrentPowerOnState ) );

            if( ulNewState != ulCurrentPowerOnState )
            {
                /* The received powerOn state is different from the one we
                 * retained before, so we switch them and set the flag. */
                ulCurrentPowerOnState = ulNewState;

                /* State change will be handled in main(), where we will publish
                 * a "reported" state to the device shadow. We do not do it here
                 * because we are inside of a callback from the MQTT library, so
                 * that we don't re-enter the MQTT library. */
                stateChanged = true;
            }
        }
        else
        {
            LogError( ( "No powerOn in json document!!" ) );
            xUpdateDeltaReturn = pdFAIL;
        }
    }
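The version check and state comparison in the handler can be isolated from the JSON parsing. The sketch below shows that decision logic on already-parsed values. The types `DeviceState_t` and `DeltaResult_t` and the function `applyDelta` are hypothetical, and this sketch chooses to return immediately for a stale version so that a stale message never reaches the state comparison.

```c
#include <stdint.h>

/* Hypothetical record of what the device last accepted. */
typedef struct
{
    uint32_t lastVersion; /* highest shadow version seen so far */
    uint32_t powerOn;     /* current local power state */
} DeviceState_t;

typedef enum
{
    DeltaIgnoredStale, /* version was not newer than the last one seen */
    DeltaNoChange,     /* version was newer, but the state already matched */
    DeltaApplied       /* version was newer and the state was updated */
} DeltaResult_t;

/* Applies a parsed delta message to the local device state. */
DeltaResult_t applyDelta( DeviceState_t * pState,
                          uint32_t version,
                          uint32_t newPowerOn )
{
    if( version <= pState->lastVersion )
    {
        return DeltaIgnoredStale;
    }

    /* Remember the newest version even if the state itself is unchanged. */
    pState->lastVersion = version;

    if( newPowerOn == pState->powerOn )
    {
        return DeltaNoChange;
    }

    pState->powerOn = newPowerOn;

    return DeltaApplied;
}
```

A caller would set its "state changed" flag only when the result is `DeltaApplied`, and could log the other two outcomes separately.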

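If the incoming message is a shadow message of type ShadowMessageTypeUpdateAccepted, we call prvUpdateAcceptedHandler to handle it. The handler prvUpdateAcceptedHandler uses the coreJSON library to parse the message and get the clientToken from it. The handler then checks whether the client token from the JSON message matches the client token used by the application. If it does not match, the function logs a warning message.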
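The token comparison described above can be sketched on its own. This example assumes the token value is handed over as a pointer into the payload plus a length, without a terminating NUL, so it copies the digits into a small local buffer before converting them. `tokenMatches` is a hypothetical helper and not part of the demo.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: returns true when the decimal text in
 * pValue[0 .. valueLength-1] equals expectedToken. */
bool tokenMatches( const char * pValue,
                   size_t valueLength,
                   uint32_t expectedToken )
{
    /* Room for the 10 decimal digits of a uint32_t plus a NUL. */
    char digits[ 11 ] = { 0 };
    char * pEnd = NULL;
    unsigned long parsed;

    if( ( pValue == NULL ) || ( valueLength == 0U ) ||
        ( valueLength >= sizeof( digits ) ) )
    {
        return false;
    }

    /* Copy so that the conversion cannot read past the value. */
    memcpy( digits, pValue, valueLength );
    parsed = strtoul( digits, &pEnd, 10 );

    /* Reject empty conversions and trailing non-numeric characters. */
    if( ( pEnd == digits ) || ( *pEnd != '\0' ) )
    {
        return false;
    }

    return parsed == ( unsigned long ) expectedToken;
}
```

Leading zeros are accepted, so the text `022485` matches a stored token of 22485, which is the case that arises from the `%06lu` formatting of the published token.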

    static void prvUpdateAcceptedHandler( MQTTPublishInfo_t * pxPublishInfo )
    {
        char * pcOutValue = NULL;
        uint32_t ulOutValueLength = 0U;
        uint32_t ulReceivedToken = 0U;
        JSONStatus_t result = JSONSuccess;

        configASSERT( pxPublishInfo != NULL );
        configASSERT( pxPublishInfo->pPayload != NULL );

        LogInfo( ( "/update/accepted json payload:%s.", ( const char * ) pxPublishInfo->pPayload ) );

        /* Handle the reported state with state change in /update/accepted topic.
         * Thus we will retrieve the client token from the json document to see if
         * it's the same one we sent with reported state on the /update topic.
         * The payload will look similar to this:
         *  {
         *      "state": {
         *          "reported": {
         *              "powerOn": 1
         *          }
         *      },
         *      "metadata": {
         *          "reported": {
         *              "powerOn": {
         *                  "timestamp": 1596573647
         *              }
         *          }
         *      },
         *      "version": 14698,
         *      "timestamp": 1596573647,
         *      "clientToken": "022485"
         *  }
         */

        /* Make sure the payload is a valid json document. */
        result = JSON_Validate( pxPublishInfo->pPayload,
                                pxPublishInfo->payloadLength );

        if( result == JSONSuccess )
        {
            /* Get clientToken from json documents. */
            result = JSON_Search( ( char * ) pxPublishInfo->pPayload,
                                  pxPublishInfo->payloadLength,
                                  "clientToken",
                                  sizeof( "clientToken" ) - 1,
                                  '.',
                                  &pcOutValue,
                                  ( size_t * ) &ulOutValueLength );
        }
        else
        {
            LogError( ( "Invalid json documents !!" ) );
        }

        if( result == JSONSuccess )
        {
            LogInfo( ( "clientToken: %.*s", ulOutValueLength,
                       pcOutValue ) );

            /* Convert the code to an unsigned integer value. */
            ulReceivedToken = ( uint32_t ) strtoul( pcOutValue, NULL, 10 );

            LogInfo( ( "receivedToken:%d, clientToken:%u \r\n", ulReceivedToken, ulClientToken ) );

            /* If the clientToken in this update/accepted message matches the one
             * we published before, it means the device shadow has accepted our
             * latest reported state. We are done. */
            if( ulReceivedToken == ulClientToken )
            {
                LogInfo( ( "Received response from the device shadow. Previously published "
                           "update with clientToken=%u has been accepted. ", ulClientToken ) );
            }
            else
            {
                LogWarn( ( "The received clientToken=%u is not identical with the one=%u we sent ",
                           ulReceivedToken, ulClientToken ) );
            }
        }
        else
        {
            LogError( ( "No clientToken in json document!!" ) );
            xUpdateAcceptedReturn = pdFAIL;
        }
    }