Amazon MQ for ActiveMQ 最佳实践 - Amazon MQ
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon MQ for ActiveMQ 最佳实践

以此作为参考快速找到在 Amazon MQ 上使用 ActiveMQ 代理时最大程度提高性能和降低吞吐量成本的建议。

永远不要修改或删除 Amazon MQ 弹性网络接口

当您首次创建 Amazon MQ 代理时,Amazon MQ 会在您的账户下的虚拟私有云 VPC () 中配置一个弹性网络接口,因此需要一些权限。EC2该网络接口允许您的客户端(创建者或使用者)与 Amazon MQ 代理通信。尽管网络接口属于您的账户,但仍被视为在 Amazon MQ 的服务范围内。VPC

警告

您不得修改或删除此网络接口。修改或删除网络接口可能会导致您VPC和您的经纪商之间永久断开连接。

Diagram showing Client, Elastic Network Interface, and Amazon MQ Broker within a Customer VPC and service scope.

始终使用连接池

在使用单个创建者和单个使用者的方案(例如 入门:创建并连接 ActiveMQ 代理 教程),您可以将单个 ActiveMQConnectionFactory 类用于每个创建者和使用者。例如:

// Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Establish a connection for the consumer. final Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start();

但是,在具有多个创建者和使用者的更真实的方案中,为多个创建者创建大量连接可能成本高昂,并且效率低下。在这些方案中,您应使用 PooledConnectionFactory 类将多个创建者请求分组。例如:

注意

消息使用者绝不 应使用 PooledConnectionFactory 类。

// Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the sign-in credentials. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Create a pooled connection factory. final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); // Establish a connection for the producer. final Connection producerConnection = pooledConnectionFactory.createConnection(); producerConnection.start();

始终使用故障转移传输连接到多个代理终端节点

如果应用程序需要连接到多个代理终端节点,例如,当您使用主/备用部署模式或者从本地消息代理迁移到 Amazon MQ 时,使用故障转移传输以允许您的使用者随机连接到一个。例如:

failover:(ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617,ssl://b-9876l5k4-32ji-109h-8gfe-7d65c4b132a1-2.mq.us-east-2.amazonaws.com:61617)?randomize=true

避免使用消息选择器

可以使用JMS选择器将过滤器附加到主题订阅(根据消费者的内容将消息路由给消费者)。但是,使用JMS选择器会填满 Amazon MQ 经纪商的筛选缓冲区,使其无法筛选消息。

一般来说,应避免让使用者路由消息,这样做的原因是,为了实现使用者和创建者的最佳解耦,使用者和创建者均应是短暂存在的。

首选虚拟目标而非持久订阅

持久订阅可帮助确保使用者收到发布到主题的所有消息,例如,在恢复丢失的连接后。但是,使用持久订阅还阻止竞争性使用者使用并可能具有大规模性能问题。考虑改用虚拟目标

如果使用 Amazon VPC 对等互连,请避开IPs位于范围内的CIDR客户端 10.0.0.0/16

如果您要在本地基础设施和 Ama VPC zon MQ 代理之间设置 Amazon 对等互连,则不得在范围内配置客户端连接IPs。CIDR 10.0.0.0/16

对具有慢速使用者的队列禁用并发存储和分派

默认情况下,Amazon MQ 针对具有快速使用者的队列进行优化:

  • 当使用者能够跟上创建器生成的消息速率时,将其视为快速

  • 如果队列造成了未确认消息积压,并可能导致创建器吞吐量下降,则将使用者视为慢速

要指示 Amazon MQ 针对具有慢速使用者的队列进行优化,请将 concurrentStoreAndDispatchQueues 属性设置为 false。有关示例配置,请参阅 concurrentStoreAndDispatchQueues

选择正确的代理实例类型以实现最佳吞吐量

代理实例类型的消息吞吐量取决于应用程序的使用案例和以下因素:

  • 持久模式下 ActiveMQ 的使用

  • 消息大小

  • 创建器和使用者的数量

  • 目标的数量

了解消息大小、延迟和吞吐量之间的关系

根据您的使用案例,较大的代理实例类型不一定能提高系统吞吐量。当 ActiveMQ 将消息写入持久存储中,消息的大小决定了系统的限制因素:

  • 如果您的消息大小不到 100 KB,则持久性存储延迟是限制因素。

  • 如果您的消息大小超过 100 KB,则持久性存储吞吐量是限制因素。

当您在持久模式下使用 ActiveMQ 时,通常会在使用者较少或使用者较慢的情况下发生写入存储。在非持久模式下,如果代理实例的堆内存已满,则也会在使用者较慢的情况下发生写入存储。

要为您的应用程序确定最佳代理实例类型,我们建议您测试不同的代理实例类型。有关更多信息,请参阅Broker instance types使用基准测试衡量 Amazon MQ 的吞吐量。JMS

较大代理实例类型的使用案例

当较大代理实例类型提高吞吐量时,存在以下三个常见使用案例:

  • 非持久模式 – 当您的应用程序在代理实例故障转移(例如,播报体育赛事比分时)期间对消息丢失不太敏感时,您通常可以使用 ActiveMQ 的非持久模式。在此模式下,仅在代理实例的堆内存已满时,ActiveMQ 才会将消息写入持久性存储中。使用非持久模式的系统可以受益于更大的代理实例类型上可用的更高内存量CPU、更快、更快的网络。

  • 快速使用者 – 当存在活动使用者且 concurrentStoreAndDispatchQueues 标志启用时,ActiveMQ 允许消息直接从创建器传递到使用者,而无需将消息发送到存储(即使在持久模式下也是如此)。如果您的应用程序可以快速使用消息(或者您可以将使用者设计为这么做),则应用程序能从较大代理实例类型中受益。要让您的应用程序更快地使用消息,请为应用程序实例添加使用者线程,或者纵向或横向扩展应用程序实例。

  • 批处理事务 – 当您使用持久模式且在每个事务中发送多条消息时,您可以使用较大代理实例类型来实现总体更高消息吞吐量。有关更多信息,请参阅 ActiveMQ 文档中的我是否应使用事务?

选择正确的代理存储类型以实现最佳吞吐量

要利用高耐久性和跨多个可用区复制的优势,请使用 Amazon EFS。要充分利用低延迟和高吞吐量的优势,请使用 Amazon EBS。有关更多信息,请参阅 Storage

正确配置您的代理网络

当您创建代理网络时,请为您的应用程序正确配置它:

  • 启用持续模式 – 因为(相对于其对等项)每个代理实例充当创建者或使用者,所以代理网络不提供消息的分布式复制。第一个充当使用者的代理接收消息并将其保留到存储中。此代理向创建者发送确认,并将消息转发给下一个代理。当第二个代理确认消息的持久性后,第一个代理将删除该消息。

    如果禁用持久模式,则第一个代理会在不将消息保留到存储的情况下确认创建者。有关更多信息,请参阅 Apache ActiveMQ 文档中的复制消息存储持久和非持久交付有什么区别?

  • 请勿对代理实例禁用建议消息 – 有关更多信息,请参阅 Apache ActiveMQ 文档中的建议消息

  • 请勿使用多播代理发现 – Amazon MQ 不支持使用多播的代理发现。有关更多信息,请参阅 Apache ActiveMQ 文档中的发现、多播和 zeroconf 的区别是什么?

通过恢复已准备 XA 事务避免缓慢重

ActiveMQ 支持分布式 (XA) 事务。了解 ActiveMQ 如何处理 XA 事务有助于避免 Amazon MQ 中代理重启和故障转移的缓慢恢复时间

每次重启时都会重放未解析的已准备 XA 事务。如果这些问题仍未被解析,其数量将随着时间的推移而增长,从而显著增加启动代理所需的时间。这会影响重启和故障转移时间。您必须使用 commit()rollback() 解析这些事务,以便性能不会随着时间的推移而降低。

要监控未处理的准备的 XA 交易,您可以使用 Amazon CloudWatch Logs 中的JournalFilesForFastRecovery指标。如果该数字不断增加,或者始终高于 1,则应使用类似于以下示例的代码恢复未解析的事务。有关更多信息,请参阅 Quotas in Amazon MQ

以下示例代码遍历已准备 XA 事务,并使用 rollback() 关闭它们。

import org.apache.activemq.ActiveMQXAConnectionFactory; import javax.jms.XAConnection; import javax.jms.XASession; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; public class RecoverXaTransactions { private static final ActiveMQXAConnectionFactory ACTIVE_MQ_CONNECTION_FACTORY; final static String WIRE_LEVEL_ENDPOINT = "tcp://localhost:61616";; static { final String activeMqUsername = "MyUsername123"; final String activeMqPassword = "MyPassword456"; ACTIVE_MQ_CONNECTION_FACTORY = new ActiveMQXAConnectionFactory(activeMqUsername, activeMqPassword, WIRE_LEVEL_ENDPOINT); ACTIVE_MQ_CONNECTION_FACTORY.setUserName(activeMqUsername); ACTIVE_MQ_CONNECTION_FACTORY.setPassword(activeMqPassword); } public static void main(String[] args) { try { final XAConnection connection = ACTIVE_MQ_CONNECTION_FACTORY.createXAConnection(); XASession xaSession = connection.createXASession(); XAResource xaRes = xaSession.getXAResource(); for (Xid id : xaRes.recover(XAResource.TMENDRSCAN)) { xaRes.rollback(id); } connection.close(); } catch (Exception e) { } } }

在实际场景中,您可以针对 XA 事务管理器检查已准备 XA 事务。然后,您可以使用 rollback()commit() 来决定是否处理每个已准备事务。