当前位置: 首页 > news >正文

07 Springboot+netty+mqtt服务端实现【重构】

文章目录

  • 1 代码结构
  • 2 相关代码
    • 2.1 公共代码
    • 2.2 Mqtt相关代码
      • 2.2.1 公共代码
      • 2.2.2 主流程代码
      • 2.2.3 限流相关
      • 2.2.4 处理器
      • 2.2.5 消息重发

1 代码结构

在这里插入图片描述

2 相关代码

2.1 公共代码

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wyh</groupId>
    <artifactId>Demo_04_1_SpringbootMqttService</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <!-- Spring dependencies [begin] -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring dependencies [end] -->

        <!-- Lombok (provides the @Slf4j / @Data annotations used by the code) [begin] -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- Lombok [end] -->

        <!-- Utility libraries [begin] -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- Utility libraries [end] -->

        <!-- netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
    </dependencies>

    <build>
        <!-- Enable resource filtering so that @project.version@ placeholders in application.yml are resolved -->
        <resources>
            <resource>
                <directory>${basedir}/src/main/resources</directory>
                <filtering>true</filtering>
                <includes>
                    <include>application.yml</include>
                    <include>**/*.xml</include>
                    <include>**/*.properties</include>
                </includes>
            </resource>
        </resources>

        <!-- Declare the main class explicitly so the repackaged jar does not fail with "main class not found" -->
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.wyh.demo041springbootmqttservice.Demo041SpringbootMqttService</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

application.yml

# Runtime environment settings
spring:
  servlet:
    multipart:
      # Maximum size of a single uploaded file
      max-file-size: 20MB
      # Maximum total size of one multipart request
      max-request-size: 100MB

# Embedded server settings
server:
  tomcat:
    uri-encoding: UTF-8
    # NOTE(review): newer Spring Boot versions name these server.tomcat.threads.max / .min-spare;
    # confirm these two keys are still bound by the Boot version in use.
    max-threads: 1000
    min-spare-threads: 30
    # Lift Tomcat's upload swallow limit (2MB by default) so that MultipartException can be caught
    max-swallow-size: -1
  port: 15041
  servlet:
    context-path: /Demo_04_1_SpringbootMqttService
  compression:
    enabled: true
    mime-types: application/json
    min-response-size: 8192

# Logging configuration; adjust the location as needed
logging:
  config: classpath:logback-spring.xml

# MQTT broker settings
mqtt:
  server:
    socket_port: 1883
    # Maximum number of connections; changing it is not supported yet
    max_connect_num: 100
    # Default TCP timeout, in seconds
    tcp_time_out: 180
    # Interval between message resends, in milliseconds (must be greater than 0)
    msg_resend_wait_time: 60000
    # Maximum size of a single message from one client, in bytes
    single_msg_max_byte: 51200
    # Maximum number of messages per minute from one client
    per_minute_msg_number: 2400
    # Whether cached messages are delivered immediately after subscribing; default false, changing it is not supported yet
    is_immediately_receive_msg: false

# Device management settings
device:
  max_num: 100

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Directory the log files are written to -->
    <property name="log_dir" value="D:/00 test"/>
    <!-- Number of archived rollover periods to keep (the file name pattern below rolls over daily) -->
    <property name="maxHistory" value="6"/>

    <!-- Console output -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{dd-MM-yyyy HH:mm:ss.SSS} %magenta([%thread]) %highlight(%-5level) %logger{36}.%M - %msg%n</pattern>
        </encoder>
        <target>System.out</target>
    </appender>

    <!-- Rolling file output, split by date and by size -->
    <appender name="SIZEROLLFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <encoder>
            <pattern>%d{dd-MM-yyyy HH:mm:ss.SSS} %magenta([%thread]) %highlight(%-5level) %logger{36}.%M - %msg%n</pattern>
        </encoder>
        <append>true</append>
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <!-- Change this path when deploying to a real environment -->
            <fileNamePattern>${log_dir}/springboot.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <maxFileSize>10MB</maxFileSize>
            <maxHistory>${maxHistory}</maxHistory>
        </rollingPolicy>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <!-- Uncomment to also write to the configured directory -->
        <!--<appender-ref ref="SIZEROLLFILE"/>-->
    </root>

    <!-- Raise the log level of the DAO layer -->
    <logger name="com.huawei.dao" level="DEBUG" additivity="false">
        <appender-ref ref="CONSOLE"/>
    </logger>
</configuration>

Demo041SpringbootMqttService.java

/**
 * Spring Boot entry point of the MQTT service demo.
 */
@SpringBootApplication
public class Demo041SpringbootMqttService {
    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command-line arguments, handed to Spring unchanged
     */
    public static void main(String[] args) {
        final Class<?> primarySource = Demo041SpringbootMqttService.class;
        SpringApplication.run(primarySource, args);
    }
}

2.2 Mqtt相关代码

2.2.1 公共代码

MqttServerConfig.java

/*** 包含Mqtt相关的一下配置,及其需要交互的domain层对象** @author w30009430* @since 2025 -06-11 9:09*/
@Slf4j
@Data
@Configuration
public class MqttServerConfig {@Value("${mqtt.server.socket_port}")private int socketPort;@Value("${mqtt.server.max_connect_num}")private int maxConnectNum;@Value("${mqtt.server.tcp_time_out}")private int tcpTimeOut;@Value("${mqtt.server.msg_resend_wait_time}")private int msgResendWaitTime;@Value("${mqtt.server.single_msg_max_byte}")private int singleMsgMaxByte;@Value("${mqtt.server.per_minute_msg_number}")private int perMinuteMsgNumber;@Value("${mqtt.server.is_immediately_receive_msg}")private boolean isImmediatelyReceiveMsg;@Autowiredprivate DeviceDomain deviceDomain;/*** Init.*/@PostConstructpublic void init() {if (msgResendWaitTime < 0) {log.error("msgResendWaitTime should equal or bigger than 0");throw new IllegalArgumentException("msgResendWaitTime should equal or bigger than 0");}MqttUtils.setWaitTime(msgResendWaitTime);}}

MqttConstants.java

/**
 * Shared constants and in-memory registries of the MQTT broker.
 *
 * @author w30009430
 * @since 2025-06-11 15:13
 */
public interface MqttConstants {
    /**
     * Channel attribute key under which the client id is stored.
     */
    AttributeKey<String> CLIENT_ID_KEY = AttributeKey.valueOf("clientId");

    /**
     * Marker that identifies a shared topic.
     */
    CharSequence SHARE_TOPIC_SIGN = "$share";

    /**
     * Successfully connected clients. [clientId : channel context]
     */
    ConcurrentHashMap<String, ChannelHandlerContext> CLIENTID_CLIENT_MAP = new ConcurrentHashMap<>();

    /**
     * Topics each client has subscribed to; used to remove the client from its subscriptions
     * when it disconnects. [clientId : Set of topic]
     */
    ConcurrentHashMap<String, CopyOnWriteArraySet<String>> CLIENTID_TOPICSET_MAP = new ConcurrentHashMap<>();

    /**
     * Clients subscribed to each topic. [topic : Set of clientId]
     */
    ConcurrentHashMap<String, CopyOnWriteArraySet<String>> TOPIC_CLIENTSET_MAP = new ConcurrentHashMap<>();

    /**
     * QoS granted at subscription time; only messages with a QoS less than or equal to it are
     * sent to the subscriber. [topic_clientId : QoS]
     */
    ConcurrentHashMap<String, MqttQoS> TOPIC_CLIENTID_AND_QOS_MAP = new ConcurrentHashMap<>();

