两个python独立进程通信
在 Python 3 中,两个独立的 Python 文件(进程)可以通过 multiprocessing.Queue
进行通信,但需要注意 跨进程队列的实现方式。以下是具体方法和注意事项:
场景 | 解决方案 | 优缺点 |
---|---|---|
父子进程 | multiprocessing.Queue | 高效,但需通过父进程传递队列。 |
独立进程(同主机) | multiprocessing.Manager().Queue() | 简单,但性能较低,需保持管理器运行。 |
跨机器/高并发 | Redis/RabbitMQ | 高性能,但需额外部署中间件。 |
选择建议
-
如果两个文件由同一脚本启动 → 方法 1
-
如果需要分布式通信 → 方法 3
方法 1:父子进程共享队列
在 Python 3 中,可以通过一个 父进程 启动两个子进程(对应两个 Python 文件),并共享同一个 multiprocessing.Queue
,实现进程间通信。以下是具体实现方法:
1.1 直接使用 multiprocessing.Process
启动子进程
步骤
-
父进程 创建一个
multiprocessing.Queue()
。 -
父进程 启动两个子进程(
producer.py
和consumer.py
),并将队列作为参数传递。 -
子进程 通过队列进行通信。
代码示例
父进程 (main.py
)
import multiprocessing
import os
from producer import producer # 导入 producer 函数
from consumer import consumer # 导入 consumer 函数if __name__ == "__main__":# 1. 创建共享队列queue = multiprocessing.Queue()# 2. 启动生产者进程p_producer = multiprocessing.Process(target=producer,args=(queue,))p_producer.start()# 3. 启动消费者进程p_consumer = multiprocessing.Process(target=consumer,args=(queue,))p_consumer.start()# 4. 等待子进程结束p_producer.join()p_consumer.join()print("All processes finished.")
生产者 (producer.py
)
import timedef producer(queue):for i in range(5):queue.put(f"Message {i}") # 发送数据print(f"Produced: Message {i}")time.sleep(1)queue.put(None) # 发送结束信号
消费者 (consumer.py
)
def consumer(queue):while True:item = queue.get() # 接收数据if item is None: # 检查结束信号breakprint(f"Consumed: {item}")
1.2 使用 import
+ if __name__ == "__main__"
直接运行
如果不想用 main.py
,可以直接在 生产者 或 消费者 文件中启动另一个进程:
修改 producer.py
以启动 consumer.py
import multiprocessing import time from consumer import consumer # 导入 consumer 函数def producer(queue):for i in range(5):queue.put(f"Message {i}")print(f"Produced: Message {i}")time.sleep(1)queue.put(None) # 发送结束信号if __name__ == "__main__":queue = multiprocessing.Queue()# 启动消费者进程p_consumer = multiprocessing.Process(target=consumer,args=(queue,))p_consumer.start()# 运行生产者(当前进程)producer(queue)# 等待消费者结束p_consumer.join()print("All done.")
关键注意事项
-
if __name__ == "__main__"
是必需的: 避免 Windows 系统下的multiprocessing
问题(子进程重新执行代码)。 -
队列传递方式:
multiprocessing.Queue
必须由 父进程创建,然后传递给子进程。 -
终止信号: 通常用
None
或特殊标记(如"STOP"
)通知消费者终止。 -
避免死锁: 如果队列满,
queue = multiprocessing.Queue(maxsize=10) # 限制队列大小queue.put()
会阻塞,可以设置maxsize
: -
跨文件导入: 确保
producer.py
和consumer.py
在 同一目录,或正确设置PYTHONPATH
。
方法 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
父进程 (main.py ) | 需要清晰控制多个子进程 | 结构清晰,易于扩展 | 需要额外的主控文件 |
单文件启动 | 快速测试生产者-消费者模型 | 代码更紧凑 | 逻辑耦合,不易维护 |
推荐 方法 1(main.py
方式),适用于正式项目,便于管理和扩展。
1.3 错误示例
文件 1: producer.py
(生产者)
import multiprocessing
import timedef producer(queue):for i in range(5):queue.put(f"Message {i}")time.sleep(1)if __name__ == "__main__":queue = multiprocessing.Queue()p = multiprocessing.Process(target=producer, args=(queue,))p.start()p.join() # 等待子进程结束
文件 2: consumer.py
(消费者)
import multiprocessingdef consumer(queue):while True:item = queue.get()if item is None: # 终止信号breakprint("Received:", item)if __name__ == "__main__":# 注意:队列必须通过父进程传递,不能直接重新创建queue = multiprocessing.Queue() # 错误!这是另一个独立队列# 正确做法:通过父进程启动并传递队列(见下文)
关键问题
-
直接分别在两个文件中创建
multiprocessing.Queue()
是无效的,因为它们是两个独立的进程,队列不共享。 -
正确方式:需要通过一个 父进程 启动这两个文件,并将队列作为参数传递。
方法 2:使用第三方工具
如果需要高性能或跨机器通信,建议用外部消息队列:
-
Redis:
redis-py
的r = redis.Redis(); r.lpush/r.brpop
-
RabbitMQ:
pika
库实现 AMQP 协议。
Redis生产者
import redis
r = redis.Redis()
r.lpush("my_queue", "Hello from Producer!")
Redis消费者
import redis
r = redis.Redis()
while True:msg = r.brpop("my_queue")print("Received:", msg[1].decode())