当前位置: 首页 > news >正文

【RabbitMQ】多系统下的安装配置与编码使用(python)

文章目录

  • 引言
    • 核心概念
    • RabbitMQ 的常见应用场景
  • 1. 安装 RabbitMQ
    • 在本地安装 RabbitMQ
      • Linux (Ubuntu)
      • MacOS
      • Windows
        • 配置环境变量
  • 2. Python 代码示例
    • 安装 Python 客户端库
    • 示例 1:基本队列通信
      • 生产者(producer.py)
      • 消费者(consumer.py)
      • 运行测试
    • 示例 2:使用交换机(Topic 模式)
      • 生产者(topic_producer.py)
      • 消费者(topic_consumer.py)
      • 运行测试
    • 示例 3:消息持久化与手动确认
      • 生产者(持久化消息)
      • 消费者(手动 ACK)
    • 代码结合:
    • 3. 关键注意事项

引言

RabbitMQ 是一个开源的 消息代理(Message Broker) 软件,实现了 高级消息队列协议(AMQP),用于在分布式系统中存储、转发消息,支持多种消息传递模式。它通过解耦生产者和消费者、异步处理、流量削峰等机制,提升系统的可扩展性、可靠性和灵活性。

本文主要介绍在Linux/MacOS/Windows下的Rabbit的安装,以及以python为示例的具体代码编写。

核心概念

  1. Producer(生产者):发送消息的程序。
  2. Consumer(消费者):接收消息的程序。
  3. Queue(队列):存储消息的缓冲区,消息会一直存在队列中直到被消费。
  4. Exchange(交换机):接收生产者发送的消息,并根据规则(路由键、绑定等)将消息路由到队列。
  5. Binding(绑定):定义 Exchange 和 Queue 之间的关系。
  6. Message(消息):传递的数据,包含有效负载(payload)和元数据(如路由键)。

RabbitMQ 的常见应用场景

  1. 异步处理

    • 示例:用户注册后发送邮件/短信通知。
    • 优势:主流程快速响应,耗时操作异步交给消费者处理。
  2. 应用解耦

    • 示例:订单系统和库存系统通过消息队列通信,避免直接接口调用。
    • 优势:某一系统宕机不影响其他系统,消息可暂存后处理。
  3. 流量削峰

    • 示例:秒杀活动的高并发请求先写入队列,系统按处理能力逐步消费。
    • 优势:避免服务器瞬时过载。
  4. 日志收集

    • 示例:多台服务器将日志发送到队列,由统一服务消费存储。
    • 优势:集中处理,避免日志丢失。

1. 安装 RabbitMQ

在本地安装 RabbitMQ

Linux (Ubuntu)

# 安装 Erlang(RabbitMQ 依赖)
sudo apt-get install -y erlang# 安装 RabbitMQ
sudo apt-get install -y rabbitmq-server# 启动服务
sudo systemctl start rabbitmq-server# 启用管理插件(可选)
sudo rabbitmq-plugins enable rabbitmq_management

MacOS

# 通过 Homebrew 安装
brew install rabbitmq# 启动服务
brew services start rabbitmq# 启用管理插件
rabbitmq-plugins enable rabbitmq_management

Windows

  1. 下载并安装 Erlang。
    在这里插入图片描述

  2. 下载 RabbitMQ 的 Windows 安装包。

    • 安装完成后,勾选“Start RabbitMQ Service”,或之后手动启动rabbtmq服务
      在这里插入图片描述
      在这里插入图片描述
  3. 通过命令行启用管理插件:

    rabbitmq-plugins enable rabbitmq_management
    
配置环境变量

安装RabbitMQ完成后需要配置到系统环境变量,否则直接执行 rabbitmq-plugins enable rabbitmq_management 命令会找不到。

  1. 右键点击“此电脑” → “属性” → “高级系统设置” → “环境变量”。
  2. 在“系统变量”中找到 Path,点击“编辑”。
  3. 添加 RabbitMQ 的 sbin 目录路径(例如):
    C:\Program Files\RabbitMQ Server\rabbitmq_server-3.12.0\sbin
    
  4. 保存后重新打开命令行,即可直接运行 rabbitmq-plugins

在这里插入图片描述

在这里插入图片描述

如果不配置系统环境变量,也可直接在 rabbitmq-plugins.bat 所在的路径(即sbin)下执行命令。

在这里插入图片描述


2. Python 代码示例

安装 Python 客户端库

pip install pika

pika 是 用于与 RabbitMQ 进行通信的Python 客户端库,构建高效、可靠的消息队列系统。


示例 1:基本队列通信

生产者(producer.py)

import pika# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明一个队列(如果不存在则创建)
channel.queue_declare(queue='hello')# 发送消息
channel.basic_publish(exchange='',          # 使用默认交换机routing_key='hello',  # 指定队列名称body='Hello World!'   # 消息内容
)
print(" [x] Sent 'Hello World!'")# 关闭连接
connection.close()

