解决 RabbitMQ 暂停队列同步的问题 - Amazon MQ
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

解决 RabbitMQ 暂停队列同步的问题

在 Amazon MQ for RabbitMQ 集群部署中,发布到每个队列的消息跨三个代理节点进行复制。这种复制称为镜像,为 RabbitMQ 代理提供高可用性(HA)。集群部署中的队列由一个节点上的副本和一个或多个镜像组成。应用于镜像队列的每个操作(包括入队消息)首先应用于主队列,然后在其镜像之间进行复制。

例如,假设一个跨三个节点复制的镜像队列:主节点(main)和两个镜像(mirror-1mirror-2)。如果此镜像队列中的所有消息都成功传播到所有镜像,则队列将同步。如果某个节点(mirror-1)在一段时间内变得不可用,则该队列仍可运行并可继续将消息排入队列。但是,对于要同步的队列,在 mirror-1 不可用时发布到 main 的消息必须复制到 mirror-1

有关镜像的更多信息,请参阅 RabbitMQ 网站上的经典镜像队列

维护和队列同步

维护时段,Amazon MQ 每次执行一个节点的所有维护工作,以确保代理保持正常运行。因此,在每个节点恢复正常运行时,队列可能需要同步。在同步过程中,需要复制到镜像的消息将从相应的 Amazon Elastic Block Store(Amazon EBS)卷加载到内存中,以进行批处理。批处理消息可以让队列更快地同步。

如果队列保持简短且消息较少,则队列会按预期成功同步并恢复正常运行。但是,如果批处理中的数据量接近节点的内存限制,节点会引发高内存警报,暂停队列同步。您可以通过比较 CloudWatch 中的 RabbitMemUsedRabbitMqMemLimit 代理节点指标来确认内存使用情况。在消耗或删除消息或批处理中的消息数量减少之前,同步无法完成。

注意

减少队列同步批处理大小可能会导致更多的复制事务。

要解决暂停的队列同步,请按照本教程中的步骤操作,其中演示了应用 ha-sync-batch-size 策略和重新启动队列同步的过程。

先决条件

在本教程中,您必须拥有具有管理员权限的 Amazon MQ for RabbitMQ 代理用户。您可以使用第一次创建代理时创建的管理员用户,也可以使用之后可能创建的其他用户。下表提供了作为正则表达式(正则表达式)模式所需的管理员用户标签和权限。

标签 读取正则表达式 配置正则表达式 写入正则表达式
administrator .* .* .*

有关创建 RabbitMQ 用户和管理用户标记和权限的更多信息,请参阅适用于 RabbitMQ 经纪商用户的亚马逊 MQ

步骤 1:应用 ha-sync-batch-size 策略

以下过程演示了如何添加适用于代理上所创建的所有队列的策略。您可以使用 RabbitMQ Web 控制台或 RabbitMQ 管理 API。有关更多信息,请参阅 RabbitMQ 网站上的管理插件

使用 RabbitMQ Web 控制台应用 ha-sync-batch-size 策略
  1. 登录 Amazon MQ 控制台

  2. 在左侧导航窗格中,选择 Brokers (代理)

  3. 从代理列表中,选择要向其应用新策略的代理的名称。

  4. 在代理页面的 Connections (连接) 部分,选择 RabbitMQ web console (RabbitMQ Web 控制台) URL。RabbitMQ Web 控制台可在新的浏览器选项卡或窗口中打开。

  5. 使用您的代理程序管理员的登录凭证登录 RabbitMQ Web 控制台。

  6. 在 RabbitMQ Web 控制台页面顶部选择 Admin (管理员)

  7. Admin (管理员) 页面的右侧导航窗格中,选择 Policies (策略)

  8. Policies (策略) 页面上,您可以看到代理现有的 User policies (用户策略) 列表。在 User policies (用户策略) 下,展开 Add / update a policy (添加/更新策略)

    注意

    默认情况下,Amazon MQ for RabbitMQ 集群是使用名为 ha-all-AWS-OWNED-DO-NOT-DELETE 的初始代理策略创建的。Amazon MQ 管理此策略,以确保代理上的每个队列都复制到所有三个节点,并且队列自动同步。

  9. 要创建新的代理策略,请在 Add / update a policy (添加/更新策略) 下,执行以下操作:

    1. 对于 Name (名称),请为您的策略输入名称,例如 batch-size-policy

    2. Pattern (模式) 中,输入正则表达式模式 .*,以便策略匹配代理上的所有队列。

    3. 对于 Apply to (应用于),从下拉列表中选择 Exchanges and queues (交换器和队列)

    4. 对于 Priority (优先级),输入一个大于应用于虚拟主机的所有其他策略的整数。您可以在任何给定时间将一组策略定义应用于 RabbitMQ 队列和交换器。RabbitMQ 选择具有最高优先级值的匹配策略。有关策略优先级以及如何组合策略的更多信息,请参阅 RabbitMQ 服务器文档中的策略

    5. 对于 Definition (定义),添加以下键/值对:

      • ha-sync-batch-size=100。从下拉列表中选择 Number (编号)

        注意

        您可能需要根据队列中未同步消息的数量和大小调整和校准 ha-sync-batch-size 的值。

      • ha-mode=all。从下拉列表中选择 String (字符串)

        重要

        ha-mode 定义是所有与 HA 相关的策略所必需的。忽略它会导致验证失败。

      • ha-sync-mode=automatic。从下拉列表中选择 String (字符串)

        注意

        ha-sync-mode 定义是所有自定义策略所必需的。如果省略该定义,Amazon MQ 会自动追加定义。

    6. 选择 Add / update policy (添加/更新策略)

  10. 确认 User policies (用户策略) 列表中显示新策略。

