coreMQTT 代理连接共享演示 - FreeRTOS
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

coreMQTT 代理连接共享演示

重要

该演示托管在已弃用的 Amazon-FreeRTOS 存储库中。当您创建新项目时,我们建议从此处开始。如果您已经有一个基于现已弃用的 Amazon-FreeRTOS 存储库的 FreeRTOS 项目,请参阅 Amazon-FreeRTOS Github 存储库迁移指南

简介

coreMQTT 连接共享演示项目展示了如何使用在客户端与服务器之间具有双向身份验证的 TLS,通过多线程应用程序建立与 Amazon MQTT 代理的连接。此演示使用基于 mbedTLS 的传输接口实现来建立经过服务器和客户端身份验证的 TLS 连接,并演示了 QoS 1 级别的 MQTT 订阅发布工作流程。该演示订阅主题筛选条件,发布与筛选条件匹配的主题,然后等待从 QoS 1 级别的服务器接收这些消息。对于每个创建的任务,这种向代理发布消息并从代理接收相同消息的循环过程会无限期地重复。本演示中的消息按照 QoS 1 发送,这样可以保证至少有一次按照 MQTT 规范传递。

注意

要设置和运行 FreeRTOS 演示,请按照开始使用 FreeRTOS中的步骤操作。

此演示使用线程安全队列来保存与 MQTT API 交互的命令。此演示中有两个任务需要注意。

  • MQTT 代理(主)任务处理命令队列中的命令,而其他任务则将它们排入队列。此任务会进入循环,它会在此期间处理命令队列中的命令。如果收到终止命令,则此任务将退出循环。

  • 演示 subpub 任务会创建对 MQTT 主题的订阅,然后创建发布操作并将其推送到命令队列中。然后,这些发布操作由 MQTT 代理任务运行。演示 subpub 任务会等待发布完成(通过执行命令完成回调来指示),然后在开始下一次发布之前进入短暂的延迟。此任务展示了应用程序任务如何使用 coreMQTT 代理 API 的示例。

对于传入的发布消息,coreMQTT 代理会调用单个回调函数。此演示还包括一个订阅管理器,它允许任务指定一个回调,以便为它们已订阅的主题的传入发布消息进行调用。在此演示中,代理的传入发布回调会调用订阅管理器,将发布分散到任何已注册订阅的任务中。

此演示使用具有双向身份验证的 TLS 连接到 Amazon。如果网络在演示过程中意外断开连接,则客户端会尝试使用指数回退逻辑重新连接。如果客户端重新连接成功,但代理无法恢复之前的会话,则客户端将重新订阅与上一个会话相同的主题。

单线程与多线程

coreMQTT 有两种使用模式,即单线程和多线程(多任务处理)。单线程模型使用来自仅一个线程的 coreMQTT 库,并要求您在 MQTT 库中重复进行显式调用。多线程使用案例可以在代理(或进程守护程序)任务的后台运行 MQTT 协议,如此处记录的演示所示。在代理任务中运行 MQTT 协议时,无需显式管理任何 MQTT 状态或调用 MQTT_ProcessLoop API 函数。此外,如果您使用代理任务,则多个应用程序任务可共享单个 MQTT 连接,而无需同步原语(例如,互斥锁)。

源代码

演示源文件名为 mqtt_agent_task.csimple_sub_pub_demo.c,可在 freertos/demos/coreMQTT_Agent/ 目录中和 GitHub 网站上找到。

功能

此演示至少创建两个任务:一个用于处理 MQTT API 调用请求的主要任务,以及用于创建这些请求的可配置子任务数量。在此演示中,主任务会创建子任务,调用处理循环,然后进行清理。主任务创建与代理的单个 MQTT 连接,该连接在子任务之间共享。子任务通过代理创建 MQTT 订阅,然后向其发布消息。每个子任务的发布都使用唯一的主题。

主任务

主应用程序任务 RunCoreMQTTAgentDemo 会建立一个 MQTT 会话,创建子任务,并运行处理循环 MQTTAgent_CommandLoop,直到收到终止命令。如果网络意外断开连接,演示将在后台重新连接到代理,并与代理重新建立订阅。处理循环终止后,它会断开与代理的连接。

命令

当您调用 coreMQTT 代理 API 时,它会创建一个发送到代理任务队列的命令,然后在 MQTTAgent_CommandLoop() 中进行处理。在创建命令时,可以传递可选的完成回调和上下文参数。相应的命令完成后,将使用传递的上下文以及作为命令的结果创建的任何返回值来调用完成回调。完成回调的签名如下:

typedef void (* MQTTAgentCommandCallback_t )( void * pCmdCallbackContext, MQTTAgentReturnInfo_t * pReturnInfo );

命令完成上下文是用户定义的;在该演示中为:struct MQTTAgentCommandContext

在以下情况下即认为命令已完成:

  • 订阅、取消订阅和发布 (QoS > 0):收到相应的确认数据包后。

  • 所有其他操作:调用相应的 coreMQTT API 后。

命令使用的任何结构(包括发布信息、订阅信息和完成上下文)都必须保持在作用范围内,直到命令完成。在调用完成回调之前,调用任务不得重用命令的任何结构。请注意,由于完成回调由 MQTT 代理调用,因此,它将使用代理任务的线程上下文运行,而不是创建命令的任务。进程间通信机制(例如,任务通知或队列)可用于向调用任务发出命令完成的信号。

运行命令循环

命令在MQTTAgent_CommandLoop() 中持续处理。如果没有要处理的命令,则循环将等待向队列中添加一个命令的最长等待时间 MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME;如果未添加任何命令,则循环将运行 MQTT_ProcessLoop() 的一次迭代。这样既可确保管理 MQTT Keep-Alive,又可确保即使队列中没有命令,也能收到传入的发布。

在以下原因下,命令循环函数将返回:

  • 命令返回除 MQTTSuccess 之外的任何状态代码。命令循环返回错误状态,因此您可以决定如何处理。在此演示中,重新建立了 TCP 连接并尝试重新连接。如果出现任何错误,则可以在后台进行重新连接,而没有其他使用 MQTT 的任务产生的任何干预。

  • 处理了断开连接命令 (MQTTAgent_Disconnect)。退出了命令循环,以便断开 TCP 连接。

  • 处理了终止命令 (MQTTAgent_Terminate)。此命令还将任何仍在队列中或正在等待确认数据包的命令标记为错误,返回代码为 MQTTRecvFailed

订阅管理器

由于该演示使用多个主题,因此,订阅管理器是将订阅的主题与唯一的回调或任务相关联的便捷方式。此演示中的订阅管理器是单线程的,因此不应同时用于多个任务。在此演示中,订阅管理器函数只能从传递给 MQTT 代理的回调函数中调用,并且只能在代理任务的线程上下文中运行。

简单的订阅-发布任务

prvSimpleSubscribePublishTask 的每个实例都会创建对 MQTT 主题的订阅,并为该主题创建发布操作。为了演示多种发布类型,偶数编号的任务使用 QoS 0(发送发布数据包后即完成),奇数任务使用 QoS 1(收到 PUBACK 数据包后即完成)。