SpringBoot中RocketMQ的使用教程
一、RocketMQ简介
RocketMQ是阿里巴巴开源的一款分布式消息中间件,具有高性能、高可靠、高实时、分布式等特点。它支持多种消息类型,包括普通消息、顺序消息、事务消息和定时/延时消息。
二、SpringBoot集成RocketMQ
1. 添加依赖
首先在SpringBoot项目中添加RocketMQ的依赖:
<!-- RocketMQ Starter -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>
2. 配置RocketMQ
在application.properties或application.yml中添加配置:
# RocketMQ配置
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
三、消息生产者
1. 简单消息发送
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class SimpleProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic, String message) {rocketMQTemplate.convertAndSend(topic, message);}
}
2. 发送对象消息
public void sendObjectMessage(String topic, User user) {rocketMQTemplate.convertAndSend(topic, user);
}
3. 发送带Tag的消息
public void sendMessageWithTag(String topic, String tag, String message) {rocketMQTemplate.convertAndSend(topic + ":" + tag, message);
}
四、消息消费者
1. 简单消息消费
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer-group")
public class SimpleConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message: " + message);}
}
2. 消费对象消息
@Service
@RocketMQMessageListener(topic = "user-topic", consumerGroup = "user-consumer-group")
public class UserConsumer implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println("Received user: " + user.toString());}
}
3. 消费带Tag的消息
@Service
@RocketMQMessageListener(topic = "tag-topic", selectorExpression = "tagA", consumerGroup = "tag-consumer-group")
public class TagConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received tag message: " + message);}
}
五、高级特性
1. 顺序消息
生产者
public void sendOrderMessage(String topic, String message, String orderId) {rocketMQTemplate.syncSendOrderly(topic, message, orderId);
}
消费者:
@Service
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "order-consumer-group",consumeMode = ConsumeMode.ORDERLY
)
public class OrderConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received order message: " + message);}
}
2. 事务消息
生产者:
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;@Component
public class TransactionProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendTransactionMessage(String topic, String message) {rocketMQTemplate.sendMessageInTransaction("tx-group", topic, MessageBuilder.withPayload(message).build(), null);}@Beanpublic TransactionListener transactionListener() {return new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务try {// 业务逻辑return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 检查本地事务状态return LocalTransactionState.COMMIT_MESSAGE;}};}
}
3. 延时消息
public void sendDelayMessage(String topic, String message, int delayLevel) {Message<String> msg = MessageBuilder.withPayload(message).build();rocketMQTemplate.syncSend(topic, msg, 3000, delayLevel);
}
延时级别对应关系:
- 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
六、常见问题
1. 消息重复消费
解决方案:
- 实现幂等性处理
- 使用Redis等缓存记录已处理消息ID
2. 消息堆积
解决方案:
- 增加消费者数量
- 优化消费逻辑
- 设置合理的消费线程数
3. 消息丢失
解决方案:
- 使用同步发送方式
- 开启重试机制
- 实现消息补偿机制
七、完整示例
1. 生产者示例
@RestController
@RequestMapping("/mq")
public class MQController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send")public String sendMessage(@RequestParam String message) {rocketMQTemplate.convertAndSend("test-topic", message);return "Message sent: " + message;}@GetMapping("/send-order")public String sendOrderMessage(@RequestParam String message, @RequestParam String orderId) {rocketMQTemplate.syncSendOrderly("order-topic", message, orderId);return "Order message sent: " + message;}
}
2. 消费者示例
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-consumer-group")
public class TestConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message: " + message);// 业务处理逻辑}
}@Service
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "order-consumer-group",consumeMode = ConsumeMode.ORDERLY
)
public class OrderConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received order message: " + message);// 订单处理逻辑}
}
八、总结
通过本教程,你已经学会了如何在SpringBoot项目中集成和使用RocketMQ,包括:
- 基本消息的发送和接收
- 顺序消息的处理
- 事务消息的实现
- 延时消息的使用