Amazon MQ for RabbitMQ 最佳实践 - Amazon MQ
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon MQ for RabbitMQ 最佳实践

以此作为参考快速找到在 Amazon MQ 上使用 RabbitMQ 代理最大程度提高性能和降低吞吐量成本的建议。

重要

目前,亚马逊 MQ 不支持 RabbitMQ 3.9.x 中JSON引入的直播或使用结构化登录。

重要

适用于 RabbitMQ 的 Amazon MQ 不支持用户名 “访客”,并且将在您创建新经纪人时删除默认访客账户。Amazon MQ 还将定期删除任何由客户创建的名为 “访客” 的账户。

开启自动次要版本升级

使用最新的代理版本的安全和错误修复以及性能改进。您可以启用 Amazon MQ 的自动次要版本升级,以管理最新补丁版本的升级。

使用已弃用的功能

如果你在亚马逊 MQ 上使用适用于 RabbitMQ 的 3.13 版本,你会在 RabbitMQ 管理用户界面中看到一条横幅,上面写着:Deprecated features are being used.

Navigation bar with Overview tab selected, showing Totals section header.

这是因为亚马逊 MQ 上的 RabbitMQ 使用了 RabbitMQ 上不再提供的以下功能,或者是自动为亚马逊 MQ 上的 RabbitMQ 配置的功能:

  • 经典队列镜像

  • 全球 QoS

  • 临时非独占队列

这是 3.13 版的信息横幅,无需任何操作。您的亚马逊 MQ 经纪人将继续使用这些功能。

选择正确的代理实例类型以获得最佳吞吐量

代理实例类型的消息吞吐量取决于您的应用程序用例。像这样的较小的代理实例类型t3.micro应仅用于测试应用程序性能。在生产环境中使用较大的实例之前使用这些微型实例可以提高应用程序性能并帮助您降低开发成本。对于实例类型m5.large及以上类型,您可以使用集群部署来实现高可用性和消息持久性。较大的代理实例类型可以处理客户端和队列的生产级别、高吞吐量、内存中的消息和冗余消息。有关选择正确实例类型的更多信息,请参阅规模调整指南。

使用多个频道

为避免连接流失,请在单个连接上使用多个频道。应用程序应避免 1:1 的连接与频道的比例。我们建议每个进程使用一个连接,然后每个线程使用一个通道。避免过度使用频道,以防止频道泄漏。

启用延迟队列

如果您正在处理处理大量消息的超长队列,则启用延迟队列可以提高代理性能。

RabbitMQ 的默认行为是在内存中缓存消息,并且仅在代理需要更多可用内存时将其移动到磁盘。将消息从内存移动到磁盘需要时间,并且会停止消息处理。懒人队列通过尽快将消息存储到磁盘来显著加快内存到磁盘的进程,从而减少内存中缓存的消息。

您可以通过在声明时设置 queue.declare 参数或通过 RabbitMQ 管理控制台配置策略来启用延迟队列。以下示例演示了如何使用 RabbitMQ Java 客户端库声明延迟队列。

Map<String, Object> args = new HashMap<String, Object>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);

默认情况下,3.12.13 及更高版本上的所有 Amazon MQ for RabbitMQ 队列都表现为延迟队列。要升级到最新版本的 Amazon MQ for RabbitMQ,请参阅。升级 Amazon MQ 代理引擎版本

注意

启用延迟队列会增加磁盘输入/输出操作。

使用永久消息和持久队列

持久消息有助于防止在代理崩溃或重启的情况下丢失数据。持久消息一到达就会立即写入磁盘。但是,与延迟队列不同的是,持久消息同时在内存和磁盘中缓存,除非代理需要更多内存。在需要更多内存的情况下,通过管理将消息存储到磁盘的 RabbitMQ 代理机制从内存中删除消息,通常称为持久性层