    /**
     * Messages that must be cached so they can be sent again to newly subscribed clients.
     * [topic : message]
     */
    ConcurrentHashMap<String, MqttPublishMessage> CACHE_RETAINED_MESSAGES_TOPIC_MSG_MAP = new ConcurrentHashMap<>();

    /**
     * Payloads of messages waiting to be resent, kept so their memory can be released once the
     * ack arrives. [clientId_msgId : ByteBuf]
     */
    ConcurrentHashMap<String, ByteBuf> CACHE_REPEAT_MESSAGES_CLIENTIDMSGID_BYTEBUF_MAP = new ConcurrentHashMap<>();
}

MqttUtils.java

/*** The type Mqtt utils.** @author w30009430* @since 2025 -06-10 11:22*/
@Slf4j
public class MqttUtils {private static volatile long waitTime;// 记录消息id的变量,id值范围1~65534private static final AtomicInteger nextMessageId = new AtomicInteger(1);/*** Sets wait time.** @param waitTime the wait time*/public static void setWaitTime(long waitTime) {MqttUtils.waitTime = waitTime;}/*** Gets new message id.** @return the new message id*/public static int getNewMessageId() {nextMessageId.compareAndSet(0xffff, 1);return nextMessageId.getAndIncrement();}/*** Gets client id.** @param ctx the ctx* @return the client id*/public static String getClientId(ChannelHandlerContext ctx) {return ctx.channel().attr(MqttConstants.CLIENT_ID_KEY).get();}/*** 一条消息可能给多个订阅者发送,因此重发消息的唯一key为【clientId+msgId】** @param clientId the client id* @param messageId the message id* @return the repeat msg key*/public static String getRepeatMsgKey(String clientId, int messageId) {return clientId + "_" + messageId;}/*** MqttService进行消息发送时(PUBREL应答/publish转发),如果客户端1min没有应答,默认不会再应答。默认本次消息发送失败,会定时1min重新发送,* 直到正常响应或者客户端断开后,会终止定时重新发送。* <p>* 缓存的publis消息是一个全新的消息【msgId和最初的msgId不一样】,缓存的消息,只要** @param msgKey the msg key* @param payload the payload* @param pushPublishMsg the push publish msg* @param ctx the ctx*/public static void cachePublishMsg(String msgKey, ByteBuf payload, MqttPublishMessage pushPublishMsg,ChannelHandlerContext ctx) {// 1 创建一个新的缓冲区payload.retainedDuplicate();// 2 防止内存溢出,最后在消息被ack或者客户端断开掉线的时候,拿到并进行释放MqttConstants.CACHE_REPEAT_MESSAGES_CLIENTIDMSGID_BYTEBUF_MAP.put(msgKey, payload);// 3 创建需要重发的消息MqttFixedHeader mqttFixedHeaderInfo = pushPublishMsg.fixedHeader();MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, true, mqttFixedHeaderInfo.qosLevel(),false, mqttFixedHeaderInfo.remainingLength());MqttPublishMessage cachePubMessage = new MqttPublishMessage(fixedHeader, pushPublishMsg.variableHeader(),payload);// 4 缓存一份消息,规定时间内没有收到ack,用作重发,重发时将isDup设置为true,代表重复消息ScheduledFuture<?> scheduledFuture = 
ResendMsgThread.SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(new ResendMsgTask(msgKey, cachePubMessage, ctx), waitTime, waitTime, TimeUnit.MILLISECONDS);ResendMsgThread.MSGKEY_SCHEDULEDFUTURE_MAP.put(msgKey, scheduledFuture);}/*** Cache pubrel msg.** @param messageId the message id* @param ctx the ctx*/public static void cachePubrelMsg(int messageId, ChannelHandlerContext ctx) {// 1 获取重发消息的msgKeyString msgKey = MqttUtils.getRepeatMsgKey(MqttUtils.getClientId(ctx), messageId);// 2 创建重发的消息// 构建返回报文,固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false,0);// 构建返回报文,可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageId);MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);// 3 缓存一份消息,规定时间内没有收到ack,用作重发,重发时将isDup设置为true,代表重复消息ScheduledFuture<?> scheduledFuture = ResendMsgThread.SCHEDULED_THREAD_POOL_EXECUTOR.scheduleAtFixedRate(new ResendMsgTask(msgKey, mqttMessageBack, ctx), waitTime, waitTime, TimeUnit.MILLISECONDS);ResendMsgThread.MSGKEY_SCHEDULEDFUTURE_MAP.put(msgKey, scheduledFuture);}/*** Del session cache.** @param clientId the clientId*/public static void delSessionCache(String clientId) {if (!MqttConstants.CLIENTID_CLIENT_MAP.containsKey(clientId)) {// 不包含客户端信息,说明已经被清理过了return;}// 1 获取客户标识log.info("del client device cache start. ClientId: {}", clientId);try {// 2 清除客户连接信息MqttConstants.CLIENTID_CLIENT_MAP.remove(clientId);// 3 清理客户端topic相关信息Set<String> topicList = MqttConstants.CLIENTID_TOPICSET_MAP.get(clientId);if (topicList != null) {delTopicInfo(clientId, topicList);}log.info("del client device cache finish. ClientId: {}", clientId);} catch (Exception exp) {log.error("del client device cache error. 
ClientId: {}", clientId, exp);}}private static void delTopicInfo(String clientId, Set<String> topicList) {for (String topic : topicList) {// 3 清除此客户端订阅的topic信息CopyOnWriteArraySet<String> topicClientIdList = MqttConstants.TOPIC_CLIENTSET_MAP.get(topic);if (topicClientIdList != null) {topicClientIdList.remove(clientId);if (topicClientIdList.isEmpty()) {MqttConstants.TOPIC_CLIENTSET_MAP.remove(topic);}}// 4 清除此客户端topic对应的qos信息MqttConstants.TOPIC_CLIENTID_AND_QOS_MAP.remove(topic + "_" + clientId);}// 5 清理掉给此客户重发的消息delToClientResendMsgInfo(clientId);// 6 清除topic对应的客户端信息MqttConstants.CLIENTID_TOPICSET_MAP.remove(clientId);}private static void delToClientResendMsgInfo(String clientId) {// 停止对此客户端的重发任务for (String msgKey : ResendMsgThread.MSGKEY_SCHEDULEDFUTURE_MAP.keySet()) {if (msgKey.startsWith(clientId)) {stopResendMsgThreadTask(msgKey);}}// 释放给此客户端重发消息的缓存for (String msgKey : MqttConstants.CACHE_REPEAT_MESSAGES_CLIENTIDMSGID_BYTEBUF_MAP.keySet()) {if (msgKey.startsWith(clientId)) {releaseResendMsgByteBuf(msgKey);}}}/*** Stop resend msg thread task.** @param msgKey the msg key*/public static void stopResendMsgThreadTask(String msgKey) {// 5.1 停止重发消息的定时任务ScheduledFuture<?> scheduledFuture = ResendMsgThread.MSGKEY_SCHEDULEDFUTURE_MAP.remove(msgKey);if (scheduledFuture != null) {scheduledFuture.cancel(true);}}/*** Release resend msg byte buf.** @param msgKey the msg key*/public static void releaseResendMsgByteBuf(String msgKey) {// 5.2 释放重发消息的内存ByteBuf removeByteBuf = MqttConstants.CACHE_REPEAT_MESSAGES_CLIENTIDMSGID_BYTEBUF_MAP.remove(msgKey);if (removeByteBuf != null) {removeByteBuf.release();}}/*** Is share topic boolean.** @param topicName the topic name* @return the boolean*/public static boolean isShareTopic(String topicName) {if (StringUtils.isNotEmpty(topicName)) {return topicName.contains(MqttConstants.SHARE_TOPIC_SIGN);}return false;}
}

