本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将 Python Pika 与 Amazon MQ for RabbitMQ 结合使用
以下教程说明如何使用 TLS 设置 Python Pika
先决条件
要完成本教程中的步骤,您需要满足以下先决条件:
Amazon MQ for RabbitMQ 代理。有关更多信息,请参阅创建 Amazon MQ for RabbitMQ 代理。
安装适用于操作系统的 Python 3
。 -
使用 Python
pip
安装 Pika。要安装 Pika,请打开新的终端窗口并运行以下内容。 $
python3 -m pip install pika
权限
在本教程中,您需要至少一个 Amazon MQ for RabbitMQ 代理用户,该用户具有写入和读取虚拟主机的权限。下表描述了作为正则表达式 (regexp) 模式所需的最小权限。
标签 | 配置正则表达式 | 写入正则表达式 | 读取正则表达式 |
---|---|---|---|
none |
|
.* |
.* |
列出的用户权限仅向用户提供读写权限,而不授予在代理上执行管理操作的管理插件访问权限。您可以通过提供限制用户访问指定队列的 regexp 模式来进一步限制权限。例如,如果将读取 regexp 模式更改为 ^[hello world].*
,用户只拥有从以 hello world
开头的队列中读取的权限。
有关创建 RabbitMQ 用户和管理用户标记和权限的更多信息,请参阅 适用于 RabbitMQ 经纪商用户的亚马逊 MQ。
步骤一:创建基本的 Python Pika 客户端
请执行以下操作来创建 Python Pika 客户端基类,该基类定义构造函数,并在与 Amazon MQ for RabbitMQ 代理交互时提供 TLS 配置所需的 SSL 上下文。
-
打开新的终端窗口,为项目创建新目录,然后导航到该目录。
$
mkdir pika-tutorial
$
cd pika-tutorial
-
创建一个名为
basicClient.py
的文件,其中包含以下 Python 代码。import ssl import pika class BasicPikaClient: def __init__(self, rabbitmq_broker_id, rabbitmq_user, rabbitmq_password, region): # SSL Context for TLS configuration of Amazon MQ for RabbitMQ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ssl_context.set_ciphers('ECDHE+AESGCM:!ECDSA') url = f"amqps://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_broker_id}.mq.{region}.amazonaws.com:5671" parameters = pika.URLParameters(url) parameters.ssl_options = pika.SSLOptions(context=ssl_context) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel()
现在,您可以为继承自 BasicPikaClient
的发布者和使用者定义其他类。
步骤二:创建发布者并发送消息
要创建声明队列的发布者并发送单条消息,请执行以下操作。
-
复制以下代码示例的内容,并将其作为
publisher.py
保存在本地上一个步骤创建的相同目录中。from basicClient import BasicPikaClient class BasicMessageSender(BasicPikaClient): def declare_queue(self, queue_name): print(f"Trying to declare queue({queue_name})...") self.channel.queue_declare(queue=queue_name) def send_message(self, exchange, routing_key, body): channel = self.connection.channel() channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body) print(f"Sent message. Exchange: {exchange}, Routing Key: {routing_key}, Body: {body}") def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Initialize Basic Message Sender which creates a connection # and channel for sending messages. basic_message_sender = BasicMessageSender( "<broker-id>", "<username>", "<password>", "<region>" ) # Declare a queue basic_message_sender.declare_queue("hello world queue") # Send a message to the queue. basic_message_sender.send_message(exchange="", routing_key="hello world queue", body=b'Hello World!') # Close connections. basic_message_sender.close()
BasicMessageSender
类继承自BasicPikaClient
并实施了用于声明队列、向队列发送消息和关闭连接的其他方法。代码示例使用等于队列名称的路由密钥,将消息路由到默认交换器。 -
在
if __name__ == "__main__":
下,将传递给BasicMessageSender
构造函数语句的参数替换为以下信息。-
<broker-id>
– Amazon MQ 为代理生成的唯一 ID。您可以通过代理 ARN 解析 ID。例如,给定以下 ARNarn:aws:mq:us-east-2:123456789012:broker:MyBroker:b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9
,代理 ID 将为b-1234a5b6-78cd-901e-2fgh-3i45j6k178l9
。 -
<username>
- 具有足够权限向代理写入消息的代理用户的用户名。 -
<password>
- 具有足够权限向代理写入消息的代理用户的密码。 -
<region>
- 您创建 Amazon MQ for RabbitMQ 代理的 Amazon 区域。例如,us-west-2
。
-
-
在创建
publisher.py
的同一目录中运行以下命令。$
python3 publisher.py
如果代码运行成功,您将在终端窗口中看到以下输出。
Trying to declare queue(hello world queue)... Sent message. Exchange: , Routing Key: hello world queue, Body: b'Hello World!'
步骤三:创建使用者并接收消息
要创建从队列接收单条消息的使用者,请执行以下操作。
-
复制以下代码示例的内容,并将其作为
consumer.py
保存在本地同一目录中。from basicClient import BasicPikaClient class BasicMessageReceiver(BasicPikaClient): def get_message(self, queue): method_frame, header_frame, body = self.channel.basic_get(queue) if method_frame: print(method_frame, header_frame, body) self.channel.basic_ack(method_frame.delivery_tag) return method_frame, header_frame, body else: print('No message returned') def close(self): self.channel.close() self.connection.close() if __name__ == "__main__": # Create Basic Message Receiver which creates a connection # and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. basic_message_receiver.get_message("hello world queue") # Close connections. basic_message_receiver.close()
与您在上一个步骤中创建的发布者类似,
BasicMessageReciever
继承自BasicPikaClient
并实施用于接收单条消息和关闭连接的其他方法。 -
在
if __name__ == "__main__":
语句下,将传递给BasicMessageReciever
构造函数的参数替换为您的信息。 -
在您的项目目录中运行以下命令。
$
python3 consumer.py
如果代码运行成功,您将在终端窗口中看到消息正文以及包括路由密钥的标头。
<Basic.GetOk(['delivery_tag=1', 'exchange=', 'message_count=0', 'redelivered=False', 'routing_key=hello world queue'])> <BasicProperties> b'Hello World!'
步骤四:(可选)设置事件循环并使用消息
要使用队列中的多条消息,请使用 Pika 的 basic_consume
-
在
consumer.py
中,将以下方法定义添加到BasicMessageReceiver
类。def consume_messages(self, queue): def callback(ch, method, properties, body): print(" [x] Received %r" % body) self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') self.channel.start_consuming()
-
在
consumer.py
中的if __name__ == "__main__":
下,调用您在上一个步骤中定义的consume_messages
方法。if __name__ == "__main__": # Create Basic Message Receiver which creates a connection and channel for consuming messages. basic_message_receiver = BasicMessageReceiver( "<broker-id>", "<username>", "<password>", "<region>" ) # Consume the message that was sent. # basic_message_receiver.get_message("hello world queue") # Consume multiple messages in an event loop. basic_message_receiver.consume_messages("hello world queue") # Close connections. basic_message_receiver.close()
-
再次运行
consumer.py
,如果运行成功,队列消息将显示在终端窗口中。[*] Waiting for messages. To exit press CTRL+C [x] Received b'Hello World!' [x] Received b'Hello World!' ...
接下来做什么?
-
有关其他支持的 RabbitMQ 客户端库的更多信息,请参阅 RabbitMQ 网站上的 RabbitMQ 客户端文档
。