1.服务端
"""
@File: rabbitmq_server.py
@Date: 2025/6/26 10:42
@Author: xxx
@Description:
1. RabbitMQ服务端,支持多节点命令执行
2. 作为被控节点运行,可接收定向命令并返回结果
"""import ssl
import pika
import time
import json
import socket
import logging
import subprocess
import configparser
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler('rabbitmq_server.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(file_handler)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(console_handler)RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf"class RabbitMQServer:"""RabbitMQ RPC服务器类功能:接收并执行来自客户端的定向命令"""def __init__(self, node_name=None, mq_user="rabbitmq", mq_password="rabbitmq@123",mq_virtual_host="/", mq_host=None, mq_port=5671,mq_ca="/opt/ssl/ca_certificate.pem"):"""初始化RabbitMQ服务端:param node_name: 节点名称标识(唯一):param mq_user: RabbitMQ用户名:param mq_password: RabbitMQ密码:param mq_virtual_host: 虚拟主机:param mq_host: RabbitMQ服务器IP:param mq_port: RabbitMQ服务端口:param mq_ca: SSL证书路径"""self.NODE_NAME = node_name if node_name else socket.gethostname()self.RABBITMQ_USER = mq_userself.RABBITMQ_UNLOCK_CODE = mq_passwordself.RABBITMQ_VIRTUAL_HOST = mq_virtual_hostself.RABBITMQ_HOST = mq_host if mq_host else self.get_option(RABBITMQ_HOST_CONF, "global", "rabbitmq_host")self.RABBITMQ_PORT = mq_portself.SSL_CA_PATH = mq_caself._setup_connection()def get_option(self, file_path, section, option):"""获取 file_path 配置项值,若配置文件没有,返回空字符串:param section: section字符串,例如:'global':param option: key值,例如:'manage_nodes':return: 字符串类型数据"""parser = configparser.ConfigParser()parser.read(file_path)if not parser.has_option(section, option):return ""else:return parser.get(section, option)def _get_ssl_options(self):"""配置SSL安全连接选项"""context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)context.load_verify_locations(self.SSL_CA_PATH)return pika.SSLOptions(context, "localhost" )def _setup_connection(self):"""建立RabbitMQ连接并设置队列"""credentials = pika.PlainCredentials(self.RABBITMQ_USER, self.RABBITMQ_UNLOCK_CODE )connection_params = pika.ConnectionParameters(host=self.RABBITMQ_HOST,port=self.RABBITMQ_PORT,virtual_host=self.RABBITMQ_VIRTUAL_HOST,credentials=credentials,ssl_options=self._get_ssl_options(),heartbeat=600 )self.connection = pika.BlockingConnection(connection_params)self.channel = self.connection.channel()self.channel.queue_declare(queue=self.NODE_NAME, durable=True )self.channel.basic_qos(prefetch_count=1)self.channel.basic_consume(queue=self.NODE_NAME, on_message_callback=self._execute_command, auto_ack=False )def _execute_command(self, ch, method, props, body):"""执行接收到的命令并返回结果"""try:message = json.loads(body.decode('utf-8'))command = message.get('command', '') target = message.get('target', '') logger.info(f" [x] 收到({target})命令:{command}")if target != self.NODE_NAME:logger.warning(f" [x] 收到非本节点({self.NODE_NAME})命令,已忽略")ch.basic_ack(delivery_tag=method.delivery_tag) returnlogger.info(f" [*] 执行命令 【{command}】...")try:output = subprocess.check_output(command,shell=True, stderr=subprocess.STDOUT, timeout=60 )response = output.decode('utf-8')except subprocess.TimeoutExpired:response = "Error: Command timed out"except subprocess.CalledProcessError as e:response = f"Error: {e.output.decode('utf-8')}"except Exception as e:response = f"System Error: {str(e)}"ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id, delivery_mode=2 ),body=response.encode('utf-8') )logger.info(f" [*] 命令执行完成")ch.basic_ack(delivery_tag=method.delivery_tag)except Exception as e:logger.exception(f" [x] 消息处理异常: {str(e)}")ch.basic_nack(delivery_tag=method.delivery_tag)def start(self, max_retries=5, retry_delay=10):"""启动RabbitMQ服务并持续监听消息功能:管理服务生命周期,处理连接异常和重试逻辑:param max_retries: 最大重试次数,默认5次:param retry_delay: 重试间隔时间(秒),默认10秒:return:"""retry_count = 0while True:try:logger.info(f" [*] {self.NODE_NAME} 节点服务启动 (尝试 {retry_count + 1}/{max_retries})")logger.info(f" [*] 等待队列 {self.NODE_NAME} 中的请求...")if not hasattr(self, 'connection') or self.connection.is_closed:self._setup_connection() self.channel.start_consuming()except pika.exceptions.AMQPConnectionError as e:retry_count += 1logger.exception(f"连接失败: {str(e)}")if retry_count >= max_retries:logger.error(" [x] 达到最大重试次数,终止服务")self.close()break logger.warning(f" [*] {retry_delay}秒后尝试重新连接...")time.sleep(retry_delay)except KeyboardInterrupt:logger.error("\n [x] 接收到终止信号")self.close()logger.error(" [x] 服务已停止")break except Exception as e:logger.exception(f"服务异常: {str(e)}")time.sleep(retry_delay) def close(self):"""安全关闭RabbitMQ连接功能:清理资源,确保连接被正确关闭:return:"""if hasattr(self, 'connection') and not self.connection.is_closed:self.connection.close() logger.info(" [x] 连接已安全关闭")if __name__ == '__main__':server = RabbitMQServer()try:server.start()except KeyboardInterrupt:logger.error("\n [x] 接收到终止信号")server.close()logger.error(" [x] 服务已停止")
2.客户端
"""
@File: rabbitmq_client.py
@Date: 2025/6/26 10:43
@Author: xxx
@Description:
1. RabbitMQ客户端类,支持向指定节点发送SSH命令
2. 作为控制端运行,可定向发送命令并接收执行结果
"""import ssl
import pika
import time
import uuid
import json
import socket
import logging
import configparser
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler('rabbitmq_client.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(file_handler)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)04d - %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(console_handler)RABBITMQ_HOST_CONF = "/etc/rabbitmq/rabbitmq.conf"class RabbitMQClient:"""RabbitMQ RPC客户端类功能:向指定节点发送命令并获取执行结果"""def __init__(self, mq_user="rabbitmq", mq_password="rabbitmq@123", mq_virtual_host="/",mq_host=None, mq_port=5671, mq_ca="/opt/ssl/ca_certificate.pem"):"""初始化RabbitMQ客户端:param mq_user: RabbitMQ用户名:param mq_password: RabbitMQ密码:param mq_virtual_host: 虚拟主机:param mq_host: RabbitMQ服务器IP:param mq_port: RabbitMQ服务端口:param mq_ca: SSL证书路径"""self.RABBITMQ_USER = mq_userself.RABBITMQ_UNLOCK_CODE = mq_passwordself.RABBITMQ_VIRTUAL_HOST = mq_virtual_hostself.RABBITMQ_HOST = mq_host if mq_host else self.get_option(RABBITMQ_HOST_CONF, "global", "rabbitmq_host")self.RABBITMQ_PORT = mq_portself.SSL_CA_PATH = mq_caself.response = Noneself.corr_id = Nonelogger.info(" [x] 正在建立连接 ...")self._connect()logger.info(" [x] 连接建立成功")def get_option(self, file_path, section, option):"""获取 file_path 配置项值,若配置文件没有,返回空字符串:param section: section字符串,例如:'global':param option: key值,例如:'manage_nodes':return: 字符串类型数据"""parser = configparser.ConfigParser()parser.read(file_path)if not parser.has_option(section, option):return ""else:return parser.get(section, option)def _connect(self):"""建立RabbitMQ连接并初始化回调队列功能:配置安全连接参数、创建通信信道、设置消息回调处理:return:"""ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)ssl_context.load_verify_locations(self.SSL_CA_PATH)ssl_options = pika.SSLOptions(ssl_context, "localhost")credentials = pika.PlainCredentials(self.RABBITMQ_USER, self.RABBITMQ_UNLOCK_CODE )connection_params = pika.ConnectionParameters(host=self.RABBITMQ_HOST, port=self.RABBITMQ_PORT, virtual_host=self.RABBITMQ_VIRTUAL_HOST, credentials=credentials, ssl_options=ssl_options, heartbeat=60 )self.connection = pika.BlockingConnection(connection_params)self.channel = self.connection.channel()result = self.channel.queue_declare(queue='', exclusive=True )self.callback_queue = result.method.queueself.channel.basic_consume(queue=self.callback_queue, on_message_callback=self._on_response, auto_ack=False )def _on_response(self, ch, method, props, body):"""RPC模式下的响应消息回调处理函数功能:匹配并接收服务端返回的命令执行结果处理逻辑:1.通过correlation_id匹配对应的请求2.将二进制消息体解码为字符串3.存储结果供execute_command方法获取:param ch: (pika.channel.Channel): 接收到消息的信道对象:param method: (pika.spec.Basic.Deliver): 包含投递信息(如delivery_tag):param props: (pika.spec.BasicProperties): 消息属性(含correlation_id等):param body: (bytes): 消息体内容(服务端返回的执行结果):return:"""try:if self.corr_id == props.correlation_id:self.response = body.decode('utf-8')except UnicodeDecodeError as e:self.response = f"解码失败: {str(e)}"def execute_command(self, command, target_node=None, timeout=60):"""向指定RabbitMQ节点发送命令并获取执行结果(RPC模式):param command (str): 要执行的shell命令字符串(如"ls -l"):param target_node (str): 目标节点标识,对应服务端的队列名- 默认None表示发送到当前主机节点:param timeout (int): 等待响应的超时时间(秒),默认60秒:return str: 命令执行结果文本异常:TimeoutError: 超过指定时间未收到响应时抛出AMQP相关异常: 消息发送失败时抛出向指定节点执行远程命令"""self.response = None self.corr_id = str(uuid.uuid4()) if not target_node:target_node = socket.gethostname()message = {"command": command, "target": target_node, "timestamp": time.time() }self.channel.basic_publish(exchange='', routing_key=target_node, properties=pika.BasicProperties(reply_to=self.callback_queue, correlation_id=self.corr_id, ),body=json.dumps(message).encode('utf-8'))start_time = time.time()while self.response is None:self.connection.process_data_events()if time.time() - start_time > timeout:raise TimeoutError(f"等待节点 {target_node} 响应超时")time.sleep(0.1)return self.response def close(self):"""安全关闭RabbitMQ连接功能:1. 清理网络连接资源2. 自动删除临时队列(exclusive队列)3. 防止资源泄漏:return:"""if self.connection and not self.connection.is_closed:self.connection.close()logger.warning(" [x] 连接已关闭")if __name__ == '__main__':client = RabbitMQClient()try:nodes = ["node247", "node248", "node249"]for node in nodes:try:logger.info(f"\n向节点 {node} 执行命令: hostname")logger.info(client.execute_command(command="hostname", target_node=node))except Exception as e:logger.exception(f"节点 {node} 执行失败: {str(e)}")try:logger.info(f"\n向节点 {node} 执行命令: ls -l /opt/")logger.info(client.execute_command(command="ls -l /opt/", target_node=node))except Exception as e:logger.exception(f"节点 {node} 执行失败: {str(e)}")try:logger.info(f"\n向节点 {node} 执行命令: date")logger.info(client.execute_command(command="date", target_node=node))except Exception as e:logger.exception(f"节点 {node} 执行失败: {str(e)}")finally:client.close()
3.调用结果
192.168.120.17 node17
192.168.120.18 node18
192.168.120.19 node19python3 rabbitmq_server.py
192.168.120.17 node17python3 rabbitmq_client.py