NettyUtils.java

/*** Netty工具类** @since 2023/7/22*/
@Slf4j
public class NettyUtils {private static final WriteFailedListener<? extends Future<? super Void>> WRITE_FAILED_LISTENER= new WriteFailedListener<>();private NettyUtils() {throw new UnsupportedOperationException();}/*** 写netty消息,mqtt消息** @param context 上下文* @param message 消息* @return ChannelFuture*/public static ChannelFuture write(ChannelHandlerContext context, MqttMessage message) {if (!context.channel().isWritable()) {log.error("Channel is not writable, ClientId: {}, channel: {}", context.channel());}if (message.fixedHeader().messageType() != MqttMessageType.PINGRESP) {log.info("[WriteMsg]. clientId: {}, Msg: {}", MqttUtils.getClientId(context), message);}return context.writeAndFlush(message).addListener(WRITE_FAILED_LISTENER);}/*** 关闭连接** @param context 上下文*/public static void close(ChannelHandlerContext context) {log.info("Close context, {}, {}", context.channel(), Arrays.toString(Thread.currentThread().getStackTrace()));context.close();}/*** 写消息失败监听器** @param <F> future类型*/private static class WriteFailedListener<F extends Future<? super Void>> implements GenericFutureListener<F> {@Overridepublic void operationComplete(F future) {if (!future.isSuccess()) {log.error("Message write failed. cause: {}", future.cause());}}}
}

2.2.2 主流程代码

MqttServer.java

/*** netty启动类,启动mqtt broker** @author w30009430* @since 2025-06-10 8:26*/
@Slf4j
@Component
public class MqttServer implements ApplicationListener<ContextRefreshedEvent> {private final ExecutorService executorService = Executors.newSingleThreadExecutor();@Autowiredprivate MqttServerConfig mqttServerConfig;private NioEventLoopGroup bossGroup;private NioEventLoopGroup workerGroup;private ChannelFuture future;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext().getParent() == null) {executorService.submit(() -> {// 启动netty服务【Mqtt broker】log.info("Start the launch Mqtt Broker.");startMqttBroker();});}}private void startMqttBroker() {// 创建接收请求和处理请求的实例(默认线程数为 CPU 核心数乘以2也可自定义)bossGroup = new NioEventLoopGroup();workerGroup = new NioEventLoopGroup();try {// 创建服务端启动辅助类(boostrap 用来为 Netty 程序的启动组装配置一些必须要组件,例如上面的创建的两个线程组)ServerBootstrap socketBs = new ServerBootstrap();// channel 方法用于指定服务器端监听套接字通道// socket配置socketBs.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)// ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,// 函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,// 服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,// 多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小.option(ChannelOption.SO_BACKLOG, Constants.Num.NUM_1024)// 快速复用,防止服务端重启端口被占用的情况发生.option(ChannelOption.SO_REUSEADDR, true)// 在新连接时初始化 mqttServerChannelInitializer 【当客户端发送connect时服务端会调用这个类里的initChannel方法初始化客户端和服务端建立的管道】.childHandler(new MqttServerChannelInitializer(mqttServerConfig))// 如果TCP_NODELAY没有设置为true,那么底层的TCP为了能减少交互次数,会将网络数据积累到一定的数量后,// 服务器端才发送出去,会造成一定的延迟。在互联网应用中,通常希望服务是低延迟的,建议将TCP_NODELAY设置为true.childOption(ChannelOption.TCP_NODELAY, true)// 启用TCP keep-alive。默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。.childOption(ChannelOption.SO_KEEPALIVE, true);future = socketBs.bind(mqttServerConfig.getSocketPort()).sync();if (future.isSuccess()) {log.info("Mqtt Broker Start successfully !");future.channel().closeFuture().sync();} else {log.error("Mqtt Broker Startup failed !");}} catch (Exception exp) {log.error("Mqtt Broker Startup exception !", exp);} 
finally {bossGroup.shutdownGracefully().syncUninterruptibly();workerGroup.shutdownGracefully().syncUninterruptibly();}}/*** 这里再次手动关闭,同时在整个springboot应用停止的时候,这里也得以调用【@PreDestroy】*/@PreDestroypublic void shutdown() {// 优雅关闭两个 EventLoopGroup 对象bossGroup.shutdownGracefully().syncUninterruptibly();workerGroup.shutdownGracefully().syncUninterruptibly();log.info("Mqtt Broker Closed successfully!");}
}

MqttServerChannelInitializer.java

/*** The type Mqtt server channel initializer.** @author w30009430* @since 2025 -06-10 9:00*/
public class MqttServerChannelInitializer extends ChannelInitializer<SocketChannel> {private final ConnectionLimitHandler connectionLimitHandler;private final PublishRateLimitHandler publishRateLimitHandler;private final MqttServerHandler mqttServerHandler;public MqttServerChannelInitializer(MqttServerConfig mqttServerConfig) {super();this.connectionLimitHandler = new ConnectionLimitHandler(mqttServerConfig);this.publishRateLimitHandler = new PublishRateLimitHandler(mqttServerConfig);this.mqttServerHandler = new MqttServerHandler(mqttServerConfig);}/*** 当客户端发送connect时会调用这个方法,初始化客户端和服务端建立的管道* * @param socketChannel 数据传输通道*/@Overrideprotected void initChannel(SocketChannel socketChannel) {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new MqttDecoder(Constants.Num.NUM_1024 * Constants.Num.NUM_1024));pipeline.addLast(MqttEncoder.INSTANCE);pipeline.addLast(connectionLimitHandler);pipeline.addLast(publishRateLimitHandler);pipeline.addLast(mqttServerHandler);}
}

MqttServerHandler.java

/*** 处理接收到的消息,【里面可以处理业务数据】** @author w30009430* @since 2025 -05-21 15:25*/
@Slf4j
@ChannelHandler.Sharable
public class MqttServerHandler extends SimpleChannelInboundHandler<MqttMessage> {// 记录消息id的变量,id值范围1~65535private static final AtomicInteger nextMessageId = new AtomicInteger(1);private final MqttMessageMultiplexer mqttMessageMultiplexer;/*** Instantiates a new Server mqtt handler.** @param mqttServerConfig the mqtt server config*/public MqttServerHandler(MqttServerConfig mqttServerConfig) {mqttMessageMultiplexer = new MqttMessageMultiplexer(mqttServerConfig);}/*** 有客户端发消息会触发此函数** @param ctx ctx* @param msg msg*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) {mqttMessageMultiplexer.handleMsg(ctx, msg);}/*** 出站/入站捕获到异常时触发** @param ctx ctx* @param cause 异常*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("An exception occurred when processing messages.clientId: {}", MqttUtils.getClientId(ctx), cause);}/*** 时间处理,心跳检测,空闲状态时断开连接** @param ctx 这里的作用主要是解决断网,弱网的情况发生* @param evt 捕获到异常时触发*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {log.error("Long idle connections. disconnect the client. ClientId: {}, socket: {}",MqttUtils.getClientId(ctx), ctx);// 断开连接,会触发channelInactive方法执行,此方法里进行了会话清理工作ctx.disconnect();}}}/*** 客户端终止连接服务器会触发此函数** @param ctx ctx*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.error("Client Disconnection,{}", ctx);String clientId = MqttUtils.getClientId(ctx);if (StringUtils.isNotEmpty(clientId)) {MqttUtils.delSessionCache(clientId);}super.channelInactive(ctx);}}

2.2.3 限流相关

ConnectionLimitHandler.java

/*** 连接数限流处理器** @author w30009430* @since 2025-06-10 8:26*/
@Slf4j
@ChannelHandler.Sharable
public class ConnectionLimitHandler extends ChannelInboundHandlerAdapter {private final int maxConnectNum;private static final AtomicInteger connectionCount = new AtomicInteger();public ConnectionLimitHandler(MqttServerConfig mqttServerConfig) {maxConnectNum = mqttServerConfig.getMaxConnectNum();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof MqttMessage)) {return;}// 接收到mqtt消息,所有的消息在入口处打印MqttMessage message = (MqttMessage) msg;if (!message.decoderResult().isSuccess()) {log.error("Decoder error, decoder result: {}", message.decoderResult());return;}if (message.fixedHeader().messageType() != MqttMessageType.PINGREQ) {log.info("[ReadMsg]. clientId: {}, Msg: {} ", MqttUtils.getClientId(ctx), message);}// 处理CONNECT消息,if (message.fixedHeader().messageType() == MqttMessageType.CONNECT) {// 如果超过限制数,连接认证失败if (connectionCount.incrementAndGet() > maxConnectNum) {MqttConnAckMessage connAckMessage = ConnectProcessor.createConnAckMessage(MqttConnectReturnCode.CONNECTION_REFUSED_QUOTA_EXCEEDED);NettyUtils.write(ctx, connAckMessage);NettyUtils.close(ctx);log.error("[Connection failed] The number of connections exceeds the limit {}, close the connection.",maxConnectNum);return;}}ctx.fireChannelRead(msg);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {// 触发关闭事件会自动减计数connectionCount.decrementAndGet();ctx.fireChannelInactive();}
}

PublishRateLimitHandler.java

/*** The type Mqtt publish rate limit handler.** @author w30009430* @since 2025 -06-10 8:26*/
@Slf4j
@ChannelHandler.Sharable
public class PublishRateLimitHandler extends ChannelInboundHandlerAdapter {private final MqttServerConfig mqttServerConfig;// 客户端限流统计public static final ConcurrentHashMap<String, ClientPublishRateData> CLIENT_PUBLISH_MSG_RATE_DATA_MAP = new ConcurrentHashMap<>();public PublishRateLimitHandler(MqttServerConfig mqttServerConfig) {this.mqttServerConfig = mqttServerConfig;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {MqttMessage message = (MqttMessage) msg;// 只对PUBLISH消息进行限流if (message.fixedHeader().messageType() != MqttMessageType.PUBLISH) {ctx.fireChannelRead(msg);return;}// 获取客户端限流数据String clientId = MqttUtils.getClientId(ctx);if (clientId == null) {log.error("ClientId is null, close the connection.");NettyUtils.close(ctx);return;}// 获取publish消息速率计数器ClientPublishRateData rateData = CLIENT_PUBLISH_MSG_RATE_DATA_MAP.get(clientId);if (rateData == null) {log.error("Client publish rate data is null, close the connection. clientId: {}", clientId);NettyUtils.close(ctx);return;}MqttPublishMessage publishMsg = (MqttPublishMessage) message;// 进行publish消息大小if (!checkMsgSize(ctx, clientId, publishMsg)) {return;}// 检查PUBLISH消息速率if (!checkMsgRate(ctx, clientId, rateData)) {return;}ctx.fireChannelRead(msg);}private boolean checkMsgSize(ChannelHandlerContext ctx, String clientId, MqttPublishMessage publishMsg) {// 检查PUBLISH消息大小int messageSize = publishMsg.payload().readableBytes();if (messageSize > mqttServerConfig.getSingleMsgMaxByte()) {log.error("The size of single PUBLISH message is too big, clientId: {}, messageSize: {}, maxSize: {}",clientId, messageSize, mqttServerConfig.getSingleMsgMaxByte());NettyUtils.close(ctx);return false;} else {return true;}}private boolean checkMsgRate(ChannelHandlerContext ctx, String clientId, ClientPublishRateData rateData) {long currentMinute = System.currentTimeMillis() / (60 * 1000);if (rateData.currentMinute.get() != currentMinute) {// 
不是同一分钟,重置计数器rateData.currentMinute.set(currentMinute);rateData.publishCount.set(0);}// 检查PUBLISH消息数量限制if (rateData.publishCount.incrementAndGet() > mqttServerConfig.getPerMinuteMsgNumber()) {log.error("The number of PUBLISH messages in one minute exceeds the limit, clientId: {}, currentCount: {}, maxCount: {}",clientId, rateData.publishCount.get(), mqttServerConfig.getPerMinuteMsgNumber());NettyUtils.close(ctx);return false;} else {return true;}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 断开连接时,清理客户端限流数据String clientId = MqttUtils.getClientId(ctx);if (clientId != null) {CLIENT_PUBLISH_MSG_RATE_DATA_MAP.remove(clientId);}super.channelInactive(ctx);}
}