要启用消息持久性,可以将队列声明为 durable 并将消息传递模式设置为 persistent。以下示例演示了如何使用 RabbitMQ Java 客户端库声明持续队列。使用 AMQP 0-9-1 时,您可以通过将传送模式设置为 “2” 来将邮件标记为永久邮件。

boolean durable = true; channel.queueDeclare("my_queue", durable, false, false, null);

将队列配置为持续队列后,您可以通过将 MessageProperties 设置为 PERSISTENT_TEXT_PLAIN 来将持久消息发送到您的队列,如以下示例所示。

import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "my_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

保持队列简短

在集群部署中,包含大量消息的队列可能会导致资源过度利用。当代理被过度利用时,重启 Amazon MQ for RabbitMQ 代理可能会导致性能进一步降低。如果重启,过度利用的代理可能会在 REBOOT_IN_PROGRESS 状态下变得反应迟钝。

维护时段,Amazon MQ 每次执行一个节点的所有维护工作,以确保代理保持正常运行。因此,在每个节点恢复正常运行时,队列可能需要同步。在同步过程中,需要复制到镜像的消息将从相应的 Amazon Elastic Block Store (AmazonEBS) 卷加载到内存中,以进行批量处理。批处理消息可以让队列更快地同步。

如果队列保持简短且消息较少,则队列会按预期成功同步并恢复正常运行。但是,如果批处理中的数据量接近节点的内存限制,节点会引发高内存警报,暂停队列同步。您可以通过比较中的RabbitMemUsedRabbitMqMemLimit代理节点指标来确认内存使用情况 CloudWatch。在消耗或删除消息或批处理中的消息数量减少之前,同步无法完成。

如果集群部署暂停队列同步,我们建议使用或删除消息,以减少队列中的消息数量。一旦队列深度减少且队列同步完成,代理状态将更改为 RUNNING。要解决暂停的队列同步,您还可以应用策略来减少队列同步批处理大小

您还可以定义自动删除和TTL策略,以主动减少资源使用量,并尽NACKs量减少对消费者的侵害。在代理上重新排队消息是CPU密集型的,因此大量消息NACKs可能会影响代理性能。

配置发布者确认和消费者送达确认

确认消息已发送给经纪人的过程称为发布者确认。发布者确认告知您的应用程序何时可靠地存储了消息。发布者确认还可以帮助控制存储到代理的消息速率。如果没有发布商的确认,则无法确认消息已成功处理,并且您的经纪人可能会丢弃它无法处理的消息。

同样,当客户端应用程序向代理发送消息的发送和使用确认时,它被称为消费者交付确认。在与RabbitMQ经纪人合作时,确认和确认对于确保数据安全至关重要。

使用者传递确认通常在客户端应用程序上配置。使用 AMQP 0-9-1 时,可以通过配置方法来启用确认。basic.consumeAMQP0-9-1 客户端还可以通过发送方法来配置发布者确认。confirm.select

通常,在通道中启用传递确认。例如,使用 RabbitMQ Java 客户端库时,可以使用 Channel#basicAck 来设置一个简单的 basic.ack 肯定确认,如以下示例所示。

// this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } });
注意

未确认的消息必须在内存中缓存。您可以通过为客户端应用程序配置预提取设置,限制使用者预提取的消息数量。

您可以配置consumer_timeout为检测消费者何时不确认交付。如果消费者未在超时值内发送确认,则频道将关闭,您将收到. PRECONDITION_FAILED 要诊断错误,UpdateConfigurationAPI请使用增加consumer_timeout值。

配置预提取

您可以使用 RabbitMQ 预提取值来优化使用者使用消息的方式。RabbitMQ 通过将预取计数应用于消费者而不是渠道来实现 AMQP 0-9-1 提供的频道预取机制。预提取值用于指定在任何给定时间向使用者发送的消息数量。默认情况下,RabbitMQ 会为客户端应用程序设置无限制的缓冲区大小。

