当前位置: 首页 > news >正文

springboot集成mqtt收发消息

在 Spring Boot 中使用 MQTT 可以通过集成 Eclipse Paho 或 HiveMQ 等客户端库实现。以下是完整的整合步骤,包括配置、发布和订阅消息的示例。

1. 添加 MQTT 依赖
在 pom.xml 中添加 Spring Integration MQTT 与 Eclipse Paho MQTT 客户端依赖(spring-integration-mqtt 未写版本号,由 Spring Boot 的依赖管理统一指定):

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>


2. 配置 MQTT 连接
2.1 配置文件(application.yml)

mqtt:
  broker-url: tcp://127.0.0.1:8084
  client-id: service_001
  username: 用户名
  password: 密码
  default-topic: 话题


运行
2.2 MQTT 配置类
 

package io.renren.config;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true); // 是否持久化会话factory.setConnectionOptions(options);return factory;}
}

3. 发布消息(Publisher)
3.1 创建 MQTT 消息网关

package io.renren.config;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
/**
 * Messaging gateway for publishing to MQTT.
 *
 * <p>Calls are turned into messages and sent to the gateway's default request channel.
 */
public interface MqttGateway {

    /**
     * Sends a payload to the given topic.
     *
     * @param payload message body to publish
     * @param topic   destination topic, carried in the {@link MqttHeaders#TOPIC} header
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
}


3.2 配置出站通道
 

package io.renren.config;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;@Configuration
public class MqttOutboundConfig {@Value("${mqtt.default-topic}")private String defaultTopic;@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MqttPahoMessageHandler mqttOutbound(MqttPahoClientFactory factory) {MqttPahoMessageHandler handler = new MqttPahoMessageHandler("publisher-client", factory);handler.setAsync(true); // 异步发送handler.setDefaultTopic(defaultTopic); // 默认Topicreturn handler;}
}

3.3 使用示例(Controller 发布消息)
 

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
/**
 * REST endpoint that publishes a message to MQTT through {@code MqttGateway}.
 */
public class MqttController {

    @Autowired
    private MqttGateway mqttGateway;

    /**
     * Publishes {@code message} to {@code topic}.
     *
     * @param message payload to publish
     * @param topic   optional target topic; when absent or blank, {@code "test/topic"} is used
     * @return a confirmation string echoing the message
     */
    @PostMapping("/publish")
    public String publish(@RequestParam String message, @RequestParam(required = false) String topic) {
        // An empty topic name is not a valid MQTT topic, so treat blank like "not provided".
        boolean topicMissing = topic == null || topic.trim().isEmpty();
        String targetTopic = topicMissing ? "test/topic" : topic;
        mqttGateway.sendToMqtt(message, targetTopic);
        return "Message published: " + message;
    }
}

4. 订阅消息(Subscriber)
4.1 配置入站通道
 

package io.renren.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.messaging.MessageChannel;import io.renren.modules.app.service.BmsDeviceInfoService;@Configuration
public class MqttInboundConfig {@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.default-topic}")private String defaultTopic;@Autowiredprivate BmsDeviceInfoService bmsDeviceInfoService;@Beanpublic MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory factory) {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, factory, defaultTopic);adapter.setErrorChannel(errorChannel()); // 配置错误通道return adapter;}@Beanpublic MessageChannel errorChannel() {return new PublishSubscribeChannel();}@Beanpublic IntegrationFlow mqttInboundFlow(MqttPahoMessageDrivenChannelAdapter adapter) {return IntegrationFlows.from(adapter).handle(message -> {String payload = message.getPayload().toString();String topic = message.getHeaders().get("mqtt_receivedTopic").toString();System.out.println("Received: Topic=" + topic + ", Payload=" + payload);bmsDeviceInfoService.handleMqtt(payload);}).get();}
}

5. 测试 MQTT 功能
5.1 测试发布消息

curl -X POST "http://localhost:8080/publish?message=HelloMQTT&topic=test/topic"


5.2 查看订阅日志
控制台会输出接收到的消息:

Received: Topic=test/topic, Payload=HelloMQTT

http://www.lqws.cn/news/534223.html

相关文章:

  • python + opencv实现简单的文字水印
  • 【LLM论文阅读】
  • 如果你在为理解RDA、PCA 和 PCoA而烦恼,不妨来看看丨TomatoSCI分析日记
  • [Andrej Karpathy] 大型语言模型作为新型操作系统
  • vue3 json 转 实体
  • 2D 基准情况下贝叶斯优化应用的概率推理
  • Ubuntu下布署mediasoup-demo
  • zabbix监控Centos 服务器
  • 波动方程能量守恒证明
  • dockers virbox 安装
  • COZE API上传文件 直接从前端发送就可以,而通过后端发请求给CozeAPI就不行,为什么?
  • Spring Boot 部署与打包方式详解(Jar vs War)
  • Windows命令连接符的安全风险分析与防御策略
  • SQLMesh:数据建模与版本控制的革新者
  • inverse-design-of-grating-coupler-3d(2)
  • 鸿蒙实时音视频流处理框架开发实战——基于HarmonyOS 4.0与分布式软总线的低延时高可靠架构
  • 从提示工程(Prompt Engineering)到上下文工程(Context Engineering)
  • PyTorch 模型镜像下载与安装指南
  • R 语言简介:数据分析与统计的强大工具
  • DSP学习笔记1
  • 声网对话式AI构建商品场景语义理解能力
  • 基于SpringBoot文件管理系统中的分片上传实现
  • 使用API有效率地管理Dynadot域名,出售账户中的域名
  • OpenCV CUDA模块设备层-----逐通道最小值比较函数min()
  • SpringBoot 中 @Transactional 的使用
  • 【LLM安全】MCP(模型上下文协议)及其关键漏洞、技术细节
  • 力扣网C语言编程题:搜索插入位置
  • mac电脑安装vscode的力扣插件报错解决办法
  • element-plus 按钮 展开/隐藏
  • 百面Bert