使用 RabbitMQ 管理 API 应用 ha-sync-batch-size 策略
  1. 登录 Amazon MQ 控制台

  2. 在左侧导航窗格中,选择 Brokers (代理)

  3. 从代理列表中,选择要向其应用新策略的代理的名称。

  4. 在代理页面的 Connections (连接) 部分,记下 RabbitMQ web console (RabbitMQ Web 控制台) URL。这是您在 HTTP 请求中使用的代理终端节点。

  5. 打开您选择的新终端或命令行窗口。

  6. 要创建新的代理策略,请输入以下 curl 命令。此命令假定默认 / 虚拟主机上有一个队列,该队列编码为 %2F

    注意

    usernamepassword 替换为代理程序管理员登录凭证。您可能需要根据队列中未同步消息的数量和大小调整和校准 ha-sync-batch-size 的值(100)。将代理终端节点替换为您之前记下的 URL。

    curl -i -u username:password -H "content-type:application/json" -XPUT \ -d '{"pattern":".*", "priority":1, "definition":{"ha-sync-batch-size":100, "ha-mode":"all", "ha-sync-mode":"automatic"}}' \ https://b-589c045f-f8ln-4ab0-a89c-co62e1c32ef8.mq.us-west-2.amazonaws.com/api/policies/%2F/batch-size-policy
  7. 要确认新策略已添加到您的代理的用户策略中,请输入以下 curl 命令以列出所有代理策略。

    curl -i -u username:password https://b-589c045f-f8ln-4ab0-a89c-co62e1c32ef8.mq.us-west-2.amazonaws.com/api/policies

步骤 2:重新启动队列同步

将新的 ha-sync-batch-size 策略应用于您的代理后,重新启动队列同步。

使用 RabbitMQ Web 控制台重新启动队列同步
注意

要打开 RabbitMQ Web 控制台,请参阅本教程第 1 步中的先前说明。

  1. 在 RabbitMQ Web 控制台页面顶部选择 Queues (队列)

  2. Queues (队列) 页面上的 All queues (所有队列) 下,找到已暂停的队列。在 Features (功能) 列中,您的队列应列出您创建的新策略的名称(例如 batch-size-policy)。

  3. 要重新启动具有较小批处理大小的同步过程,请选择 Restart sync (重新启动同步)

注意

如果同步暂停并且未成功完成,请尝试减少 ha-sync-batch-size 值并重新启动队列同步。

后续步骤

  • 队列成功同步后,您可以通过查看 Amazon CloudWatch 指标 RabbitMQMemUsed 来监控 RabbitMQ 节点使用的内存量。您还可以查看 RabbitMQMemLimit 指标以监控节点的内存限制。有关更多信息,请参阅访问 Amazon MQ 的 CloudWatch 指标适用于 RabbitMQ 经纪商的亚马逊 MQ 可用 CloudWatch 指标

  • 为防止队列同步暂停,建议保持队列较短并处理消息。对于消息较大的工作负载,我们还建议您将代理实例类型升级到具有更多内存的更大实例大小。有关代理实例类型和编辑代理首选项的更多信息,请参阅Amazon MQ for RabbitMQ 实例类型编辑代理首选项

  • 当您创建新的 Amazon MQ for RabbitMQ 时,Amazon MQ 会应用一组默认策略和虚拟主机限制来优化代理性能。如果您的代理没有建议的默认策略和限制,建议您自己创建。有关创建默认策略和虚拟主机限制的更多信息,请参阅Amazon MQ for RabbitMQ 代理默认值

  • UpdateBrokerInput – 使用此代理属性通过 Amazon MQ API 更新代理实例类型。

  • 参数和策略(RabbitMQ 服务器文档)– 在 RabbitMQ 网站上了解有关 RabbitMQ 参数和策略的更多信息。

  • RabbitMQ 管理 HTTP API – 了解有关 RabbitMQ 管理 API 的更多信息。