coreMQTT 双向身份验证演示 - FreeRTOS
AWS 文档中描述的 AWS 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 AWS 服务入门

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

coreMQTT 双向身份验证演示

Introduction

(双向身份验证)演示项目说明如何将 TLS 与客户端和服务器之间的双向身份验证结合使用来建立与 MQTT 代理的连接。coreMQTT此演示使用基于 mbedTLS 的传输接口实施来建立服务器和客户端验证的 TLS 连接,并演示 MQTT 的订阅-发布工作流程(位于 QoS 1 级别)。订阅单个主题筛选条件后,它会发布到同一主题并等待从 QoS 1 级别的服务器接收该消息。发布到代理并从代理返回接收相同消息的这一周期将无限期地重复。本演示中的消息以 QoS 1 发送,保证根据 MQTT 规范至少传送一次。

注意

要设置和运行 FreeRTOS 演示,请按照开始使用 FreeRTOS中的步骤操作。

Functionality

该演示创建单个应用程序任务,循环访问一组示例,这些示例演示如何连接到代理,订阅代理上的主题,发布到代理上的主题,最后断开与代理的连接。演示应用程序将订阅并发布到同一主题。每当演示向 MQTT 代理发布消息时,代理就会将相同消息发送回演示应用程序。

成功完成演示将生成类似于下图的输出。


                成功完成时的 MQTT 演示终端输出

控制台将生成类似于下图的输出。AWS IoT


                成功完成时的 MQTT 演示控制台输出

以下是演示的结构。

static void prvMQTTDemoTask( void * pvParameters ) { uint32_t ulPublishCount = 0U, ulTopicCount = 0U; const uint32_t ulMaxPublishCount = 5UL; NetworkContext_t xNetworkContext = { 0 }; NetworkCredentials_t xNetworkCredentials = { 0 }; MQTTContext_t xMQTTContext = { 0 }; MQTTStatus_t xMQTTStatus; TlsTransportStatus_t xNetworkStatus; /* Remove compiler warnings about unused parameters. */ ( void ) pvParameters; /* Set the entry time of the demo application. This entry time will be used * to calculate relative time elapsed in the execution of the demo application, * by the timer utility function that is provided to the MQTT library. */ ulGlobalEntryTimeMs = prvGetTimeMs(); for( ; ; ) { /****************************** Connect. ******************************/ /* Attempt to establish TLS session with MQTT broker. If connection fails, * retry after a timeout. Timeout value will be exponentially increased * until the maximum number of attempts are reached or the maximum timeout * value is reached. The function returns a failure status if the TCP * connection cannot be established to the broker after the configured * number of attempts. */ xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkCredentials, &xNetworkContext ); configASSERT( xNetworkStatus == TLS_TRANSPORT_SUCCESS ); /* Sends an MQTT Connect packet over the already established TLS connection, * and waits for connection acknowledgment (CONNACK) packet. */ LogInfo( ( "Creating an MQTT connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext ); /**************************** Subscribe. ******************************/ /* If server rejected the subscription request, attempt to resubscribe to * topic. Attempts are made according to the exponential backoff retry * strategy implemented in retryUtils. */ prvMQTTSubscribeWithBackoffRetries( &xMQTTContext ); /****************** Publish and Keep Alive Loop. **********************/ /* Publish messages with QoS1, send and process Keep alive messages. */ for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ ) { LogInfo( ( "Publish to the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); prvMQTTPublishToTopic( &xMQTTContext ); /* Process incoming publish echo, since application subscribed to the * same topic, the broker will send publish message back to the * application. */ LogInfo( ( "Attempt to receive publish message from broker.\r\n" ) ); xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); configASSERT( xMQTTStatus == MQTTSuccess ); /* Leave Connection Idle for some time. */ LogInfo( ( "Keeping Connection Idle...\r\n\r\n" ) ); vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS ); } /******************** Unsubscribe from the topic. *********************/ LogInfo( ( "Unsubscribe from the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); prvMQTTUnsubscribeFromTopic( &xMQTTContext ); /* Process incoming UNSUBACK packet from the broker. */ xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); configASSERT( xMQTTStatus == MQTTSuccess ); /**************************** Disconnect. *****************************/ /* Send an MQTT Disconnect packet over the already connected TLS over * TCP connection. There is no corresponding response for the disconnect * packet. After sending disconnect, client must close the network * connection. */ LogInfo( ( "Disconnecting the MQTT connection with %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); xMQTTStatus = MQTT_Disconnect( &xMQTTContext ); configASSERT( xMQTTStatus == MQTTSuccess ); /* Close the network connection. */ TLS_FreeRTOS_Disconnect( &xNetworkContext ); /* Reset SUBACK status for each topic filter after completion of * subscription request cycle. */ for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) { xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure; } /* Wait for some time between two iterations to ensure that we do not * bombard the broker. */ LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. " "Total free heap is %u.\r\n", xPortGetFreeHeapSize() ) ); LogInfo( ( "Demo completed successfully.\r\n" ) ); LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) ); vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS ); } }

