将 Python Pika 与 Amazon MQ for RabbitMQ 结合使用 - Amazon MQ
Amazon Web Services 文档中描述的 Amazon Web Services 服务或功能可能因区域而异。要查看适用于中国区域的差异,请参阅 中国的 Amazon Web Services 服务入门 (PDF)

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 Python Pika 与 Amazon MQ for RabbitMQ 结合使用

以下教程说明如何使用 TLS 设置 Python Pika,并将 TLS 配置为连接到 Amazon MQ for RabbitMQ 代理。Pika 是适用于 RabbitMQ 的 AMQP 0-9-1 协议的 Python 实现。本教程可指导您安装 Pika、声明队列、设置发布者以向代理的默认交换器发送消息,以及设置使用者以接收队列中的消息。

先决条件

要完成本教程中的步骤,您需要满足以下先决条件:

  • Amazon MQ for RabbitMQ 代理。有关更多信息,请参阅创建 Amazon MQ for RabbitMQ 代理

  • 安装适用于操作系统的 Python 3

  • 使用 Python pip 安装 Pika。要安装 Pika,请打开新的终端窗口并运行以下内容。

    $ python3 -m pip install pika

权限

在本教程中,您需要至少一个 Amazon MQ for RabbitMQ 代理用户,该用户具有写入和读取虚拟主机的权限。下表描述了作为正则表达式 (regexp) 模式所需的最小权限。

标签 配置正则表达式 写入正则表达式 读取正则表达式
none .* .*

列出的用户权限仅向用户提供读写权限,而不授予在代理上执行管理操作的管理插件访问权限。您可以通过提供限制用户访问指定队列的 regexp 模式来进一步限制权限。例如,如果将读取 regexp 模式更改为 ^[hello world].*,用户只拥有从以 hello world 开头的队列中读取的权限。

有关创建 RabbitMQ 用户和管理用户标记和权限的更多信息,请参阅 用户

步骤一:创建基本的 Python Pika 客户端

请执行以下操作来创建 Python Pika 客户端基类,该基类定义构造函数,并在与 Amazon MQ for RabbitMQ 代理交互时提供 TLS 配置所需的 SSL 上下文。

  1. 打开新的终端窗口,为项目创建新目录,然后导航到该目录。

    $ mkdir pika-tutorial $ cd pika-tutorial
  2. 创建一个名为 basicClient.py 的文件,其中包含以下 Python 代码。

    import ssl import pika class BasicPikaClient: def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region): # SSL Context for TLS configuration of Amazon MQ for RabbitMQ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA') url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671" parameters = pika.URLParameters(url) parameters.ssl_options = pika.SSLOptions(context=ssl_context) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel()

现在,您可以为继承自 BasicPikaClient 的发布者和使用者定义其他类。

步骤二:创建发布者并发送消息

要创建声明队列的发布者并发送单条消息,请执行以下操作。

  1. 复制以下代码示例的内容,并将其作为 publisher.py 保存在本地上一个步骤创建的相同目录中。

    from basicClient import BasicPikaClient class BasicMessageSender(BasicPikaClient): def declare_queue(self, queue_name): print(f"Trying to declare queue({queue_name})...") self.channel.queue_declare(queue=queue_name) def send_message(self, exchange, routing_key, body): channel = self.connection.channel() channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body) print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}") def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Initialize Basic Message Sender which creates a connection # and channel for sending messages. basic_message_sender = BasicMessageSender( "<broker-id>", "<username>", "<password>", "<region>" ) # Declare a queue basic_message_sender.declare_queue("hello world queue") # Send a message to the queue. basic_message_sender.send_message(exchange="", routing_key="hello world queue", body=b'Hello World!') # Close connections. basic_message_sender.close()

    BasicMessageSender 类继承自 BasicPikaClient 并实施了用于声明队列、向队列发送消息和关闭连接的其他方法。代码示例使用等于队列名称的路由密钥,将消息路由到默认交换器。

  2. if __name__ == "__main__": 下,将传递给 BasicMessageSender 构造函数语句的参数替换为以下信息。

    • <broker-id> – Amazon MQ 为代理生成的唯一 ID。您可以通过代理 ARN 解析 ID。例如,给定以下 ARN arn:aws:mq:us-east-2:123456789012:broker:MyBroker:b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9,代理 ID 将为 b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9

    • <username> - 具有足够权限向代理写入消息的代理用户的用户名。

    • <password> - 具有足够权限向代理写入消息的代理用户的密码。

    • <region> - 您创建 Amazon MQ for RabbitMQ 代理的 Amazon 区域。例如,us-west-2

  3. 在创建 publisher.py 的同一目录中运行以下命令。

    $ python3 publisher.py

    如果代码运行成功,您将在终端窗口中看到以下输出。

    Trying to declare queue(hello world queue)...
    Sent message. Exchange: , Routing Key: hello world queue, Body: b'Hello World!'

步骤三:创建使用者并接收消息

要创建从队列接收单条消息的使用者,请执行以下操作。

  1. 复制以下代码示例的内容,并将其作为 consumer.py 保存在本地同一目录中。

    from basicClient import BasicPikaClient class BasicMessageReceiver(BasicPikaClient): def get_message(self, queue): method_frame, header_frame, body = self.channel.basic_get(queue) if method_frame: print(method_frame, header_frame, body) self.channel.basic_ack(method_frame.delivery_tag) return method_frame, header_frame, body else: print('No message returned') def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Create Basic Message Receiver which creates a connection # and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. basic_message_receiver.get_message("hello world queue") # Close connections. basic_message_receiver.close()

    与您在上一个步骤中创建的发布者类似,BasicMessageReciever 继承自 BasicPikaClient 并实施用于接收单条消息和关闭连接的其他方法。

  2. if __name__ == "__main__": 语句下,将传递给 BasicMessageReciever 构造函数的参数替换为您的信息。

  3. 在您的项目目录中运行以下命令。

    $ python3 consumer.py

    如果代码运行成功,您将在终端窗口中看到消息正文以及包括路由密钥的标头。

    <Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=0', 'redelivered=False', 'routing_key=hello world queue'])> <BasicProperties> b'Hello World!'

步骤四:(可选)设置事件循环并使用消息

要使用队列中的多条消息,请使用 Pika 的 basic_consume 方法和回调函数,如下所示

  1. consumer.py 中,将以下方法定义添加到 BasicMessageReceiver 类。

    def consume_messages(self, queue): def callback(ch, method, properties, body): print(" [x] Received %r" % body) self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') self.channel.start_consuming()
  2. consumer.py 中的 if __name__ == "__main__": 下,调用您在上一个步骤中定义的 consume_messages 方法。

    if __name__ == "__main__": # Create Basic Message Receiver which creates a connection and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. # basic_message_receiver.get_message("hello world queue") # Consume multiple messages in an event loop. basic_message_receiver.consume_messages("hello world queue") # Close connections. basic_message_receiver.close()
  3. 再次运行 consumer.py,如果运行成功,队列消息将显示在终端窗口中。

    [*] Waiting for messages. To exit press CTRL+C
    [x] Received b'Hello World!'
    [x] Received b'Hello World!'
    ...

接下来做什么?

  • 有关其他支持的 RabbitMQ 客户端库的更多信息,请参阅 RabbitMQ 网站上的 RabbitMQ 客户端文档