创建并连接到 ActiveMQ 代理 - Amazon MQ
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅中国的 Amazon Web Services 服务入门

创建并连接到 ActiveMQ 代理

代理 是运行在 Amazon MQ 上的消息代理环境。它是 Amazon MQ 的基本构建块。代理实例m5t3)和大小largemicro)的综合描述是一个代理实例类型(例如 mq.m5.large)。有关更多信息,请参阅代理

步骤 1:创建 ActiveMQ 代理

第一个也是最常见的 Amazon MQ 任务是创建代理。以下示例演示如何使用Amazon Web Services Management Console创建基本代理。

  1. 登录 Amazon MQ 控制台

  2. Select broker engine (选择代理引擎) 页面上,选择 Apache ActiveMQ

  3. Select deployment and storage (选择部署和存储) 页面的 Deployment mode and storage type (部署模式和存储类型) 部分,执行以下操作:

    1. 选择 Deployment mode (部署模式)(例如 Active/standby broker (主动/备用代理))。有关更多信息,请参阅Broker Architecture

      • 单实例代理由一个可用区中的一个代理组成。代理与您的应用程序以及 Amazon EBS 或 Amazon EFS 存储卷进行通信。有关更多信息,请参阅Amazon MQ 单实例代理

      • 高可用性的主动/备用代理由两个不同可用区中的两个代理组成,以冗余对配置。这些代理与您的应用程序以及 Amazon EFS 进行同步通信。有关更多信息,请参阅用于实现高可用性的 Amazon MQ 主动/备用代理

      • 有关代理网络示例蓝图的更多信息,请参阅示例蓝图

    2. 选择 Storage type (存储类型)(例如 EBS)。有关更多信息,请参阅Storage

      注意

      Amazon EBS 在单个可用区内复制数据,但不支持 ActiveMQ 主动/备用部署模式。

    3. 选择 Next (下一步)

  4. Configure settings (配置设置) 页面的 Details (详细信息) 部分,执行以下操作:

    1. 输入 Broker name (代理名称)

      重要

      请勿在代理名称中添加个人身份信息(PII)或其他机密或敏感信息。其他Amazon服务(包括 CloudWatch Logs)可以访问代理名称。代理名称不适合用于私有或敏感数据。

    2. 选择 Broker instance type (代理实例类型)(例如 mq.m5.large)。有关更多信息,请参阅Broker instance types

  5. ActiveMQ Web Console access (ActiveMQ Web 控制台访问) 部分,提供 Username (用户名)Password (密码)。以下限制适用于代理用户名和密码:

    • 用户名只能包含字母数字字符、短划线、句点、下划线和波浪线(- . _ ~)。

    • 密码必须至少为 12 个字符,包含至少 4 个唯一字符,并且不得包含逗号、冒号或等号(,:=)。

    重要

    请勿在代理用户名中添加个人身份信息(PII)或其他机密或敏感信息。其他Amazon服务(包括 CloudWatch Logs)可以访问代理用户名。代理用户名不适合用于私有或敏感数据。

  6. 选择 Deploy(部署)。

    当 Amazon MQ 创建您的代理时,会显示 Creation in progress (正在创建) 状态。

    创建代理大约需要 15 分钟。

    成功创建您的代理后,Amazon MQ 会显示 Running (正在运行) 状态。

  7. 选择 MyBroker

    MyBroker 页面的 Connect(连接)部分,请注意代理的 ActiveMQ web console(ActiveMQ Web 控制台)URL,例如:

    https://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:8162

    另外,请记下您代理的线级协议终端节点。以下是 OpenWire 终端节点的示例:

    ssl://b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9-1.mq.us-east-2.amazonaws.com:61617

步骤 2:将 Java 应用程序连接到您的代理

创建 Amazon MQ ActiveMQ 代理后,您可以将应用程序连接到该代理。以下示例演示如何使用 Java Message Service(JMS)创建代理连接、创建队列以及发送消息。有关完整的可用 Java 示例,请参阅Working Java Example

您可以使用各种 ActiveMQ 客户端连接到 ActiveMQ 代理。我们建议使用 ActiveMQ 客户端

先决条件

启用 VPC 属性

