Spring Boot 集成 MQTT 收发消息
在 Spring Boot 中使用 MQTT 可以通过集成 Eclipse Paho 或 HiveMQ 等客户端库实现。以下是完整的整合步骤,包括配置、发布和订阅消息的示例。
1. 添加 MQTT 依赖
在 pom.xml 中添加 Spring Integration MQTT 与 Paho MQTT 客户端依赖:
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>
2. 配置 MQTT 连接
2.1 配置文件(application.yml)
mqtt:
  broker-url: tcp://127.0.0.1:8084
  client-id: service_001
  username: 用户名
  password: 密码
  default-topic: 话题
运行
2.2 MQTT 配置类
package io.renren.config;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{brokerUrl});options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true); // 是否持久化会话factory.setConnectionOptions(options);return factory;}
}
3. 发布消息(Publisher)
3.1 创建 MQTT 消息网关
package io.renren.config;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Publishes a message through the messaging gateway.
     *
     * @param payload message body to send
     * @param topic   value placed in the {@link MqttHeaders#TOPIC} header of the outgoing message
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
}
3.2 配置出站通道
package io.renren.config;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;@Configuration
public class MqttOutboundConfig {@Value("${mqtt.default-topic}")private String defaultTopic;@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MqttPahoMessageHandler mqttOutbound(MqttPahoClientFactory factory) {MqttPahoMessageHandler handler = new MqttPahoMessageHandler("publisher-client", factory);handler.setAsync(true); // 异步发送handler.setDefaultTopic(defaultTopic); // 默认Topicreturn handler;}
}
3.3 使用示例(Controller 发布消息)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MqttController {

    /** Topic used when the request does not supply a usable one. */
    private static final String FALLBACK_TOPIC = "test/topic";

    @Autowired
    private MqttGateway mqttGateway;

    /**
     * Publishes a message to MQTT via the gateway.
     *
     * @param message payload to publish
     * @param topic   optional target topic; when absent or empty, {@code test/topic} is used
     * @return a confirmation string echoing the published message
     */
    @PostMapping("/publish")
    public String publish(@RequestParam String message, @RequestParam(required = false) String topic) {
        // "?topic=" binds to an empty string rather than null; a zero-length topic name is
        // rejected by the MQTT client, so treat it the same as a missing parameter.
        String targetTopic = (topic == null || topic.isEmpty()) ? FALLBACK_TOPIC : topic;
        mqttGateway.sendToMqtt(message, targetTopic);
        return "Message published: " + message;
    }
}
4. 订阅消息(Subscriber)
4.1 配置入站通道
package io.renren.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.messaging.MessageChannel;import io.renren.modules.app.service.BmsDeviceInfoService;@Configuration
public class MqttInboundConfig {@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.default-topic}")private String defaultTopic;@Autowiredprivate BmsDeviceInfoService bmsDeviceInfoService;@Beanpublic MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory factory) {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, factory, defaultTopic);adapter.setErrorChannel(errorChannel()); // 配置错误通道return adapter;}@Beanpublic MessageChannel errorChannel() {return new PublishSubscribeChannel();}@Beanpublic IntegrationFlow mqttInboundFlow(MqttPahoMessageDrivenChannelAdapter adapter) {return IntegrationFlows.from(adapter).handle(message -> {String payload = message.getPayload().toString();String topic = message.getHeaders().get("mqtt_receivedTopic").toString();System.out.println("Received: Topic=" + topic + ", Payload=" + payload);bmsDeviceInfoService.handleMqtt(payload);}).get();}
}
5. 测试 MQTT 功能
5.1 测试发布消息
curl -X POST "http://localhost:8080/publish?message=HelloMQTT&topic=test/topic"
5.2 查看订阅日志
控制台会输出接收到的消息:
Received: Topic=test/topic, Payload=HelloMQTT