使用指数退避和抖动重试逻辑

prvBackoffForRetry 函数显示如何通过指数回退和抖动重试与服务器之间的失败的网络操作,例如 TLS 连接或 MQTT 订阅请求。该函数计算下一次重试尝试的退避期间,并在重试尝试尚未用尽时执行退避延迟。由于退避周期的计算需要生成随机数,因此,该函数使用 PKCS11 模块生成随机数。使用 PKCS11 模块允许访问真正的随机数生成器 (TRNG) (如果供应商平台支持)。我们建议您使用特定于设备的纪元源为随机数生成器做种,以降低连接重试期间设备发生碰撞的可能性。

连接到 MQTT 代理

prvConnectToServerWithBackoffRetries 函数尝试与 MQTT 代理建立经相互身份验证的 TLS 连接。如果连接失败,它将在退避期限后重试。退避期间将以指数方式增加,直到达到最大尝试次数或达到最大退避期间。函数提供指数递增的退避值,并在达到最大尝试次数时返回 BackoffAlgorithm_GetNextBackoffRetryUtilsRetriesExhausted如果在配置的尝试次数后无法建立与代理的 TLS 连接,prvConnectToServerWithBackoffRetries 函数将返回失败状态。

函数演示如何使用干净会话建立与 MQTT 代理的 MQTT 连接。prvCreateMQTTConnectionWithBroker它使用在 FreeRTOS-Plus/Source/Application-Protocols/platform/freertos/transport/src/tls_freertos.c 文件中实现的 TLS 传输接口。函数的定义如下所示。prvCreateMQTTConnectionWithBroker请记住,我们正在为 xConnectInfo 中的代理设置保持活动秒数。

下一个函数显示如何使用 MQTT_Init 函数在 MQTT 上下文中设置 TLS 传输接口和时间函数。它还演示了如何设置事件回调函数指针 (prvEventCallback)。此回调用于报告传入消息。

static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext, NetworkContext_t * pxNetworkContext ) { MQTTStatus_t xResult; MQTTConnectInfo_t xConnectInfo; bool xSessionPresent; TransportInterface_t xTransport; /*** * For readability, error handling in this function is restricted to the use of * asserts(). ***/ /* Fill in Transport Interface send and receive function pointers. */ xTransport.pNetworkContext = pxNetworkContext; xTransport.send = TLS_FreeRTOS_send; xTransport.recv = TLS_FreeRTOS_recv; /* Initialize MQTT library. */ xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xBuffer ); configASSERT( xResult == MQTTSuccess ); /* Some fields are not used in this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) ); /* Start with a clean session i.e. direct the MQTT broker to discard any * previous session data. Also, establishing a connection with clean session * will ensure that the broker does not store any data when this client * gets disconnected. */ xConnectInfo.cleanSession = true; /* The client identifier is used to uniquely identify this MQTT client to * the MQTT broker. In a production device the identifier can be something * unique, such as a device serial number. */ xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER; xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER ); /* Set MQTT keep-alive period. If the application does not send packets at * an interval less than the keep-alive period, the MQTT library will send * PINGREQ packets. */ xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS; /* Append metrics when connecting to the AWS IoT Core broker. */ #ifdef democonfigUSE_AWS_IOT_CORE_BROKER #ifdef democonfigCLIENT_USERNAME xConnectInfo.pUserName = CLIENT_USERNAME_WITH_METRICS; xConnectInfo.userNameLength = ( uint16_t ) strlen( CLIENT_USERNAME_WITH_METRICS ); xConnectInfo.pPassword = democonfigCLIENT_PASSWORD; xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD ); #else xConnectInfo.pUserName = AWS_IOT_METRICS_STRING; xConnectInfo.userNameLength = AWS_IOT_METRICS_STRING_LENGTH; /* Password for authentication is not used. */ xConnectInfo.pPassword = NULL; xConnectInfo.passwordLength = 0U; #endif #else /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ #ifdef democonfigCLIENT_USERNAME xConnectInfo.pUserName = democonfigCLIENT_USERNAME; xConnectInfo.userNameLength = ( uint16_t ) strlen( democonfigCLIENT_USERNAME ); xConnectInfo.pPassword = democonfigCLIENT_PASSWORD; xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD ); #endif /* ifdef democonfigCLIENT_USERNAME */ #endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ /* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it * is passed as NULL. */ xResult = MQTT_Connect( pxMQTTContext, &xConnectInfo, NULL, mqttexampleCONNACK_RECV_TIMEOUT_MS, &xSessionPresent ); configASSERT( xResult == MQTTSuccess ); /* Successfully established and MQTT connection with the broker. */ LogInfo( ( "An MQTT connection is established with %s.", democonfigMQTT_BROKER_ENDPOINT ) ); }

