Amazon MQ for RabbitMQ 最佳实践 - Amazon MQ
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

Amazon MQ for RabbitMQ 最佳实践

以此作为参考快速找到在 Amazon MQ 上使用 RabbitMQ 代理最大程度提高性能和降低吞吐量成本的建议。

启用延迟队列

如果您使用的是处理大量消息的长队列,启用延迟队列可以提高您的代理的整体性能。

RabbitMQ 的默认行为是在内存中缓存消息,并且仅在代理需要更多可用内存时将其移动到磁盘。将消息从内存移动到磁盘的过程可能需要一些时间,并停止队列处理消息。启用延迟队列可能会对加快将消息移动到磁盘的过程产生重大影响,因为延迟队列会尽快将消息存储到磁盘,从而减少内存中缓存的消息。

您可以通过在声明时设置 queue.declare 参数或通过 RabbitMQ 管理控制台配置策略来启用延迟队列。以下示例演示了如何使用 RabbitMQ Java 客户端库声明延迟队列。

Map<String, Object> args = new HashMap<String, Object>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);
注意

启用延迟队列会增加磁盘输入/输出操作。

使用持久和持续队列

持久消息有助于防止在代理崩溃或重启的情况下丢失数据。持久消息一到达就会立即写入磁盘。但是,与延迟队列不同的是,持久消息同时在内存和磁盘中缓存,除非代理需要更多内存。在需要更多内存的情况下,通过管理将消息存储到磁盘的 RabbitMQ 代理机制从内存中删除消息,通常称为持久性层

要启用消息持久性,可以将队列声明为 durable 并将消息传递模式设置为 persistent。以下示例演示了如何使用 RabbitMQ Java 客户端库声明持续队列。

boolean durable = true; channel.queueDeclare("my_queue", durable, false, false, null);

将队列配置为持续队列后,您可以通过将 MessageProperties 设置为 PERSISTENT_TEXT_PLAIN 来将持久消息发送到您的队列,如以下示例所示。

import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "my_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

保持队列简短

在集群部署中,包含大量消息的队列可能会导致资源过度利用。当代理被过度利用时,重启 Amazon MQ for RabbitMQ 代理可能会导致性能进一步降低。如果重启,过度利用的代理可能会在 REBOOT_IN_PROGRESS 状态下变得反应迟钝。

维护时段,Amazon MQ 每次执行一个节点的所有维护工作,以确保代理保持正常运行。因此,在每个节点恢复正常运行时,队列可能需要同步。在同步过程中,需要复制到镜像的消息将从相应的 Amazon Elastic Block Store(Amazon EBS)卷加载到内存中,以进行批处理。批处理消息可以让队列更快地同步。

如果队列保持简短且消息较少,则队列会按预期成功同步并恢复正常运行。但是,如果批处理中的数据量接近节点的内存限制,节点会引发高内存警报,暂停队列同步。您可以通过比较 CloudWatch 中的 RabbitMemUsedRabbitMqMemLimit 代理节点指标来确认内存使用情况。在消耗或删除消息或批处理中的消息数量减少之前,同步无法完成。

如果集群部署暂停队列同步,我们建议使用或删除消息,以减少队列中的消息数量。一旦队列深度减少且队列同步完成,代理状态将更改为 RUNNING。要解决暂停的队列同步,您还可以应用策略来减少队列同步批处理大小

警告

不要重启资源占用率高的代理。

如果在队列同步暂停时重启代理,则代理将重启同步过程,这会在消息从存储传输到节点内存时进一步削弱代理资源,并导致代理在 REBOOT_IN_PROGRESS 状态下变得反应迟钝。

配置确认

当客户端应用程序向代理发回消息传递和使用确认时,它被称为使用者确认。同样,向发布者发送确认的过程称为发布者确认。在使用 RabbitMQ 代理时,确认对于确保数据安全至关重要。

使用者传递确认通常在客户端应用程序上配置。使用 AMQP 0-9-1 时,可以通过配置 basic.consume 或使用 basic.code 方法获取消息来启用确认。

通常,在通道中启用传递确认。例如,使用 RabbitMQ Java 客户端库时,可以使用 Channel#basicAck 来设置一个简单的 basic.ack 肯定确认,如以下示例所示。

// this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } });
注意

未确认的消息必须在内存中缓存。您可以通过为客户端应用程序配置预提取设置,限制使用者预提取的消息数量。

配置预提取

您可以使用 RabbitMQ 预提取值来优化使用者使用消息的方式。RabbitMQ 通过将预提取计数应用于使用者而不是通道,实现 AMQP 0-9-1 提供的通道预提取机制。预提取值用于指定在任何给定时间向使用者发送的消息数量。默认情况下,RabbitMQ 会为客户端应用程序设置无限制的缓冲区大小。

在为您的 RabbitMQ 使用者设置预提取计数时,需要考虑各种因素。首先,考虑使用者的环境和配置。由于使用者需要在处理消息时将所有消息保存在内存中,因此,较高的预提取值可能会对使用者的性能产生负面影响,在某些情况下,可能会导致使用者同时崩溃。同样,RabbitMQ 代理本身会将其发送的所有消息缓存在内存中,直到收到使用者确认。如果没有为使用者配置自动确认,并且使用者需要相对较长的时间来处理消息,则较高的预提取值可能会导致 RabbitMQ 服务器内存不足。

考虑到上述因素,我们建议始终设置预提取值,以防止由于大量未处理或未确认的消息而导致 RabbitMQ 代理或其使用者出现内存不足的情况。如果您需要优化代理来处理大量消息,您可以使用一系列预提取计数来测试您的代理和使用者,以确定与使用者处理消息所需的时间相比,网络开销在哪个点上变得微不足道。

注意
  • 如果您的客户端应用程序已配置为自动确认将消息传递给使用者,则设置预提取值将不起作用。

  • 所有预提取消息都会从队列中删除。

以下示例演示了如何使用 RabbitMQ Java 客户端库为单一使用者设置 10 的预提取值。

ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(10, false); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("my_queue", false, consumer);
注意

在 RabbitMQ Java 客户端库中,global 标志的默认值设置为 false,所以上面的例子可以简单地写成 channel.basicQos(10)

配置 Celery

Python Celery 会发送许多不必要的消息,这些消息会使查找和处理有用的信息变得更加困难。为了降低噪音并使处理更容易,请输入以下命令:

celery -A app_name worker --without-heartbeat --without-gossip --without-mingle

自动从网络故障中恢复

我们建议始终启用自动网络恢复,以防止在客户端连接到 RabbitMQ 节点失败的情况下出现严重停机。自版本 4.0.0 起,RabbitMQ Java 客户端库默认支持自动网络恢复。

如果在连接的输入/输出循环中引发未处理的异常、检测到套接字读取操作超时,或者如果服务器失去检测信号,则会触发自动连接恢复。

如果客户端和 RabbitMQ 节点之间的初始连接失败,将不会触发自动恢复。我们建议您编写应用程序代码,以便通过重试连接来解决初始连接失败的问题。以下示例演示了如何使用 RabbitMQ Java 客户端库来重试初始网络故障。

ConnectionFactory factory = new ConnectionFactory(); // enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0. factory.setAutomaticRecoveryEnabled(true); // configure various connection settings try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
注意

如果应用程序使用 Connection.Close 方法关闭连接,则不会启用或触发自动网络恢复。