当前位置: 首页 > news >正文

rabbitmq动态创建交换机、队列、动态绑定,销毁

 // 缓存已创建的绑定,避免重复声明private final Map<String, Date> createdBindings = new ConcurrentHashMap<>(); 
public void createAndBindQueueToExchange(String type,String clinetId,  String routingKey) {String   queueName = routingKey;log.info("初始化类型:{}",type);QueueInformation queueInformation = rabbitAdmin.getQueueInfo(queueName);if(queueInformation == null) {//队列不存在则创建Map<String, Object> args = new HashMap<>();args.put("x-max-priority", maxPriority); // 设置优先级范围Queue queue = new Queue(queueName, true, false, false, args);rabbitAdmin.declareQueue(queue);log.info("创建队列: {} ", queueName);}else{log.info("队列已存在: {} ", queueName);}String containerKey = queueName + ":" + mainDirectExchange + ":" + routingKey + ":"+clinetId;//还要判断监听容器是否存在if (createdBindings.containsKey(containerKey) && registry.getListenerContainerIds().contains(containerKey)) {log.info("绑定已存在缓存中,容器中也存在 queue: {} to exchange: {} with routing key: {},time={}",queueName, mainDirectExchange, routingKey,createdBindings.get(containerKey));createdBindings.put(containerKey,new Date());}else{//stopContainerListenerAndCleanCash(containerKey,"缓存 无Key(有无监听容器)或(有缓存Key无监听容器)");// 2. 声明绑定到已存在的交换机Binding binding = new Binding(queueName,Binding.DestinationType.QUEUE,mainDirectExchange,routingKey,null);rabbitAdmin.declareBinding(binding);// 添加到缓存createdBindings.put(containerKey,new Date());log.info("成功创建绑定 for queue: {} to exchange: {} with routing key: {}",queueName, mainDirectExchange, routingKey);}// 3. 注册监听器if (!registry.getListenerContainerIds().contains(containerKey)) {SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();endpoint.setId(containerKey);endpoint.setQueueNames(queueName);//endpoint.setAutoStartup(true);// 使用手动ACK的消息监听器endpoint.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//处理消息完成后,不退回为trueboolean okMessage = false;String consumerTag = "";Exception exception = null;FlowInstanceNode flowInstanceNode = null;try {consumerTag = message.getMessageProperties().getConsumerTag();String clientId = consumerTag.split(":")[3];String messageBody = new String(message.getBody());flowInstanceNode = processMessage(clientId,messageBody);if("0".equalsIgnoreCase(flowInstanceNode.getExceptionType())) {//消费正常channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);okMessage = true;}} catch (Exception e) {log.error("指令包消息处理失败: {}", truncateString(new String(message.getBody())), e);exception = e;}finally {// 根据异常类型决定是否重新入队// 网络异常的情况都应返回队列重新消费boolean exceptionFlag = shouldRequeue(exception,flowInstanceNode);log.info("指令执行包异常标志exceptionFlag: {}",exceptionFlag);if (exceptionFlag) {okMessage = false;// 消息重新入队channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);log.warn("指令包消息处理失败,已重新入队: {}", truncateString(new String(message.getBody())));} else {// 消息丢弃(不重新入队)if(!okMessage){channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);log.error("指令包消息处理失败,已丢弃: {}", truncateString(new String(message.getBody())));}okMessage = true;}if (okMessage) {// 消费消息序列ID写入缓存Long sequence = flowInstanceNode.getSequence();String professionSoftwareCode = flowInstanceNode.getProfessionalSoftwareType();String sequenceKey = professionSoftwareCode+":message:finishedSequence";cacheService.set(sequenceKey, sequence);}String exceptionType =  flowInstanceNode.getExceptionType();//网络异常if(exceptionFlag){//调用专业软件API网络异常//停止监听stopContainerListenerAndCleanCash(consumerTag,"网络异常");//数据返回队列}//判断心跳是否超时stopContainer(consumerTag,"每次检查心跳超时");log.info("本次消费执行完毕");}}});registry.registerListenerContainer(endpoint, factory, false);SimpleMessageListenerContainer simpleMessageListenerContainer = (SimpleMessageListenerContainer)registry.getListenerContainer(containerKey);simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {@Overridepublic String createConsumerTag(String queue) {return containerKey;}});simpleMessageListenerContainer.start();// 设置监听器容器log.info("设置监听器容器并启动监听 queue: {},containerKey :{}" , queueName,containerKey);}else {// 启动已存在的监听器if (!registry.getListenerContainer(containerKey).isRunning()) {//启动监听registry.getListenerContainer(containerKey).start();log.info("启动监听  queue: {},containerKey:{}" , queueName,containerKey);} else {log.info("监听已运行 queue: {},containerKey:{}" , queueName,containerKey);}}}

动态销毁:

   public void stopContainerListenerAndCleanCash(String containerKey,String tip) {try {log.info(tip+",清除缓存,停止监听容器:{}", containerKey);if (registry.getListenerContainerIds().contains(containerKey)) {registry.getListenerContainer(containerKey).stop();registry.unregisterListenerContainer(containerKey);}// 删除队列// rabbitAdmin.deleteQueue(queueName);// 清理该队列相关的绑定缓存createdBindings.remove(containerKey);}catch (Exception e){log.error("清除缓存,停止监听容器异常",e);}}

容器事件监听:

//容器异常

@Component
@Slf4j
public class RabbitListenerContainerExceptionHandler implements ApplicationListener<ListenerContainerConsumerFailedEvent> {@AutowiredRabbitMQService rabbitMQService;@Overridepublic void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {Throwable t = event.getThrowable();Object object = event.getSource();if (object instanceof SimpleMessageListenerContainer){SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) object;String queueName = container.getQueueNames()[0];String listenerId = container.getListenerId();rabbitMQService.stopContainerListenerAndCleanCash(listenerId,"容器异常");}log.error("RabbitMQ监听容器异常", t);// 这里可以判断异常类型,比如队列不存在、连接断开等if (t instanceof ShutdownSignalException) {// 处理队列被删除、服务失联等}}
}

//

    @PostConstructpublic void init() {//factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//factory.setReceiveTimeout();//factory.setReceiveTimeout(1L);factory.setContainerCustomizer(container -> {// 设置消费者超时时间(需RabbitMQ服务端支持)container.setConsumerArguments(Collections.singletonMap("consumer_timeout", 60000L));});factory.setErrorHandler(t -> {// 这里可以捕获到消息处理时的异常log.error("RabbitMQ消息处理异常", t);// 可以根据异常类型做不同处理});connectionFactory.addConnectionListener(new ConnectionListener() {@Overridepublic void onClose(Connection connection) {stopAllContainerListeners();log.warn("RabbitMQ连接关闭");}@Overridepublic void onCreate(Connection connection) {stopAllContainerListeners();log.info("RabbitMQ连接创建");}@Overridepublic void onShutDown(ShutdownSignalException signal) {stopAllContainerListeners();log.error("RabbitMQ连接异常关闭", signal);}});}

http://www.lqws.cn/news/593803.html

相关文章:

  • 企业上网行为管理:零信任安全产品的对比分析
  • go语言安装达梦数据完整教程
  • 大数据在UI前端的应用创新研究:基于自然语言处理的用户意图识别
  • Spring生态的核心组件演进
  • 21、鸿蒙学习——使用App Linking实现应用间跳转
  • 推客系统小程序终极指南:从0到1构建自动裂变增长引擎,实现业绩10倍增长!
  • 使用tensorflow的线性回归的例子(二)
  • 【第二章:机器学习与神经网络概述】04.回归算法理论与实践 -(2)支持向量回归(SVR)
  • linux魔术字定位踩内存总结
  • GORM 删除的重要特性:软删除实践案例(优化版)
  • 前端计算机视觉:使用 OpenCV.js 在浏览器中实现图像处理
  • React:利用React.memo和useCallback缓存弹窗组件
  • Oracle 常用函数
  • 设置linux静态IP
  • 测试第六讲-测试模型分类
  • RabbitMQ - SpringAMQP及Work模型
  • 信息化项目验收,软件工程评审和检查表单
  • Qt中使用QSettings数据或结构体到INI文件
  • 边缘人工智能与医疗AI融合发展路径:技术融合与应用前景(下)
  • 区块链存证:数字时代的法律盾牌还是技术乌托邦?
  • 数据结构day5——队列和树
  • 县级智慧水务一体化方案及落地案例PPT(39页)
  • 8.Docker镜像讲解
  • 高强螺栓的计算与选用
  • 深入金融与多模态场景实战:金融文档分块技术与案例汇总
  • Qt时间显示按钮功能详解
  • 【docker】unknown shorthand flag: ‘f‘ in -f See ‘docker --help‘.
  • 实变与泛函题解-心得笔记【16】
  • Electron 应用中的内容安全策略 (CSP) 全面指南
  • MySQL索引深度解析:B+树、B树、哈希索引怎么选?