消费者(consumer.py)

import pikadef callback(ch, method, properties, body):print(f" [x] Received {body}")# 连接到 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明队列(确保存在)
channel.queue_declare(queue='hello')# 消费消息
channel.basic_consume(queue='hello',         # 监听的队列on_message_callback=callback,  # 消息处理函数auto_ack=True         # 自动确认消息
)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 开始阻塞监听

运行测试

  1. 启动消费者:
    python consumer.py
    
  2. 启动生产者:
    python producer.py
    
  3. 消费者会输出:
    [x] Received b'Hello World!'

示例 2:使用交换机(Topic 模式)

生产者(topic_producer.py)

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明一个 Topic 类型的交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')# 发送消息到特定路由键
routing_key = 'user.notification.email'
message = 'Email sent to user!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message
)
print(f" [x] Sent {routing_key}:{message}")
connection.close()

消费者(topic_consumer.py)

import pika
import sysdef callback(ch, method, properties, body):print(f" [x] {method.routing_key}:{body}")connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 声明交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')# 创建临时队列(关闭连接后自动删除)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue# 绑定队列到交换机,监听特定路由键
routing_key = 'user.notification.*'  # 通配符匹配
channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=routing_key
)channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True
)print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()

运行测试

  1. 启动消费者(监听 user.notification.*):
    python topic_consumer.py
    
  2. 启动生产者(发送 user.notification.email):
    python topic_producer.py
    
  3. 消费者会收到匹配路由键的消息。

示例 3:消息持久化与手动确认

生产者(持久化消息)

channel.queue_declare(queue='task_queue', durable=True)  # 队列持久化
channel.basic_publish(exchange='',routing_key='task_queue',body='This is a persistent task',properties=pika.BasicProperties(delivery_mode=2,  # 消息持久化)
)

消费者(手动 ACK)

def callback(ch, method, properties, body):print(f" [x] Processing {body}")# 模拟耗时任务import timetime.sleep(5)print(" [x] Done")ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认channel.basic_consume(queue='task_queue',on_message_callback=callback,auto_ack=False  # 关闭自动确认
)

代码结合:

可用把上面的通信代码写成一个项目,把RabbitMQ的部分封装成一个连接类:

https://gitee.com/aiyimu/python/tree/master/RabbitMQ_Practice/RabbitMQ_Practice

3. 关键注意事项

  1. 连接管理
    • 生产环境中使用连接池(如 pika.BlockingConnection 的复用)。
  2. 错误处理
    • 捕获 pika.exceptions.AMQPConnectionError 并实现重连逻辑。
  3. 性能优化
    • 通过 channel.basic_qos(prefetch_count=1) 限制消费者每次只处理一条消息。
  4. 监控
    • 访问 http://localhost:15672(默认账号 guest/guest)查看队列状态。

http://www.lqws.cn/news/538417.html

相关文章:

  • Spring Task定时任务详解与实战应用
  • java中的anyMatch和allMatch方法
  • OSEK/VDX OS ISO17356-3,【1】规范概述
  • SpringBoot项目快速开发框架JeecgBoot——Web处理!
  • linux cp与mv那个更可靠
  • MySQL5.7和8.0 破解root密码
  • 快速傅里叶变换(FFT)是什么?
  • python中学物理实验模拟:斜面受力分析
  • 圆周期性显示和消失——瞬态实现(CAD c#二次开发、插件定制)
  • Nordic nRF54L15 SoC对包含电池监测、中断处理和电源轨控制的定制 nPM1300 示例
  • springcloud 尚硅谷 看到9开头
  • 华为云鸿蒙应用入门级开发者认证 实验(HCCDA-HarmonyOS Cloud Apps)
  • 玄机抽奖Spring Web项目
  • Maven Javadoc 插件使用详解
  • [论文阅读]RaFe: Ranking Feedback Improves Query Rewriting for RAG
  • 解决uniapp vue3版本封装组件后:deep()样式穿透不生效的问题
  • react-嵌套路由 二级路由
  • 事件循环(Event Loop)机制对比:Node.js vs 浏览器​
  • python+requests接口自动化测试
  • 大脑感官:视觉系统中将感观信息转换为神经信号
  • @Autowired 和 @Resource 有什么区别?
  • Java常用设计模式详解
  • linux网络编程socket套接字
  • 【论文阅读】--Instruction Backdoor Attacks Against Customized LLMs
  • Neo4j2.0.1桌面端使用教程(简化版)
  • MySQL 锁的分类
  • WinAppDriver 自动化测试:C#篇
  • EMQ X Broker 配置HTTP 的外部鉴权接口
  • 生物化学 PCR(聚合酶链式反应)引物 制造(固相磷酰胺化学法) 购买 存储
  • 如何在x86_64 Linux上部署Android Cuttlefish模拟器运行环境