Amazon MQ for RabbitMQ 最佳实践 - Amazon MQ
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

Amazon MQ for RabbitMQ 最佳实践

以此作为参考快速找到在 Amazon MQ 上使用 RabbitMQ 代理最大程度提高性能和降低吞吐量成本的建议。

启用延迟队列

如果您使用的是处理大量消息的长队列,启用延迟队列可以提高您的代理的整体性能。

RabbitMQ 的默认行为是在内存中缓存消息,并且仅在代理需要更多可用内存时将其移动到磁盘。将消息从内存移动到磁盘的过程可能需要一些时间,并停止队列处理消息。启用延迟队列可能会对加快将消息移动到磁盘的过程产生重大影响,因为延迟队列会尽快将消息存储到磁盘,从而减少内存中缓存的消息。

您可以通过在声明时设置 queue.declare 参数或通过 RabbitMQ 管理控制台配置策略来启用延迟队列。以下示例演示了如何使用 RabbitMQ Java 客户端库声明延迟队列。

Map<String, Object> args = new HashMap<String, Object>(); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);
注意

启用延迟队列会增加磁盘输入/输出操作。

使用持久和持续队列

持久消息有助于防止在代理崩溃或重启的情况下丢失数据。持久消息一到达就会立即写入磁盘。但是,与延迟队列不同的是,持久消息同时在内存和磁盘中缓存,除非代理需要更多内存。在需要更多内存的情况下,通过管理将消息存储到磁盘的 RabbitMQ 代理机制从内存中删除消息,通常称为持久性层

要启用消息持久性,可以将队列声明为 durable 并将消息传递模式设置为 persistent。以下示例演示了如何使用 RabbitMQ Java 客户端库声明持续队列。

boolean durable = true; channel.queueDeclare("my_queue", durable, false, false, null);

将队列配置为持续队列后,您可以通过将 MessageProperties 设置为 PERSISTENT_TEXT_PLAIN 来将持久消息发送到您的队列,如以下示例所示。

import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "my_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

保持队列简短

在集群部署中,包含大量消息的队列可能会导致资源过度利用。当代理被过度利用时,重启 Amazon MQ for RabbitMQ 代理可能会导致性能进一步降低。如果重启,过度利用的代理可能会在 REBOOT_IN_PROGRESS 状态下变得反应迟钝。

维护时段,Amazon MQ 每次执行一个节点的所有维护工作,以确保代理保持正常运行。因此,在每个节点恢复正常运行时,队列可能需要同步。在同步过程中,需要复制到镜像的消息将从相应的 Amazon Elastic Block Store(Amazon EBS)卷加载到内存中,以进行批处理。批处理消息可以让队列更快地同步。

如果队列保持简短且消息较少,则队列会按预期成功同步并恢复正常运行。但是,如果批处理中的数据量接近节点的内存限制,节点会引发高内存警报,暂停队列同步。您可以通过比较 CloudWatch 中的 RabbitMemUsedRabbitMqMemLimit 代理节点指标来确认内存使用情况。在消耗或删除消息或批处理中的消息数量减少之前,同步无法完成。

如果集群部署暂停队列同步,我们建议使用或删除消息,以减少队列中的消息数量。一旦队列深度减少且队列同步完成,代理状态将更改为 RUNNING。要解决暂停的队列同步,您还可以应用策略来减少队列同步批处理大小

警告

不要重启资源占用率高的代理。

如果在队列同步暂停时重启代理,则代理将重启同步过程,这会在消息从存储传输到节点内存时进一步削弱代理资源,并导致代理在 REBOOT_IN_PROGRESS 状态下变得反应迟钝。

配置确认

当客户端应用程序向代理发回消息传递和使用确认时,它被称为使用者确认。同样,向发布者发送确认的过程称为发布者确认。在使用 RabbitMQ 代理时,确认对于确保数据安全至关重要。

使用者传递确认通常在客户端应用程序上配置。使用 AMQP 0-9-1 时,可以通过配置 basic.consume 或使用 basic.code 方法获取消息来启用确认。

通常,在通道中启用传递确认。例如,使用 RabbitMQ Java 客户端库时,可以使用 Channel#basicAck 来设置一个简单的 basic.ack 肯定确认,如以下示例所示。

// this example assumes an existing channel instance boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // positively acknowledge a single delivery, the message will // be discarded channel.basicAck(deliveryTag, false); } });
注意

未确认的消息必须在内存中缓存。您可以通过为客户端应用程序配置预提取设置,限制使用者预提取的消息数量。

配置预提取

您可以使用 RabbitMQ 预提取值来优化使用者使用消息的方式。RabbitMQ 通过将预提取计数应用于使用者而不是通道,实现 AMQP 0-9-1 提供的通道预提取机制。预提取值用于指定在任何给定时间向使用者发送的消息数量。默认情况下,RabbitMQ 会为客户端应用程序设置无限制的缓冲区大小。

在为您的 RabbitMQ 使用者设置预提取计数时,需要考虑各种因素。首先,考虑使用者的环境和配置。由于使用者需要在处理消息时将所有消息保存在内存中,因此,较高的预提取值可能会对使用者的性能产生负面影响,在某些情况下,可能会导致使用者同时崩溃。同样,RabbitMQ 代理本身会将其发送的所有消息缓存在内存中,直到收到使用者确认。如果没有为使用者配置自动确认,并且使用者需要相对较长的时间来处理消息,则较高的预提取值可能会导致 RabbitMQ 服务器内存不足。

考虑到上述因素,我们建议始终设置预提取值,以防止由于大量未处理或未确认的消息而导致 RabbitMQ 代理或其使用者出现内存不足的情况。如果您需要优化代理来处理大量消息,您可以使用一系列预提取计数来测试您的代理和使用者,以确定与使用者处理消息所需的时间相比,网络开销在哪个点上变得微不足道。

注意
  • 如果您的客户端应用程序已配置为自动确认将消息传递给使用者,则设置预提取值将不起作用。

  • 所有预提取消息都会从队列中删除。

以下示例演示了如何使用 RabbitMQ Java 客户端库为单一使用者设置 10 的预提取值。

ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(10, false); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("my_queue", false, consumer);
注意

在 RabbitMQ Java 客户端库中,global 标志的默认值设置为 false,所以上面的例子可以简单地写成 channel.basicQos(10)

配置 Celery

Python Celery 会发送许多不必要的消息,这些消息会使查找和处理有用的信息变得更加困难。为了降低噪音并使处理更容易,请输入以下命令:

celery -A app_name worker --without-heartbeat --without-gossip --without-mingle

自动从网络故障中恢复

我们建议始终启用自动网络恢复,以防止在客户端连接到 RabbitMQ 节点失败的情况下出现严重停机。自版本 4.0.0 起,RabbitMQ Java 客户端库默认支持自动网络恢复。

如果在连接的输入/输出循环中引发未处理的异常、检测到套接字读取操作超时,或者如果服务器失去检测信号,则会触发自动连接恢复。

如果客户端和 RabbitMQ 节点之间的初始连接失败,将不会触发自动恢复。我们建议您编写应用程序代码,以便通过重试连接来解决初始连接失败的问题。以下示例演示了如何使用 RabbitMQ Java 客户端库来重试初始网络故障。

ConnectionFactory factory = new ConnectionFactory(); // enable automatic recovery if using RabbitMQ Java client library prior to version 4.0.0. factory.setAutomaticRecoveryEnabled(true); // configure various connection settings try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
注意

如果应用程序使用 Connection.Close 方法关闭连接,则不会启用或触发自动网络恢复。