要确保您的代理可以在您的 VPC 中访问,您必须启用 enableDnsHostnamesenableDnsSupport VPC 属性。有关更多信息,请参阅《Amazon VPC 用户指南》中的 VPC 中的 DNS Support

启用入站连接

  1. 登录 Amazon MQ 控制台

  2. 从代理列表中选择您的代理的名称(例如 MyBroker)。

  3. MyBroker 页面的 Connections (连接) 部分,记下代理 Web 控制台 URL 和线级协议的地址和端口。

  4. Details (详细信息) 部分的 Security and network (安全与网络) 下,选择您的安全组名称或

    此时将显示 EC2 Dashboard 的 Security Groups (安全组) 页面。

  5. 从安全组列表中,选择您的安全组。

  6. 在页面底部,选择 Inbound (入站),然后选择 Edit (编辑)

  7. Edit inbound rules (编辑入站规则) 对话框中,为希望公开访问的每个 URL 或终端节点添加规则(以下示例显示如何为代理 Web 控制台执行此操作。

    1. 选择 Add Rule(添加规则)。

    2. 对于 Type (类型),选择 Custom TCP (自定义 TCP)

    3. 对于 Port Range (端口范围),键入 Web 控制台端口(8162)。

    4. 对于 Source (源),选择 Custom (自定义),然后键入您希望能够访问 Web 控制台的系统的 IP 地址(例如 192.0.2.1)。

    5. 选择保存

      您的代理现在可以接受入站连接。

添加 Java 依赖项

activemq-client.jaractivemq-pool.jar 程序包添加到 Java 类路径中。以下示例说明了 Maven 项目的 pom.xml 文件中的这些依赖关系。

<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.8</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.8</version> </dependency> </dependencies>

有关 activemq-client.jar 的更多信息,请参阅 Apache ActiveMQ 文档中的初始配置

重要

在以下示例代码中,生产者和使用者在单个线程中运行。对于生产系统(或测试代理实例故障转移),请确保您的创建者和使用者在单独的主机或线程上运行。

创建消息创建者并发送消息

  1. 使用代理的终端节点为消息创建者创建 JMS 池连接工厂,然后针对该工厂调用 createConnection 方法。

    注意

    对于主动/备用代理,Amazon MQ 提供两个 ActiveMQ Web 控制台 URL,但每次只有一个 URL 处于活动状态。同样,Amazon MQ 为每个线级协议提供两个终端节点,但每次每对中只有一个终端节点处于活动状态。-1-2 后缀表示冗余对。有关更多信息,请参阅 Broker Architecture

    对于线级协议终端节点,您可以允许应用程序使用故障转移传输连接到任一终端节点。

    // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the username and password. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Create a pooled connection factory. final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(); pooledConnectionFactory.setConnectionFactory(connectionFactory); pooledConnectionFactory.setMaxConnections(10); // Establish a connection for the producer. final Connection producerConnection = pooledConnectionFactory.createConnection(); producerConnection.start();
    注意

    消息创建者应始终使用 PooledConnectionFactory 类。有关更多信息,请参阅始终使用连接池

  2. 创建一个会话,一个名为 MyQueue 的队列和消息创建者。

    // Create a session. final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination producerDestination = producerSession.createQueue("MyQueue"); // Create a producer from the session to the queue. final MessageProducer producer = producerSession.createProducer(producerDestination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  3. 创建消息字符串 "Hello from Amazon MQ!",然后发送消息。

    // Create a message. final String text = "Hello from Amazon MQ!"; TextMessage producerMessage = producerSession.createTextMessage(text); // Send the message. producer.send(producerMessage); System.out.println("Message sent.");
  4. 清理创建者。

    producer.close(); producerSession.close(); producerConnection.close();

创建消息使用者并接收消息

  1. 使用代理的终端节点为消息创建者创建 JMS 连接工厂,然后针对该工厂调用 createConnection 方法。

    // Create a connection factory. final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(wireLevelEndpoint); // Pass the username and password. connectionFactory.setUserName(activeMqUsername); connectionFactory.setPassword(activeMqPassword); // Establish a connection for the consumer. final Connection consumerConnection = connectionFactory.createConnection(); consumerConnection.start();
    注意

    消息使用者绝不 应使用 PooledConnectionFactory 类。有关更多信息,请参阅始终使用连接池

  2. 创建一个会话,一个名为 MyQueue 的队列和消息使用者。

    // Create a session. final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue named "MyQueue". final Destination consumerDestination = consumerSession.createQueue("MyQueue"); // Create a message consumer from the session to the queue. final MessageConsumer consumer = consumerSession.createConsumer(consumerDestination);
  3. 开始等待消息,并在消息到达时收到消息。

    // Begin to wait for messages. final Message consumerMessage = consumer.receive(1000); // Receive the message when it arrives. final TextMessage consumerTextMessage = (TextMessage) consumerMessage; System.out.println("Message received: " + consumerTextMessage.getText());
    注意

    与Amazon消息收发服务(例如 Amazon SQS)不同的是,使用者会不断连接到代理。

  4. 关闭使用者、会话和连接。

    consumer.close(); consumerSession.close(); consumerConnection.close();

步骤 3:(可选)连接到 Amazon Lambda 函数

Amazon Lambda 可以连接并使用来自您的 Amazon MQ 代理的消息。当您将代理连接到 Lambda 时,可以创建事件源映射,从队列中读取消息并同步调用函数。您创建的事件源映射分批从您的代理中读取消息,并以 JSON 对象的形式将它们转换为 Lambda 负载。

将您的代理连接到 Lambda 函数

  1. 将以下 IAM 角色权限添加到 Lambda 函数执行角色

    注意

    如果没有必要的 IAM 权限,您的函数将无法从 Amazon MQ 资源中成功读取记录。

  2. (可选)如果您创建了一个没有公开可访问性的代理,则必须执行下面其中一项操作以允许 Lambda 连接到您的代理:

    • 为每个公有子网配置一个 NAT 网关。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的VPC 连接函数的互联网和服务访问

    • 使用 VPC 终端节点在您的 Amazon Virtual Private Cloud(Amazon VPC)和 Lambda 之间创建连接。您的 Amazon VPC 还必须连接到 Amazon Security Token Service (Amazon STS) 和 Secrets Manager 端点。有关更多信息,请参阅《Amazon Lambda 开发人员指南》中的为 Lambda 配置接口 VPC 终端节点

  3. 使用Amazon Web Services Management Console为 Lambda 函数配置代理作为事件源。您也可以使用 create-event-source-mapping Amazon Command Line Interface 命令。

  4. 为 Lambda 函数编写一些代码来处理从您的代理使用的消息。事件源映射检索的 Lambda 负载取决于代理的引擎类型。以下是适用于 Amazon MQ for ActiveMQ 队列的 Lambda 负载示例。

    注意

    在该示例中,testQueue 是队列的名称。

    { "eventSource": "aws:amq", "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messages": { [ { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "data": "QUJDOkFBQUE=", "connectionId": "myJMSCoID", "redelivered": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 }, { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType":"jms/bytes-message", "data": "3DTOOW7crj51prgVLQaGQ82S48k=", "connectionId": "myJMSCoID1", "persistent": false, "destination": { "physicalname": "testQueue" }, "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 } ] } }

有关将 Amazon MQ 连接到 Lambda、Lambda 为 Amazon MQ 事件源提供支持的选项和事件源映射错误的更多信息,请参阅《Amazon Lambda 开发人员指南》中的将 Lambda 与 Amazon MQ 结合使用

步骤 4:删除代理

如果您不使用 Amazon MQ 代理(并且估计未来近期也不会使用代理),最佳实践是将其从 Amazon MQ 中删除以减少Amazon成本。

以下示例演示如何使用Amazon Web Services Management Console删除代理。

  1. 登录 Amazon MQ 控制台

  2. 从代理列表中,选择您的代理(例如 MyBroker),然后选择 Delete (删除)

  3. Delete MyBroker? (是否删除 MyBroker?) 对话框中,键入 delete,然后选择 Delete (删除)

    删除代理大约需要 5 分钟。

后续步骤

现在,您已经创建了一个代理,将一个应用程序连接到了该代理,并发送和接收了一条消息,您可以希望尝试以下操作:

您也可以开始深入了解 Amazon MQ 的最佳实践Amazon MQ REST API,然后计划迁移到 Amazon MQ