当前位置: 首页 > news >正文

SpringBoot中RocketMQ的使用教程

一、RocketMQ简介

RocketMQ是阿里巴巴开源的一款分布式消息中间件,具有高性能、高可靠、高实时、分布式等特点。它支持多种消息类型,包括普通消息、顺序消息、事务消息和定时/延时消息。

二、SpringBoot集成RocketMQ

1. 添加依赖

首先在SpringBoot项目中添加RocketMQ的依赖:

<!-- RocketMQ Starter -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>

2. 配置RocketMQ

在application.properties或application.yml中添加配置:

# RocketMQ配置
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

三、消息生产者

1. 简单消息发送

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class SimpleProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic, String message) {rocketMQTemplate.convertAndSend(topic, message);}
}

2. 发送对象消息

public void sendObjectMessage(String topic, User user) {rocketMQTemplate.convertAndSend(topic, user);
}

3. 发送带Tag的消息

public void sendMessageWithTag(String topic, String tag, String message) {rocketMQTemplate.convertAndSend(topic + ":" + tag, message);
}

四、消息消费者

1. 简单消息消费

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer-group")
public class SimpleConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message: " + message);}
}

2. 消费对象消息

@Service
@RocketMQMessageListener(topic = "user-topic", consumerGroup = "user-consumer-group")
public class UserConsumer implements RocketMQListener<User> {@Overridepublic void onMessage(User user) {System.out.println("Received user: " + user.toString());}
}

3. 消费带Tag的消息

@Service
@RocketMQMessageListener(topic = "tag-topic", selectorExpression = "tagA", consumerGroup = "tag-consumer-group")
public class TagConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received tag message: " + message);}
}

五、高级特性

1. 顺序消息

生产者

public void sendOrderMessage(String topic, String message, String orderId) {rocketMQTemplate.syncSendOrderly(topic, message, orderId);
}

消费者:

@Service
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "order-consumer-group",consumeMode = ConsumeMode.ORDERLY
)
public class OrderConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received order message: " + message);}
}

2. 事务消息

生产者:

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;@Component
public class TransactionProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendTransactionMessage(String topic, String message) {rocketMQTemplate.sendMessageInTransaction("tx-group", topic, MessageBuilder.withPayload(message).build(), null);}@Beanpublic TransactionListener transactionListener() {return new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务try {// 业务逻辑return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 检查本地事务状态return LocalTransactionState.COMMIT_MESSAGE;}};}
}

3. 延时消息

public void sendDelayMessage(String topic, String message, int delayLevel) {Message<String> msg = MessageBuilder.withPayload(message).build();rocketMQTemplate.syncSend(topic, msg, 3000, delayLevel);
}

延时级别对应关系:

  • 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

六、常见问题

1. 消息重复消费

解决方案:

  • 实现幂等性处理
  • 使用Redis等缓存记录已处理消息ID

2. 消息堆积

解决方案:

  • 增加消费者数量
  • 优化消费逻辑
  • 设置合理的消费线程数

3. 消息丢失

解决方案:

  • 使用同步发送方式
  • 开启重试机制
  • 实现消息补偿机制

七、完整示例

1. 生产者示例

@RestController
@RequestMapping("/mq")
public class MQController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send")public String sendMessage(@RequestParam String message) {rocketMQTemplate.convertAndSend("test-topic", message);return "Message sent: " + message;}@GetMapping("/send-order")public String sendOrderMessage(@RequestParam String message, @RequestParam String orderId) {rocketMQTemplate.syncSendOrderly("order-topic", message, orderId);return "Order message sent: " + message;}
}

2. 消费者示例

@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-consumer-group")
public class TestConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message: " + message);// 业务处理逻辑}
}@Service
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "order-consumer-group",consumeMode = ConsumeMode.ORDERLY
)
public class OrderConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received order message: " + message);// 订单处理逻辑}
}

八、总结

通过本教程,你已经学会了如何在SpringBoot项目中集成和使用RocketMQ,包括:

  1. 基本消息的发送和接收
  2. 顺序消息的处理
  3. 事务消息的实现
  4. 延时消息的使用

http://www.lqws.cn/news/604585.html

相关文章:

  • 记一次finallshell.exe打开无法应的处理
  • CKS-CN考试之路----13
  • 多项式带余除法——线性代数题目为例
  • react调用打印机自定义样式
  • mysql语句练习
  • [CS创世SD NAND征文] 精准控制的坚固基石:CS创世SD NAND在华大HC32F4A0运动控制卡中的高可靠应用
  • React 学习(2)
  • Linux下MinIO分布式安装部署
  • 大语言模型随意猜测网址引发网络安全危机
  • 深入理解装饰器模式:动态扩展对象功能的灵活设计模式
  • 软考高项一次过,个人经验总结
  • Docker:容器化技术的基石与实践指南
  • 【字节跳动】数据挖掘面试题0003:有一个文件,每一行是一个数字,如何用 MapReduce 进行排序和求每个用户每个页面停留时间
  • MinHook 如何对 .NET 母体 CoreCLR 进行拦截
  • 【Unity】MiniGame编辑器小游戏(九)打砖块【Breakout】
  • 深入解析外观模式(Facade Pattern):简化复杂系统的优雅设计
  • Cursor推出全平台AI编程代理,Ultra订阅200美元/月,支持跨设备多任务
  • 123页满分PPT | 华为流程体系建设与运营华为数字化转型流程解决方案及建设案例
  • mars3d (基于 Cesium 的轻量化三维地图库)
  • 老版本 dubbo 泛化调用
  • MiniMind(2)模型架构
  • Java学习第五部分——API部分
  • docker离线/在线环境下安装elasticsearch
  • 多云密钥统一管理实战:CKMS对接阿里云/华为云密钥服务
  • Gin 框架中的优雅退出
  • 智慧赋能高压并网:分布式光伏监控系统在5.88MW物流园项目的实践解析
  • gin如何返回html
  • php安装完毕后没有php-fpm服务
  • 跨平台开发的抉择:Flutter vs 原生安卓(Kotlin)的优劣对比与选型建议​​
  • 【第三章:神经网络原理详解与Pytorch入门】01.神经网络算法理论详解与实践-(1)神经网络预备知识(线性代数、微积分、概率等)