在为您的 RabbitMQ 使用者设置预提取计数时,需要考虑各种因素。首先,考虑使用者的环境和配置。由于使用者需要在处理消息时将所有消息保存在内存中,因此,较高的预提取值可能会对使用者的性能产生负面影响,在某些情况下,可能会导致使用者同时崩溃。同样,RabbitMQ 代理本身会将其发送的所有消息缓存在内存中,直到收到使用者确认。如果没有为使用者配置自动确认,并且使用者需要相对较长的时间来处理消息,则较高的预提取值可能会导致 RabbitMQ 服务器内存不足。

考虑到上述因素,我们建议始终设置预提取值,以防止由于大量未处理或未确认的消息而导致 RabbitMQ 代理或其使用者出现内存不足的情况。如果您需要优化代理来处理大量消息,您可以使用一系列预提取计数来测试您的代理和使用者,以确定与使用者处理消息所需的时间相比,网络开销在哪个点上变得微不足道。

注意
  • 如果您的客户端应用程序已配置为自动确认将消息传递给使用者,则设置预提取值将不起作用。

  • 所有预提取消息都会从队列中删除。

以下示例演示了如何使用 RabbitMQ Java 客户端库为单一使用者设置 10 的预提取值。

ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(10, false); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("my_queue", false, consumer);
注意

在 RabbitMQ Java 客户端库中,global 标志的默认值设置为 false,所以上面的例子可以简单地写成 channel.basicQos(10)

配置 Celery

Python Celery 会发送许多不必要的消息,这些消息会使查找和处理有用的信息变得更加困难。为了降低噪音并使处理更容易,请输入以下命令:

celery -A app_name worker --without-heartbeat --without-gossip --without-mingle

自动从网络故障中恢复

我们建议始终启用自动网络恢复,以防止在客户端连接到 RabbitMQ 节点失败的情况下出现严重停机。自版本 4.0.0 起,RabbitMQ Java 客户端库默认支持自动网络恢复。

如果在连接的输入/输出循环中引发未处理的异常、检测到套接字读取操作超时,或者如果服务器失去检测信号,则会触发自动连接恢复。

如果客户端和 RabbitMQ 节点之间的初始连接失败,将不会触发自动恢复。我们建议您编写应用程序代码,以便通过重试连接来解决初始连接失败的问题。以下示例演示了如何使用 RabbitMQ Java 客户端库来重试初始网络故障。

ConnectionFactory factory = new ConnectionFactory(); // enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0. factory.setAutomaticRecoveryEnabled(true); // configure various connection settings try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
注意

如果应用程序使用 Connection.Close 方法关闭连接,则不会启用或触发自动网络恢复。

为您的 RabbitMQ 代理启用 Classic Queue v2

我们建议在代理引擎版本 3.10 和 3.11 上启用 Classic Queue v2 (CQv2),以提高性能,包括:

  • 减少内存使用量

  • 改善使用者传递

  • 提高工作负载的吞吐量,让使用者跟上生产者的步伐

默认情况下,3.12.13 及更高版本的所有亚马逊 MQ for RabbitMQ 队列都使用。CQv2要升级到最新版本的 Amazon MQ for RabbitMQ,请参阅。升级 Amazon MQ 代理引擎版本

从迁移CQv1到 CQv2

要使用CQv2,必须先启用classic_mirrored_queue_version功能标志。有关功能标志的更多信息,请参阅如何启用功能标记

要从迁移CQv1到CQv2,必须创建新的队列策略或编辑将queue-version策略密钥定义设置为的现有队列策略2。有关应用策略的更多信息,请参阅为亚马逊 MQ 应用 RabbitMQ 的政策。有关CQv2使用队列策略启用的更多信息,请参阅 RabbitM Q 文档中的经典队列

我们建议在开始迁移之前遵循其他最佳性能实践

如果您使用的是队列策略,则删除队列策略会将队列降级CQv2回到。CQv1我们不建议将CQv2队列降级CQv1为,因为 RabbitMQ 会转换队列的磁盘表示形式。对于较深的队列来说,这可能会占用大量内存,而且非常耗时。