订阅 MQTT 主题

prvMQTTSubscribeWithBackoffRetries 函数演示如何订阅 MQTT 代理上的主题筛选条件。该示例演示如何订阅一个主题筛选条件,但可以在同一订阅 API 调用中传递主题筛选条件列表以订阅多个主题筛选条件。此外,如果 MQTT 代理拒绝订阅请求,则订阅将针对 RETRY_MAX_ATTEMPTS 重试并返回指数退避。

发布到主题

函数演示如何发布到 MQTT 代理上的主题筛选条件。prvMQTTPublishToTopic函数的定义如下所示。

static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult; MQTTPublishInfo_t xMQTTPublishInfo; /*** * For readability, error handling in this function is restricted to the use of * asserts(). ***/ /* Some fields are not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) ); /* This demo uses QoS1. */ xMQTTPublishInfo.qos = MQTTQoS1; xMQTTPublishInfo.retain = false; xMQTTPublishInfo.pTopicName = mqttexampleTOPIC; xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC ); xMQTTPublishInfo.pPayload = mqttexampleMESSAGE; xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE ); /* Get a unique packet id. */ usPublishPacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Send PUBLISH packet. Packet ID is not used for a QoS1 publish. */ xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, usPublishPacketIdentifier ); configASSERT( xResult == MQTTSuccess ); }

接收传入消息

应用程序在连接到代理之前注册事件回调函数,如前所述。函数调用 prvMQTTDemoTask 函数以接收传入的消息。MQTT_ProcessLoop在收到传入的 MQTT 消息时,它调用由应用程序注册的事件回调函数。函数是此类事件回调函数的示例。prvEventCallbackprvEventCallback 检查传入数据包类型并调用相应的处理程序。在以下示例中,函数调用 prvMQTTProcessIncomingPublish() 来处理传入发布消息,或调用 prvMQTTProcessResponse() 来处理确认 (ACK)。

static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, MQTTDeserializedInfo_t * pxDeserializedInfo ) { /* The MQTT context is not used for this demo. */ ( void ) pxMQTTContext; if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) { prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo ); } else { prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier ); } }

处理传入 MQTT 发布数据包

函数演示如何处理来自 MQTT 代理的发布数据包。prvMQTTProcessIncomingPublish下面是函数的定义。

static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) { configASSERT( pxPublishInfo != NULL ); /* Process incoming Publish. */ LogInfo( ( "Incoming QoS : %d\n", pxPublishInfo->qos ) ); /* Verify the received publish is for the we have subscribed to. */ if( ( pxPublishInfo->topicNameLength == strlen( mqttexampleTOPIC ) ) && ( 0 == strncmp( mqttexampleTOPIC, pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength ) ) ) { LogInfo( ( "\r\nIncoming Publish Topic Name: %.*s matches subscribed topic.\r\n" "Incoming Publish Message : %.*s\r\n", pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName, pxPublishInfo->payloadLength, pxPublishInfo->pPayload ) ); } else { LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.\r\n", pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName ) ); } }

取消订阅主题

工作流程中的最后一个步骤是取消订阅主题,以便代理不会从 mqttexampleTOPIC 发送任何发布的消息。 下面是函数的定义。

static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; /* Some fields not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); /* Get a unique packet id. */ usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to * only one topic and uses QoS1. */ xMQTTSubscription[ 0 ].qos = MQTTQoS1; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); /* Get next unique packet identifier. */ usUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Send UNSUBSCRIBE packet. */ xResult = MQTT_Unsubscribe( pxMQTTContext, xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), usUnsubscribePacketIdentifier ); configASSERT( xResult == MQTTSuccess ); }