ClientPublishRateData.java

/**
 * Per-client counters used for PUBLISH rate limiting.
 *
 * @author w30009430
 * @since 2025 -06-12 16:29
 */
public class ClientPublishRateData {
    /**
     * The minute (milliseconds since the epoch divided by 60000) the counter currently refers to.
     */
    AtomicLong currentMinute;

    /**
     * Number of PUBLISH messages counted in {@link #currentMinute}.
     */
    AtomicInteger publishCount;

    /**
     * Starts counting from zero in the minute of creation.
     */
    public ClientPublishRateData() {
        long minuteOfCreation = System.currentTimeMillis() / (60 * 1000);
        this.currentMinute = new AtomicLong(minuteOfCreation);
        this.publishCount = new AtomicInteger(0);
    }
}

2.2.4 处理器

MqttMessageMultiplexer.java

/*
 * @author: wangyuanhang
 * @description: business handling of received messages
 * @create: 2025-05-21 17:45
 *
 * Type         Value  Direction  Meaning
 * CONNECT      1      C->S       client asks to connect to the server (received by the server)
 * CONNACK      2      S->C       server acknowledges the connection (received by the client)
 * PUBLISH      3      C<->S      publish a message (QoS 0, at most once: the sender sends once and does not
 *                                care whether the broker or the consumer received it)
 * PUBACK       4      C<->S      publish acknowledged (QoS 1, at least once: the message is resent when no
 *                                ack arrives, or none arrives in time)
 * PUBREC       5      C<->S      publish received (QoS 2)   |
 * PUBREL       6      C<->S      publish released (QoS 2)   | together these three guarantee exactly-once delivery
 * PUBCOMP      7      C<->S      publish complete (QoS 2)   |
 * SUBSCRIBE    8      C->S       subscribe request (received by the server)
 * SUBACK       9      S->C       subscribe acknowledged (received by the client)
 * UNSUBSCRIBE  10     C->S       unsubscribe request (received by the server)
 * UNSUBACK     11     S->C       unsubscribe acknowledged (received by the client)
 * PINGREQ      12     C->S       client PING, keeps the connection alive (received by the server)
 * PINGRESP     13     S->C       PING reply (received by the client)
 * DISCONNECT   14     C->S       disconnect (received by the server)
 *
 * Note: choose the QoS level when sending a message; acknowledgement replies are always sent with
 * qos=0. For example, a message sent at the highest level uses qos=2; the receiver checks the level,
 * picks the reply type and answers with qos=0. When the sender then receives PUBREC it sends (not
 * replies) PUBREL and expects a confirmation, so that one uses qos=1. In short: the sending side only
 * sends messages and the receiving side only replies with acknowledgements.
 */
