RabbitMQ 动态创建交换机、队列,动态绑定与销毁
// 缓存已创建的绑定,避免重复声明private final Map<String, Date> createdBindings = new ConcurrentHashMap<>();
public void createAndBindQueueToExchange(String type,String clinetId, String routingKey) {String queueName = routingKey;log.info("初始化类型:{}",type);QueueInformation queueInformation = rabbitAdmin.getQueueInfo(queueName);if(queueInformation == null) {//队列不存在则创建Map<String, Object> args = new HashMap<>();args.put("x-max-priority", maxPriority); // 设置优先级范围Queue queue = new Queue(queueName, true, false, false, args);rabbitAdmin.declareQueue(queue);log.info("创建队列: {} ", queueName);}else{log.info("队列已存在: {} ", queueName);}String containerKey = queueName + ":" + mainDirectExchange + ":" + routingKey + ":"+clinetId;//还要判断监听容器是否存在if (createdBindings.containsKey(containerKey) && registry.getListenerContainerIds().contains(containerKey)) {log.info("绑定已存在缓存中,容器中也存在 queue: {} to exchange: {} with routing key: {},time={}",queueName, mainDirectExchange, routingKey,createdBindings.get(containerKey));createdBindings.put(containerKey,new Date());}else{//stopContainerListenerAndCleanCash(containerKey,"缓存 无Key(有无监听容器)或(有缓存Key无监听容器)");// 2. 声明绑定到已存在的交换机Binding binding = new Binding(queueName,Binding.DestinationType.QUEUE,mainDirectExchange,routingKey,null);rabbitAdmin.declareBinding(binding);// 添加到缓存createdBindings.put(containerKey,new Date());log.info("成功创建绑定 for queue: {} to exchange: {} with routing key: {}",queueName, mainDirectExchange, routingKey);}// 3. 
注册监听器if (!registry.getListenerContainerIds().contains(containerKey)) {SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();endpoint.setId(containerKey);endpoint.setQueueNames(queueName);//endpoint.setAutoStartup(true);// 使用手动ACK的消息监听器endpoint.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//处理消息完成后,不退回为trueboolean okMessage = false;String consumerTag = "";Exception exception = null;FlowInstanceNode flowInstanceNode = null;try {consumerTag = message.getMessageProperties().getConsumerTag();String clientId = consumerTag.split(":")[3];String messageBody = new String(message.getBody());flowInstanceNode = processMessage(clientId,messageBody);if("0".equalsIgnoreCase(flowInstanceNode.getExceptionType())) {//消费正常channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);okMessage = true;}} catch (Exception e) {log.error("指令包消息处理失败: {}", truncateString(new String(message.getBody())), e);exception = e;}finally {// 根据异常类型决定是否重新入队// 网络异常的情况都应返回队列重新消费boolean exceptionFlag = shouldRequeue(exception,flowInstanceNode);log.info("指令执行包异常标志exceptionFlag: {}",exceptionFlag);if (exceptionFlag) {okMessage = false;// 消息重新入队channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);log.warn("指令包消息处理失败,已重新入队: {}", truncateString(new String(message.getBody())));} else {// 消息丢弃(不重新入队)if(!okMessage){channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);log.error("指令包消息处理失败,已丢弃: {}", truncateString(new String(message.getBody())));}okMessage = true;}if (okMessage) {// 消费消息序列ID写入缓存Long sequence = flowInstanceNode.getSequence();String professionSoftwareCode = flowInstanceNode.getProfessionalSoftwareType();String sequenceKey = professionSoftwareCode+":message:finishedSequence";cacheService.set(sequenceKey, sequence);}String exceptionType = 
flowInstanceNode.getExceptionType();//网络异常if(exceptionFlag){//调用专业软件API网络异常//停止监听stopContainerListenerAndCleanCash(consumerTag,"网络异常");//数据返回队列}//判断心跳是否超时stopContainer(consumerTag,"每次检查心跳超时");log.info("本次消费执行完毕");}}});registry.registerListenerContainer(endpoint, factory, false);SimpleMessageListenerContainer simpleMessageListenerContainer = (SimpleMessageListenerContainer)registry.getListenerContainer(containerKey);simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {@Overridepublic String createConsumerTag(String queue) {return containerKey;}});simpleMessageListenerContainer.start();// 设置监听器容器log.info("设置监听器容器并启动监听 queue: {},containerKey :{}" , queueName,containerKey);}else {// 启动已存在的监听器if (!registry.getListenerContainer(containerKey).isRunning()) {//启动监听registry.getListenerContainer(containerKey).start();log.info("启动监听 queue: {},containerKey:{}" , queueName,containerKey);} else {log.info("监听已运行 queue: {},containerKey:{}" , queueName,containerKey);}}}
动态销毁:
public void stopContainerListenerAndCleanCash(String containerKey,String tip) {try {log.info(tip+",清除缓存,停止监听容器:{}", containerKey);if (registry.getListenerContainerIds().contains(containerKey)) {registry.getListenerContainer(containerKey).stop();registry.unregisterListenerContainer(containerKey);}// 删除队列// rabbitAdmin.deleteQueue(queueName);// 清理该队列相关的绑定缓存createdBindings.remove(containerKey);}catch (Exception e){log.error("清除缓存,停止监听容器异常",e);}}
容器事件监听:
//容器异常
@Component
@Slf4j
public class RabbitListenerContainerExceptionHandler implements ApplicationListener<ListenerContainerConsumerFailedEvent> {@AutowiredRabbitMQService rabbitMQService;@Overridepublic void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {Throwable t = event.getThrowable();Object object = event.getSource();if (object instanceof SimpleMessageListenerContainer){SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) object;String queueName = container.getQueueNames()[0];String listenerId = container.getListenerId();rabbitMQService.stopContainerListenerAndCleanCash(listenerId,"容器异常");}log.error("RabbitMQ监听容器异常", t);// 这里可以判断异常类型,比如队列不存在、连接断开等if (t instanceof ShutdownSignalException) {// 处理队列被删除、服务失联等}}
}
//
@PostConstructpublic void init() {//factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//factory.setReceiveTimeout();//factory.setReceiveTimeout(1L);factory.setContainerCustomizer(container -> {// 设置消费者超时时间(需RabbitMQ服务端支持)container.setConsumerArguments(Collections.singletonMap("consumer_timeout", 60000L));});factory.setErrorHandler(t -> {// 这里可以捕获到消息处理时的异常log.error("RabbitMQ消息处理异常", t);// 可以根据异常类型做不同处理});connectionFactory.addConnectionListener(new ConnectionListener() {@Overridepublic void onClose(Connection connection) {stopAllContainerListeners();log.warn("RabbitMQ连接关闭");}@Overridepublic void onCreate(Connection connection) {stopAllContainerListeners();log.info("RabbitMQ连接创建");}@Overridepublic void onShutDown(ShutdownSignalException signal) {stopAllContainerListeners();log.error("RabbitMQ连接异常关闭", signal);}});}