/**
 * Dispatches each received MQTT message to the processor registered for its message type.
 *
 * @author w30009430
 * @since 2025 -06-10 14:41
 */
@Slf4j
public class MqttMessageMultiplexer {
    /**
     * Processors of all supported protocol messages, keyed by message type.
     */
    private final Map<MqttMessageType, MqttMessageProcessor> processors = new HashMap<>();

    /**
     * Registers one processor per supported message type.
     *
     * @param mqttConfig the MQTT server configuration handed to the processors that need it
     */
    public MqttMessageMultiplexer(MqttServerConfig mqttConfig) {
        register(MqttMessageType.CONNECT, new ConnectProcessor(mqttConfig));
        register(MqttMessageType.PINGREQ, new PingReqProcessor());
        register(MqttMessageType.SUBSCRIBE, new SubscribeProcessor(mqttConfig));
        register(MqttMessageType.PUBLISH, new PublishProcessor());
        register(MqttMessageType.PUBACK, new PubackProcessor());
        register(MqttMessageType.PUBREC, new PubrecProcessor());
        register(MqttMessageType.PUBREL, new PubrelProcessor());
        register(MqttMessageType.PUBCOMP, new PubcompProcessor());
        register(MqttMessageType.DISCONNECT, new DisconnectProcessor());
    }

    /**
     * Adds a processor to the dispatch table.
     */
    private void register(MqttMessageType messageType, MqttMessageProcessor processor) {
        processors.put(messageType, processor);
    }

    /**
     * Hands the message to the matching processor, or logs a warning when none is registered.
     *
     * @param ctx the ctx
     * @param msg the msg
     */
    public void handleMsg(ChannelHandlerContext ctx, MqttMessage msg) {
        MqttMessageType messageType = msg.fixedHeader().messageType();
        MqttMessageProcessor matched = processors.get(messageType);
        if (matched == null) {
            log.warn("Unsupported message type. {}", messageType);
            return;
        }
        matched.messageReceived(ctx, msg);
    }
}

MqttMessageProcessor.java

/**
 * Processor of one MQTT message type.
 *
 * <p>Not annotated as a Spring stereotype: the implementations visible in this project are
 * created with {@code new} by the message multiplexer, and a stereotype placed on an interface
 * does not turn its implementations into beans.
 */
public interface MqttMessageProcessor {
    /**
     * Handles a message received from a client.
     *
     * @param ctx the ctx
     * @param msg the msg
     */
    void messageReceived(ChannelHandlerContext ctx, MqttMessage msg);
}

下面都是实现类

ConnectProcessor.java

@Slf4j
public class ConnectProcessor implements MqttMessageProcessor {private static final int KEEPALIVE_TIME_FACTOR = 2;private static final int REMAINING_LENGTH = 2;private final MqttServerConfig mqttServerConfig;/*** Instantiates a new Connect processor.** @param mqttServerConfig the mqtt server config*/public ConnectProcessor(MqttServerConfig mqttServerConfig) {this.mqttServerConfig = mqttServerConfig;}/*** Create conn ack message mqtt conn ack message.** @param returnCode the return code* @return the mqtt conn ack message*/public static MqttConnAckMessage createConnAckMessage(MqttConnectReturnCode returnCode) {MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, AT_MOST_ONCE, false,REMAINING_LENGTH);MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, false);MqttConnAckMessage mqttConnAckMessage = new MqttConnAckMessage(fixedHeader, variableHeader);return mqttConnAckMessage;}/*** 收到CONECT(连接消息),响应CONNACK(连接确认消息)** @param ctx the ctx* @param msg the msg*/@Overridepublic void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {MqttConnectMessage mqttConnectMessage;String clientId = null;try {mqttConnectMessage = (MqttConnectMessage) msg;// 获取连接者的ClientIdclientId = extractClientId(mqttConnectMessage);// 校验客户端标识是否存在if (!checkClientIdentifier(ctx, clientId)) {return;}// 查看设备是否已经创建连接if (isConnected(ctx, clientId)) {return;}// 查询入网设备信息Device device = mqttServerConfig.getDeviceDomain().getClientDevice(clientId);// 白名单校验【能查询到,说明设备已入网】if (!checkWhiteList(ctx, clientId, device)) {return;}// 用户名和密码校验if (!checkUserNameAndPass(ctx, mqttConnectMessage, device, clientId)) {return;}// 走到这儿,前面的认证都通过。==================================================================================// 添加超时处理handleint keepAliveTimeSeconds = mqttConnectMessage.variableHeader().keepAliveTimeSeconds();int newKeepaliveTime = keepAliveTimeSeconds < 60? 
mqttServerConfig.getTcpTimeOut(): mqttConnectMessage.variableHeader().keepAliveTimeSeconds() * KEEPALIVE_TIME_FACTOR;ctx.channel().pipeline().addFirst(new IdleStateHandler(newKeepaliveTime, -1, -1));// 连接成功,进行响应MqttConnAckMessage connAckMessage = createConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED);NettyUtils.write(ctx, connAckMessage);// 连接成功,保存客户端信息// 为通道这是唯一标识属性ctx.channel().attr(MqttConstants.CLIENT_ID_KEY).set(clientId);// 缓存客户端MqttConstants.CLIENTID_CLIENT_MAP.put(clientId, ctx);// 保存此客户端的频率限制器信息PublishRateLimitHandler.CLIENT_PUBLISH_MSG_RATE_DATA_MAP.put(clientId, new ClientPublishRateData());// 记录连接日志InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();log.info("[[[### Connection successful ###]]], clientId: {}, IP: {}, post: {}", clientId,address.getHostString(), address.getPort());} catch (Exception exp) {// 转换失败,对方发送的协议版本不兼容MqttConnAckMessage connAckMessage = createConnAckMessage(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);NettyUtils.write(ctx, connAckMessage);NettyUtils.close(ctx);log.error("[Connection failed] Unsupported protocol version. clientId: {}", clientId, exp);}}private boolean isConnected(ChannelHandlerContext ctx, String clientId) {if (MqttConstants.CLIENTID_CLIENT_MAP.containsKey(clientId)) {// 客户端设备重复连接MqttConnAckMessage connAckMessage = createConnAckMessage(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);NettyUtils.write(ctx, connAckMessage);NettyUtils.close(ctx);log.error("[Connection failed] Client repeatedly connects. 
clientId: {}", clientId);return true;} else {return false;}}private static boolean checkUserNameAndPass(ChannelHandlerContext ctx, MqttConnectMessage mqttConnectMessage,Device device, String clientId) {try {String userNameNow = mqttConnectMessage.payload().userName();String passwordNow = new String(mqttConnectMessage.payload().passwordInBytes(), CharsetUtil.UTF_8);if (device.getUserName().equals(userNameNow) && device.getHashedPassword().equals(passwordNow)) {return true;}} catch (SecurityException exp) {log.error("encrypt password error", exp);}// 如果用户名密码错误则提示对方MqttConnAckMessage connAckMessage = createConnAckMessage(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);NettyUtils.write(ctx, connAckMessage);NettyUtils.close(ctx);log.error("[Connection failed] Authentication failed, username and password error. clientId: {}", clientId);return false;}private static boolean checkWhiteList(ChannelHandlerContext ctx, String clientId, Device device) {if (device == null) {// 客户端设备未入网MqttConnAckMessage connAckMessage = createConnAckMessage(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);NettyUtils.write(ctx, connAckMessage);NettyUtils.close(ctx);log.error("[Connection failed] The client device is not connected to the network. clientId: {}", clientId);return false;}return true;}private static boolean checkClientIdentifier(ChannelHandlerContext ctx, String clientId) {if (clientId != null) {return true;} else {// 获取客户端唯一标识失败,认证失败MqttConnAckMessage connAckMessage = createConnAckMessage(MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID);NettyUtils.write(ctx, connAckMessage);NettyUtils.close(ctx);log.error("[Connection failed] The client identifier is null or blank.");return false;}}private String extractClientId(MqttConnectMessage message) {try {return message.payload().clientIdentifier();} catch (Exception exp) {log.error("Failed to get clientId", exp);}return null;}}

DisconnectProcessor.java

@Slf4j
public class DisconnectProcessor implements MqttMessageProcessor {/*** 收到DISCONNECT(断开连接),不用响应** @param ctx the ctx* @param msg the msg*/@Overridepublic void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {log.warn("Received a disconnect message, disconnect the client. clientId: {}", MqttUtils.getClientId(ctx));// 断开连接,会触发channelInactive方法执行,此方法里进行了会话清理工作ctx.disconnect();}
}

PingReqProcessor.java

@Slf4j
public class PingReqProcessor implements MqttMessageProcessor {private static final MqttMessage PINGRESP = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, AT_MOST_ONCE, false, 0));/*** 收到PINGREQ,响应PINGRESP** @param ctx the ctx* @param msg the msg*/@Overridepublic void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {log.debug("Heartbeat Recovery: clientId: {}, msg: {}", MqttUtils.getClientId(ctx), PINGRESP);NettyUtils.write(ctx, PINGRESP);}
}

PubackProcessor.java

public class PubackProcessor implements MqttMessageProcessor {/*** 收到PUBACK(收到发布消息确认)* qos等级为1的情况,直接删除原始消息,取消publish消息重发定时任务** @param ctx the ctx* @param msg the msg*/@Overridepublic void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) msg.variableHeader();int messageId = variableHeader.messageId();String msgKey = MqttUtils.getRepeatMsgKey(MqttUtils.getClientId(ctx), messageId);MqttUtils.stopResendMsgThreadTask(msgKey);MqttUtils.releaseResendMsgByteBuf(msgKey);}
}

PubcompProcessor.java

public class PubcompProcessor implements MqttMessageProcessor {/*** 收到PUBCOMP(完成发布消息)* 等级为2的情况,接收到最后一次确认,取消上次PUBREL的消息重发机制** @param ctx the ctx* @param msg the msg*/@Overridepublic void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {MqttPubReplyMessageVariableHeader variableHeader = (MqttPubReplyMessageVariableHeader) msg.variableHeader();int messageId = variableHeader.messageId();String msgKey = MqttUtils.getRepeatMsgKey(MqttUtils.getClientId(ctx), messageId);MqttUtils.stopResendMsgThreadTask(msgKey);MqttUtils.releaseResendMsgByteBuf(msgKey);}
}

PublishProcessor.java

@Slf4j
public class PublishProcessor implements MqttMessageProcessor {/*** 收到PUBLISH(),* qos=0,无响应* qos=1, 响应PUBACK(收到发布消息确认)。缓存publish消息,需要重发* qos=2, 响应PUBREC(收到发布消息)。 缓存publish消息,需要重发** @param ctx the ctx* @param msg the msg*/@Overridepublic void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {// 1 从固定头获取qosMqttPublishMessage mqttPublishMessage = (MqttPublishMessage) msg;MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();MqttQoS qos = mqttFixedHeaderInfo.qosLevel();// 2 获取topic、消息体MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();String topicName = variableHeader.topicName();ByteBuf byteBuf = mqttPublishMessage.payload();// 3 publish 消息转发给订阅的客户端forwardPublishMsg(mqttPublishMessage);// 4 如果消息需要保留,缓存消息给后订阅的客户端retainMsg(mqttPublishMessage, mqttFixedHeaderInfo, topicName, byteBuf);// 5 返回消息给发送端ackPublish(ctx, mqttPublishMessage, qos);}private static void ackPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttPublishMessage, MqttQoS qos) {switch (qos) {// 至多一次+case AT_MOST_ONCE:break;// 至少一次case AT_LEAST_ONCE:// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(PUBACK, false, AT_MOST_ONCE, false, 2);// 构建返回报文, 可变报头MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());// 构建PUBACK消息体MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);NettyUtils.write(ctx, pubAck);break;// 刚好一次case EXACTLY_ONCE:// 构建返回报文,固定报头MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(PUBREC, false, AT_MOST_ONCE, false, 2);// 构建返回报文,可变报头MqttPubReplyMessageVariableHeader mqttPubReplyMessageVariableHeader= new MqttPubReplyMessageVariableHeader(mqttPublishMessage.variableHeader().packetId(),MqttPubReplyMessageVariableHeader.REASON_CODE_OK, MqttProperties.NO_PROPERTIES);// 构建PUBREC消息体MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2, 
mqttPubReplyMessageVariableHeader);NettyUtils.write(ctx, mqttMessageBack);break;default:break;}}private static void forwardPublishMsg(MqttPublishMessage mqttPublishMessage) {String topicName = mqttPublishMessage.variableHeader().topicName();CopyOnWriteArraySet<String> clientIdSet = MqttConstants.TOPIC_CLIENTSET_MAP.get(topicName);if (!CollectionUtils.isEmpty(clientIdSet)) {// 判断主题是否是共享主题,共享主题默认只给第一个客户端推送if (MqttUtils.isShareTopic(topicName)) {forwardPublishToClient(mqttPublishMessage, topicName, clientIdSet.stream().findFirst().get());return;}// 非共享主题,每个客户端都推送for (String clientId : clientIdSet) {forwardPublishToClient(mqttPublishMessage, topicName, clientId);}}}private static void forwardPublishToClient(MqttPublishMessage mqttPublishMessage, String topicName,String clientId) {ChannelHandlerContext clientCtx = MqttConstants.CLIENTID_CLIENT_MAP.get(clientId);if (clientCtx != null && clientCtx.channel().isActive()) {  // 3.1 topic订阅的客户端连接正常。准备推送消息// 3.1.1当前消息的qos级别小于客户订阅此topic的qos级别时,向订阅者推送消息MqttQoS qos = mqttPublishMessage.fixedHeader().qosLevel();MqttQoS cacheQos = MqttConstants.TOPIC_CLIENTID_AND_QOS_MAP.get(topicName + "_" + clientId);if (cacheQos != null && qos.value() <= cacheQos.value()) {// retainedDuplicate()增加引用计数器,不至于后续操作byteBuf出现错误,引用计数器为0的情况,这里会清除retainedDuplicate的操作有:// SimpleChannelInboundHandler处理器、编码器 MqttEncoder、和最后确认的时候释放,所以每次操作消息之前,先进行一次retainedDuplicate// 3.1.2 引用计数器增加ByteBuf payload = mqttPublishMessage.payload();payload.retainedDuplicate();// 3.1.3 把消息转发给订阅者// 多个消息生产者给同一topic发送的消息msgId一样,因为msgId一样,会导致线程混乱,所以每次缓存的消息都要新生成msgIdint newMessageId = MqttUtils.getNewMessageId();String msgKey = MqttUtils.getRepeatMsgKey(clientId, newMessageId);MqttPublishVariableHeader oldVariableHeader = mqttPublishMessage.variableHeader();MqttPublishVariableHeader newVariableHeader = new MqttPublishVariableHeader(oldVariableHeader.topicName(), newMessageId, oldVariableHeader.properties());MqttPublishMessage pushPublishMsg = new 
MqttPublishMessage(mqttPublishMessage.fixedHeader(),newVariableHeader, payload);log.info("pushPublishMsgToClient. topic: {}, clientId: {}, oldMsgId: {}, newMsgId: {}",oldVariableHeader.topicName(), clientId, oldVariableHeader.packetId(), newMessageId);NettyUtils.write(clientCtx, pushPublishMsg);// 3.1.4 当qos=1/2时,先缓存消息,如果没有响应,需要重发if (qos == AT_LEAST_ONCE || qos == EXACTLY_ONCE) {MqttUtils.cachePublishMsg(msgKey, payload, pushPublishMsg, clientCtx);}}} else { // 3.2 订阅者客户端异常, 清理此客户端的会话信息if (clientCtx != null) {log.error("Forwarding message client connection error. topic: {} , clientId: {}", topicName, clientId);MqttUtils.delSessionCache(clientId);}}}private static void retainMsg(MqttPublishMessage mqttPublishMessage, MqttFixedHeader mqttFixedHeaderInfo,String topicName, ByteBuf byteBuf) {boolean retain = mqttFixedHeaderInfo.isRetain();if (retain) {if (byteBuf.readableBytes() > 0) {byteBuf.retainedDuplicate();MqttConstants.CACHE_RETAINED_MESSAGES_TOPIC_MSG_MAP.put(topicName, mqttPublishMessage);} else {MqttPublishMessage message = MqttConstants.CACHE_RETAINED_MESSAGES_TOPIC_MSG_MAP.get(topicName);if (message != null) {MqttConstants.CACHE_RETAINED_MESSAGES_TOPIC_MSG_MAP.remove(topicName);// 这里需要手动删除ByteBuf缓存,因为一直会有一份缓存在内存中备用boolean release = ReferenceCountUtil.release(message);log.info("Delete cache message results: {}", release);}}}}}

PubrecProcessor.java

public class PubrecProcessor implements MqttMessageProcessor {/*** 收到PUBREC(收到发布消息),响应PUBREL(释放发布消息)* 等级为2的情况,收到PUBREC报文消息,先停止消息重发机制,再响应一个PUBREL报文并且构建消息重发机制** @param ctx the ctx* @param msg the msg*/@Overridepublic void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {MqttPubReplyMessageVariableHeader variableHeader = (MqttPubReplyMessageVariableHeader) msg.variableHeader();int messageId = variableHeader.messageId();// 1 删除初始publis消息重发定时任务String msgKey = MqttUtils.getRepeatMsgKey(MqttUtils.getClientId(ctx), messageId);MqttUtils.stopResendMsgThreadTask(msgKey);MqttUtils.releaseResendMsgByteBuf(msgKey);// 2 响应PUBREL消息// 构建返回报文,固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false,0);// 构建返回报文,可变报头MqttPubReplyMessageVariableHeader mqttPubReplyMessageVariableHeader = new MqttPubReplyMessageVariableHeader(messageId, MqttPubReplyMessageVariableHeader.REASON_CODE_OK, MqttProperties.NO_PROPERTIES);MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttPubReplyMessageVariableHeader);NettyUtils.write(ctx, mqttMessageBack);// 3 缓存Pubrel消息 , 重发机制要放在最下方,否则,一旦出错,会多次出发此机制MqttUtils.cachePubrelMsg(messageId, ctx);}}

PubrelProcessor.java

public class PubrelProcessor implements MqttMessageProcessor {/*** 收到PUBREL(释放发布消息),应该响应 PUBCOMP(完成发布消息)** @param ctx the ctx* @param msg the msg*/@Overridepublic void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) msg.variableHeader();// 构建返回报文, 固定报头MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false,2);// 构建返回报文, 可变报头MqttPubReplyMessageVariableHeader mqttPubReplyMessageVariableHeader = new MqttPubReplyMessageVariableHeader(messageIdVariableHeader.messageId(), MqttPubReplyMessageVariableHeader.REASON_CODE_OK,MqttProperties.NO_PROPERTIES);MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttPubReplyMessageVariableHeader);NettyUtils.write(ctx, mqttMessageBack);}}

SubscribeProcessor.java

@Slf4j
public class SubscribeProcessor implements MqttMessageProcessor {private final MqttServerConfig mqttServerConfig;/*** 收到SUBSCRIBE(订阅请求),响应SUBACK(订阅确认)** @param mqttConfig the mqtt config*/public SubscribeProcessor(MqttServerConfig mqttConfig) {this.mqttServerConfig = mqttConfig;}private static MqttSubAckMessage getMqttSubAckMessage(MqttSubscribeMessage mqttSubscribeMessage,List<Integer> grantedQoSLevels) {MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, AT_MOST_ONCE, false,0);// 可变头MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(mqttSubscribeMessage.variableHeader().messageId());// 构建返回报文 有效负载MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);// 构建返回报文 订阅确认MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack, variableHeaderBack, payloadBack);return subAck;}/*** Message received.** @param ctx the ctx* @param msg the msg*/@Overridepublic void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) {MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) msg;try {// 1 获取订阅的topic列表MqttSubscribePayload subscribePayload = mqttSubscribeMessage.payload();List<MqttTopicSubscription> mqttTopicSubscriptions = subscribePayload.topicSubscriptions();// 2 获取客户端IDString clientId = MqttUtils.getClientId(ctx);// 3 创建对象存放订阅的结果,【按照顺序存放订阅topic的qos值】List<Integer> returnQoSLevels = new ArrayList<Integer>();// 3 获取此客户端订阅的topic列表CopyOnWriteArraySet<String> topicSet = MqttConstants.CLIENTID_TOPICSET_MAP.getOrDefault(clientId,new CopyOnWriteArraySet<>());// 4 缓存topic相关信息cacheTopicInfo(mqttTopicSubscriptions, clientId, returnQoSLevels, topicSet);// 4 应答// 固定报头MqttSubAckMessage subAck = getMqttSubAckMessage(mqttSubscribeMessage, returnQoSLevels);NettyUtils.write(ctx, subAck);// 5 订阅的topic如果有消息,则向客户端推送最新消息pushPublishMsgToSubscribedClient(ctx, clientId, topicSet);} catch (Exception exp) {log.error("Subscription message processing error", exp);}}private void 
pushPublishMsgToSubscribedClient(ChannelHandlerContext ctx, String clientId, Set<String> topicSet) {if (mqttServerConfig.isImmediatelyReceiveMsg()) {for (String topic : topicSet) {// 查看订阅的主题是否需要需要发送消息pushPublishMsgToClient(ctx, clientId, topic);}}}private void pushPublishMsgToClient(ChannelHandlerContext ctx, String clientId, String topic) {// 1 获取此topic最新的一条消息MqttPublishMessage retainedMsg = MqttConstants.CACHE_RETAINED_MESSAGES_TOPIC_MSG_MAP.get(topic);if (retainedMsg != null) {MqttFixedHeader retainedFixedHeaderInfo = retainedMsg.fixedHeader();MqttQoS retainPublishMsgQos = retainedFixedHeaderInfo.qosLevel();MqttQoS cacheQos = MqttConstants.TOPIC_CLIENTID_AND_QOS_MAP.get(topic + "_" + clientId);// 2 只有发送小于或等于这个服务质量的消息给订阅者if (cacheQos != null && retainPublishMsgQos.value() <= cacheQos.value()) {// 3 引用计数器增加ByteBuf payload = retainedMsg.payload();payload.retainedDuplicate();// 4 多个消息生产者给同一topic发送的消息msgId一样,因为msgId一样,会导致线程混乱,所以每次缓存的消息都要新生成msgIdint newMessageId = MqttUtils.getNewMessageId();// 5 生产成新的publish消息,发送给订阅者MqttPublishVariableHeader oldVariableHeader = retainedMsg.variableHeader();MqttPublishVariableHeader newVariableHeader = new MqttPublishVariableHeader(oldVariableHeader.topicName(), newMessageId, oldVariableHeader.properties());MqttPublishMessage pushPublishMsg = new MqttPublishMessage(retainedMsg.fixedHeader(), newVariableHeader,payload);log.info("pushPublishMsgToClient. 
topic: {}, clientId: {}, oldMsgId: {}, newMsgId: {}", topic, clientId,oldVariableHeader.packetId(), newMessageId);NettyUtils.write(ctx, pushPublishMsg);// 6 开启消息重发机制if (retainPublishMsgQos == AT_LEAST_ONCE || retainPublishMsgQos == EXACTLY_ONCE) {// 消息重发String msgKey = MqttUtils.getRepeatMsgKey(clientId, newMessageId);MqttUtils.cachePublishMsg(msgKey, payload, pushPublishMsg, ctx);}}}}private static void cacheTopicInfo(List<MqttTopicSubscription> mqttTopicSubscriptions, String clientId,List<Integer> grantedQoSLevels, CopyOnWriteArraySet<String> topicSet) {for (MqttTopicSubscription subscription : mqttTopicSubscriptions) {String topicName = subscription.topicName();// 1 将新订阅的topic添加到此客户端的topic列表里topicSet.add(topicName);MqttConstants.CLIENTID_TOPICSET_MAP.put(clientId, topicSet);// 2 获取订阅此topic的所有客户端,并将新客户端添加到订阅此topic的客户端列表里CopyOnWriteArraySet<String> clientIdSet = MqttConstants.TOPIC_CLIENTSET_MAP.getOrDefault(topicName,new CopyOnWriteArraySet<>());clientIdSet.add(clientId);MqttConstants.TOPIC_CLIENTSET_MAP.put(topicName, clientIdSet);// 3 存放不同topic不同客户端对应的qos级别MqttConstants.TOPIC_CLIENTID_AND_QOS_MAP.put(topicName + "_" + clientId, subscription.qualityOfService());// 4 存储客户端订阅的topic的结果int qosLevel = subscription.qualityOfService().value();grantedQoSLevels.add(qosLevel);}}
}

2.2.5 消息重发

ResendMsgThread.java

/**
 * Shared state for message resending: the scheduler and the scheduled futures it produced.
 */
public class ResendMsgThread {
    /** Scheduler (core pool size 10) used for the periodic resend tasks. */
    public static final ScheduledThreadPoolExecutor SCHEDULED_THREAD_POOL_EXECUTOR =
        new ScheduledThreadPoolExecutor(10);

    /** Scheduled resend futures, keyed by message key. */
    public static final ConcurrentHashMap<String, ScheduledFuture<?>> MSGKEY_SCHEDULEDFUTURE_MAP =
        new ConcurrentHashMap<>();
}

ResendMsgTask.java

/*** The type Resend msg task.*/
@Slf4j
public class ResendMsgTask implements Runnable {private String msgKey;private MqttMessage mqttMessage;private ChannelHandlerContext ctx;/*** Instantiates a new Resend msg task.** @param msgKey the msg key* @param mqttMessage the mqtt message* @param ctx the ctx*/public ResendMsgTask(String msgKey, MqttMessage mqttMessage, ChannelHandlerContext ctx) {this.msgKey = msgKey;this.mqttMessage = mqttMessage;this.ctx = ctx;}@Overridepublic void run() {//注意,整个执行过程中,代码报错,线程就会终止if (ctx != null && ctx.channel().isActive()) {log.info("ResendMsg. msgKey: {}", msgKey);if (mqttMessage instanceof MqttPublishMessage) {//推送的原始消息,每次推送,都需要重新拷贝一份try {MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;ByteBuf byteBuf = mqttPublishMessage.payload();byteBuf.retainedDuplicate();NettyUtils.write(ctx, mqttPublishMessage);} catch (Exception e) {// 客户端异常,移除客户端信息log.error("Resend failed, end Resend. msgKey: {}, msg:{}", msgKey, mqttMessage);MqttUtils.delSessionCache(MqttUtils.getClientId(ctx));}} else {//回复的ack类型消息NettyUtils.write(ctx, mqttMessage);}} else {// 客户端异常,移除客户端信息log.error("Resend failed, end Resend. msgKey: {}, msg:{}", msgKey, mqttMessage);if (ctx != null) {MqttUtils.delSessionCache(MqttUtils.getClientId(ctx));}}}
}
http://www.lqws.cn/news/592057.html

相关文章:

  • php-mqtt/client 发布、订阅
  • 学习threejs,使用自定义GLSL 着色器,生成艺术作品
  • Redis-渐进式遍历
  • Android实现仿iOS风格滚动时间选择器
  • 【机器学习深度学习】理解欠拟合、拟合、过拟合
  • React安装使用教程
  • Linux->进程控制(精讲)
  • 文心一言开源版测评:能力、易用性与价值的全面解析
  • 通过http调用来访问neo4j时报错,curl -X POST 执行指令报错
  • 博途多重背景、参数实例
  • swing音频输入
  • 跨境证券交易系统合规升级白皮书:全链路微秒风控+开源替代,护航7月程序化交易新规落地
  • 7.可视化的docker界面——portainer
  • CloudBase AI ToolKit实战:从0到1开发一个智能医疗网站
  • LLM中的思维链,仅仅在提示词中加上思维链,让模型输出,和加上思维链限制模型输出答案,这两方式模型是不是进行了思考的
  • 鸿蒙Next开发中三方库使用指南之-nutpi-privacy_dialog集成示例
  • 用“做饭”理解数据分析流程(Excel三件套实战)​
  • 网站崩溃的幕后黑手:GPTBot爬虫的流量冲击
  • 论文阅读:Align and Prompt (ALPRO 2021.12)
  • 零开始git使用教程-传html文件
  • 浅谈Docker Kicks in的应用
  • 51单片机制作万年历
  • 观察者模式
  • 新版本 Spring Data Jpa + QueryDSL 使用教程
  • TensorFlow源码深度阅读指南
  • 【科研绘图系列】基于R语言的复杂热图绘制教程:环境因素与染色体效应的可视化
  • C#程序设计简介
  • 9-2 MySQL 分析查询语句:EXPLAIN(详细说明)
  • Milvus docker-compose 部署
  • 从苹果事件看 ARM PC市场的未来走向