RocketMQ: Lessons Learned Migrating from Alibaba Cloud to a Self-Hosted Deployment

Preface

For the self-hosted deployment steps, refer to this guide.

MQ

Code examples

Alibaba Cloud

Dependencies

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.4.Final</version>
</dependency>

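Alibaba Cloud configuration class (custom)

public class RmqTopicVO {

    /** Topic type (the bean name of the listener that handles this topic) */
    private String type;

    /** Topic */
    private String topic;

    private String tag;

    /** Whether to send with a delay */
    private Boolean delayed;

    /** Delay in hours */
    private List<Long> hours;

    /** Delay in minutes */
    private List<Long> minutes;

    /** Delay in seconds */
    private List<Long> seconds;

    /** Delay in milliseconds */
    private List<Long> millis;

    /** Number of retries */
    private Integer retryTimes;
}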

Producer code

@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {
    ProducerBean producer = new ProducerBean();
    producer.setProperties(getMqPropertie());
    return producer;
}

Consumer code

public class RmqServerConfig {

    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String groupId;
    private int threadNums = 20;
    private List<RmqTopicVO> topics;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        // Base connection properties
        Properties properties = getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        // Fix the number of consumer threads (threadNums defaults to 20)
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, String.valueOf(threadNums));
        consumerBean.setProperties(properties);
        // Subscriptions
        if (topics == null || topics.isEmpty()) {
            log.error("No RMQ subscription topics configured");
            return null;
        }
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>(topics.size());
        // Subscribe to every configured topic
        topics.forEach(topic -> {
            Subscription subscription = new Subscription();
            subscription.setTopic(topic.getTopic());
            subscription.setExpression(topic.getTag());
            // Look up the listener bean registered in Spring and add it to the subscription table
            subscriptionTable.put(subscription, SpringContextUtil.getBean(topic.getType()));
            log.info("Registered RMQ listener: {}", topic.getType());
        });
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }

    public Properties getMqPropertie() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        return properties;
    }
}

Message listener class (example)

@Slf4j
@Component
@RequiredArgsConstructor
public class MyListener implements MessageListener {

    private final MyService myService;

    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        try {
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("Received notification, body:{}", body);
            myService.test(body);
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("MQ consume error", e);
            return Action.ReconsumeLater;
        }
    }
}

Self-hosted version

Dependencies

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.9.8</version>
</dependency>

Producer code

    @Value("${rocketmq.name-server}")
    private String myNameSrvAddr;

    @Value("${rocketmq.producer.access-key}")
    private String myAccessKey;

    @Value("${rocketmq.producer.secret-key}")
    private String mySecretKey;

    @Value("${spring.application.name}")
    private String projectName;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer mqProducer() {
        // 1. Create an RPCHook that carries the ACL credentials
        AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(myAccessKey, mySecretKey));
        DefaultMQProducer producer = new DefaultMQProducer(null, aclHook, true, null);
        // With the self-hosted client, each producer needs its own unique producerGroup
        String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 5);
        producer.setProducerGroup(projectName + "_" + suffix);
        producer.setNamesrvAddr(myNameSrvAddr);
        producer.setVipChannelEnabled(false);
        return producer;
    }

Consumer code

    @Value("${rocketmq.name-server}")
    private String myNameSrvAddr;

    @Value("${rocketmq.consumer.access-key}")
    private String myAccessKey;

    @Value("${rocketmq.consumer.secret-key}")
    private String mySecretKey;

    private List<DefaultMQPushConsumer> consumers;

    @Value("${spring.application.name}")
    private String projectName;

    @Bean
    public List<DefaultMQPushConsumer> buildConsumer() {
        List<DefaultMQPushConsumer> consumers = new ArrayList<>(topics.size());
        for (RmqTopicVO topicVO : topics) {
            String topic = topicVO.getTopic();
            if (!StringUtils.hasText(topic)) {
                continue;
            }
            MessageListenerConcurrently topicListenerBean = SpringContextUtil.getBean(topicVO.getType());
            if (Objects.isNull(topicListenerBean)) {
                log.error("topic:[ {} ] no listener object", topic);
                continue;
            }
            String tag = StringUtils.hasText(topicVO.getTag()) ? topicVO.getTag() : "*";
            try {
                AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(myAccessKey, mySecretKey));
                // One consumer group per topic and per application
                String groupName = String.join("_", topic, projectName, "group");
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName, aclHook, new AllocateMessageQueueAveragely(), true, null);
                consumer.setNamesrvAddr(myNameSrvAddr);
                consumer.subscribe(topic, tag);
                // Hand the listener one message at a time
                consumer.setConsumeMessageBatchMaxSize(1);
                consumer.setConsumeThreadMin(threadNums);
                consumer.setConsumeThreadMax(threadNums);
                // Maximum number of retries
                consumer.setMaxReconsumeTimes(3);
                consumer.registerMessageListener(topicListenerBean);
                consumer.start();
                consumers.add(consumer);
            } catch (Exception e) {
                log.error("Failed to create consumer for topic:[ {} ]", topic, e);
            }
        }
        // Keep a reference so the consumers can be shut down later
        this.consumers = consumers;
        return consumers;
    }

    @PreDestroy
    public void destroy() {
        log.info("Shutting down consumer groups");
        if (consumers != null) {
            for (DefaultMQPushConsumer consumer : consumers) {
                try {
                    consumer.shutdown();
                } catch (Exception e) {
                    // Log and continue with the remaining consumers
                    log.info("Error while shutting down consumer group", e);
                }
            }
        }
    }
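The producer and consumer snippets above each build their group name inline. As a minimal sketch, the naming could be pulled into one helper so that a producer group can never collide with a consumer group. The class name `GroupNames` and the extra `producer` marker in the producer group are assumptions made for illustration; they are not part of the original code.

```java
import java.util.UUID;

/** Hypothetical helper that keeps producer and consumer group names apart. */
final class GroupNames {

    private GroupNames() {
    }

    /** Producer group: project name, a "producer" marker, and a short random suffix. */
    static String producerGroup(String projectName) {
        String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 5);
        return String.join("_", projectName, "producer", suffix);
    }

    /** Consumer group: same scheme as the consumer code above (topic_project_group). */
    static String consumerGroup(String topic, String projectName) {
        return String.join("_", topic, projectName, "group");
    }
}
```

With this helper, `producer.setProducerGroup(GroupNames.producerGroup(projectName))` would replace the inline concatenation, and two services subscribing to the same topic would still get different consumer groups as long as their `spring.application.name` values differ.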

Message listener class (example)

@Slf4j
@Component
@RequiredArgsConstructor
public class MyListener implements MessageListenerConcurrently {

    private final MyService myService;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msg : list) {
            boolean success = onMessage(msg);
            if (!success) {
                // If any message fails, return RECONSUME_LATER for the whole batch,
                // so every message in this batch is retried together
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        // All messages were processed successfully
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private boolean onMessage(MessageExt message) {
        try {
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("Received notification, body:{}", body);
            myService.test(body);
        } catch (Exception e) {
            log.error("MQ consume error, topic: {}, msgKey: {}", message.getTopic(), message.getKeys(), e);
            // Report the failure; the caller returns RECONSUME_LATER and RocketMQ retries
            return false;
        }
        return true;
    }
}

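Pitfalls

  1. A producer group and a consumer group must never share the same name, otherwise all kinds of problems appear. If you have not run into any, you probably have not tested enough.
  2. When several services need to consume the same message, give each service its own consumer group. RocketMQ defaults to clustering consumption, where a message is delivered to only one consumer within a group, not to broadcasting.
  3. To keep one failed message from forcing a whole batch to be retried, limit each consume batch to a single message, as setConsumeMessageBatchMaxSize(1) does in the consumer code above.
  4. Delayed messages: RocketMQ 4.x only supports fixed delay levels and cannot delay by an arbitrary amount of time (5.x can).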
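Since the Alibaba Cloud configuration class carries delays as hours, minutes, seconds, and milliseconds, a 4.x migration needs some way to turn a requested delay into a delay level. The following is a minimal sketch, assuming the broker still uses the default `messageDelayLevel` setting (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h). The class `DelayLevels` and the choice to round up to the next level are illustrative assumptions, not the original author's code.

```java
import java.time.Duration;

/** Hypothetical mapper from a requested delay to a RocketMQ 4.x delay level (1-based). */
final class DelayLevels {

    // Default broker delay levels in seconds; adjust if messageDelayLevel was customized.
    private static final long[] LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    private DelayLevels() {
    }

    /**
     * Returns the smallest level whose delay is at least the requested delay.
     * Returns 0 (no delay) for zero or negative input and caps at the longest level.
     */
    static int levelFor(Duration requested) {
        if (requested.isZero() || requested.isNegative()) {
            return 0;
        }
        long millis = requested.toMillis();
        for (int i = 0; i < LEVEL_SECONDS.length; i++) {
            if (LEVEL_SECONDS[i] * 1000L >= millis) {
                return i + 1;
            }
        }
        return LEVEL_SECONDS.length;
    }
}
```

The result would be passed to `Message.setDelayTimeLevel(int)` before sending. Rounding up means a message is never delivered earlier than requested, at the cost of arriving later than asked when the delay falls between two levels; anything beyond two hours is silently capped, so callers that need longer delays have to handle that separately.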

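RocketMQ-MQTT

The system needs to know when devices go online and offline in order to run business logic, so I modified the code based on version 1.0.1.

Changes

1. Write heartbeat-check logs to a dedicated file (adjust this as you see fit; it should be straightforward)
2. Send device online/offline messages to a given topic
3. Record device connection logs to make operations easier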

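Sending device online/offline messages to a topic

Added parameter

Add the topic for device online/offline notifications to the service.conf configuration file:
connectionNotifyTopic= (defaults to iot; see the related code below)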
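One detail of this default is worth illustrating: `Properties.getProperty(key, defaultValue)` only falls back when the key is absent, so a line such as `connectionNotifyTopic=` with nothing after the equals sign yields an empty topic name rather than `iot`. The sketch below shows one way to treat a blank value as "not set". The class `NotifyTopicConfig` is hypothetical and only stands in for however service.conf is actually loaded.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical resolver for the connectionNotifyTopic setting. */
final class NotifyTopicConfig {

    static final String KEY = "connectionNotifyTopic";
    static final String DEFAULT_TOPIC = "iot";

    private NotifyTopicConfig() {
    }

    /** Parses properties text and returns the notify topic, falling back to "iot" when missing or blank. */
    static String resolve(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String topic = props.getProperty(KEY, DEFAULT_TOPIC).trim();
        return topic.isEmpty() ? DEFAULT_TOPIC : topic;
    }
}
```

For example, `NotifyTopicConfig.resolve("connectionNotifyTopic=device_status")` returns `device_status`, while an empty file or a bare `connectionNotifyTopic=` line both resolve to `iot`.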

Message class:

public class ClientStatusEventVO implements Serializable {

    public static final String CONNECT_TAG = "connect";
    public static final String DISCONNECT_TAG = "disconnect";
    public static final String TCP_CLEAN_TAG = "tcpclean";

    /** Unique identifier of each TCP connection */
    private String channelId;

    /** The specific device */
    private String clientId;

    /** Event type: connect/disconnect/tcpclean */
    private String eventType;

    /** Timestamp */
    private Long time;

    /** Public egress IP address used by the client */
    private String clientIp;

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public void setTime(Long time) {
        this.time = time;
    }

    public void setClientIp(String clientIp) {
        this.clientIp = clientIp;
    }

    public String getChannelId() {
        return channelId;
    }

    public String getClientId() {
        return clientId;
    }

    public String getEventType() {
        return eventType;
    }

    public Long getTime() {
        return time;
    }

    public String getClientIp() {
        return clientIp;
    }

    @Override
    public String toString() {
        return "ClientStatusEventVO{" +
                "channelId='" + channelId + '\'' +
                ", clientId='" + clientId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", time=" + time +
                ", clientIp='" + clientIp + '\'' +
                '}';
    }
}

Message sender class

Create this class in the mqtt-ds module.

@Component
public class NotifyConnectionManager {

    private static final Logger logger = LoggerFactory.getLogger(NotifyConnectionManager.class);
    private static final String PRODUCER_GROUP_NAME = "iot_producer_group";

    @Resource
    private ServiceConf serviceConf;

    private DefaultMQProducer defaultMQProducer;

    @PostConstruct
    public void init() throws MQClientException {
        // Append a random suffix so each instance gets its own producer group
        String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
        String groupName = PRODUCER_GROUP_NAME + "_" + suffix;
        defaultMQProducer = MqFactory.buildDefaultMQProducer(groupName, serviceConf.getProperties());
        defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
        defaultMQProducer.start();
        logger.info("NotifyConnectionManager initialize successfully. groupName: {}", groupName);
    }

    public void putMessageToMq(String tag, String msgStr) {
        String notifyTopic = serviceConf.getProperties().getProperty("connectionNotifyTopic", "iot");
        String msgKey = UUID.randomUUID().toString().replace("-", "");
        Message rmqMsg = new Message(
                notifyTopic,
                tag,
                msgKey,
                // Message body
                msgStr.getBytes(StandardCharsets.UTF_8));
        try {
            // Asynchronous send, so the Netty handler thread is not blocked
            defaultMQProducer.send(rmqMsg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    logger.info("Send notify msg successfully, topic:{}, tag:{}, msgKey:{}, msg:{}, result:{}", notifyTopic, tag, msgKey, msgStr, sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    logger.error("Send notify msg failed, topic:{}, msgKey:{}, msg:{}", notifyTopic, msgKey, msgStr, throwable);
                }
            });
        } catch (Exception e) {
            logger.error("Send notify msg failed, topic:{}, msgKey:{}, msg:{}", notifyTopic, msgKey, msgStr, e);
        }
    }
}

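Three handler classes are modified: ConnectHandler, MqttConnectHandler, and MqttDisconnectHandler. They correspond to a connection exception, a successful connection, and a disconnection, respectively.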

ConnectHandler:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    // connection exception, send a tcpclean event to mq
    String clientId = ChannelInfo.getClientId(ctx.channel());
    String channelId = ChannelInfo.getId(ctx.channel());
    String remoteIp = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
    long timeStamp = System.currentTimeMillis();

    ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();
    clientStatusEventVO.setChannelId(channelId);
    clientStatusEventVO.setClientId(clientId);
    clientStatusEventVO.setClientIp(remoteIp);
    clientStatusEventVO.setEventType(ClientStatusEventVO.TCP_CLEAN_TAG);
    clientStatusEventVO.setTime(timeStamp);
    String jsonString = JSON.toJSONString(clientStatusEventVO);

    // send connection details to topic, default is iot
    notifyConnectionManager.putMessageToMq(ClientStatusEventVO.TCP_CLEAN_TAG, jsonString);
    logger.info(" [ConnectHandler] [tcpclean]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);

    if (cause.getMessage() == null || !simpleExceptions.contains(cause.getMessage())) {
        logger.error("exceptionCaught {}", ctx.channel(), cause);
    }
    channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.SERVER, cause.getMessage());
}

MqttConnectHandler:

public void doHandler(ChannelHandlerContext ctx, MqttConnectMessage connectMessage, HookResult upstreamHookResult) {
    final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
    final MqttConnectPayload payload = connectMessage.payload();
    Channel channel = ctx.channel();
    ChannelInfo.setKeepLive(channel, variableHeader.keepAliveTimeSeconds());
    ChannelInfo.setClientId(channel, connectMessage.payload().clientIdentifier());
    ChannelInfo.setCleanSessionFlag(channel, variableHeader.isCleanSession());

    String remark = upstreamHookResult.getRemark();
    if (!upstreamHookResult.isSuccess()) {
        byte connAckCode = (byte) upstreamHookResult.getSubCode();
        MqttConnectReturnCode mqttConnectReturnCode = MqttConnectReturnCode.valueOf(connAckCode);
        channel.writeAndFlush(MqttMessageFactory.buildConnAckMessage(mqttConnectReturnCode));
        channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
        return;
    }

    CompletableFuture<Void> future = new CompletableFuture<>();
    ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);

    // use 'scheduler' to separate two i/o: 'ack to client' and 'session-load from rocketmq'
    scheduler.schedule(() -> {
        if (!future.isDone()) {
            future.complete(null);
        }
    }, 1, TimeUnit.SECONDS);

    try {
        MqttConnAckMessage mqttConnAckMessage = MqttMessageFactory.buildConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED);
        future.thenAccept(aVoid -> {
            if (!channel.isActive()) {
                return;
            }
            ChannelInfo.removeFuture(channel, ChannelInfo.FUTURE_CONNECT);
            channel.writeAndFlush(mqttConnAckMessage);
        });
        sessionLoop.loadSession(ChannelInfo.getClientId(channel), channel);

        // save will message
        WillMessage willMessage = null;
        if (variableHeader.isWillFlag()) {
            if (payload.willTopic() == null || payload.willMessageInBytes() == null) {
                logger.error("Will message and will topic can not be empty");
                channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "Will message and will topic can not be empty");
                return;
            }
            willMessage = new WillMessage(payload.willTopic(), payload.willMessageInBytes(),
                    variableHeader.isWillRetain(), variableHeader.willQos());
            sessionLoop.addWillMessage(channel, willMessage);
        }

        // create link successfully , send message to mq
        String clientId = ChannelInfo.getClientId(ctx.channel());
        String channelId = ChannelInfo.getId(ctx.channel());
        String remoteIp = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
        long timeStamp = System.currentTimeMillis();

        ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();
        clientStatusEventVO.setChannelId(channelId);
        clientStatusEventVO.setClientId(clientId);
        clientStatusEventVO.setClientIp(remoteIp);
        clientStatusEventVO.setEventType(ClientStatusEventVO.CONNECT_TAG);
        clientStatusEventVO.setTime(timeStamp);
        String jsonString = JSON.toJSONString(clientStatusEventVO);

        // send connection details to topic, default is iot
        notifyConnectionManager.putMessageToMq(ClientStatusEventVO.CONNECT_TAG, jsonString);
        logger.info(" [MqttConnectHandler] [Connect]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);
    } catch (Exception e) {
        logger.error("Connect:{}", payload.clientIdentifier(), e);
        channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "ConnectException");
    }
}

MqttDisconnectHandler:

public void doHandler(ChannelHandlerContext ctx, MqttMessage mqttMessage, HookResult upstreamHookResult) {
    // disconnect manually, send message to mq
    String clientId = ChannelInfo.getClientId(ctx.channel());
    String channelId = ChannelInfo.getId(ctx.channel());
    String remoteIp = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
    long timeStamp = System.currentTimeMillis();

    ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();
    clientStatusEventVO.setChannelId(channelId);
    clientStatusEventVO.setClientId(clientId);
    clientStatusEventVO.setClientIp(remoteIp);
    clientStatusEventVO.setEventType(ClientStatusEventVO.DISCONNECT_TAG);
    clientStatusEventVO.setTime(timeStamp);
    String jsonString = JSON.toJSONString(clientStatusEventVO);

    // send connection details to topic, default is iot
    notifyConnectionManager.putMessageToMq(ClientStatusEventVO.DISCONNECT_TAG, jsonString);
    logger.info(" [MqttDisconnectHandler] [Disconnect]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);

    channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.CLIENT, "disconnect");
}

With these changes, the self-hosted broker emits the same disconnect, connect, and tcpclean events that Alibaba Cloud MQTT provides.
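On the consuming side, these three events still have to be turned into an online/offline state per device. The sketch below is one possible approach, not the original author's implementation: it tracks the open channels of each client, so that a late tcpclean for an old connection does not mark a device offline after it has already reconnected on a new channel. The class `DeviceOnlineTracker` and its methods are hypothetical; the tag values match the constants in ClientStatusEventVO, and the fields are assumed to have been parsed from the JSON message body beforehand.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory tracker fed by connect/disconnect/tcpclean events. */
final class DeviceOnlineTracker {

    // clientId -> channelIds that are currently open for that client
    private final Map<String, Set<String>> channelsByClient = new ConcurrentHashMap<>();

    /** Applies one status event; events without a tag, clientId, or channelId are ignored. */
    void onEvent(String tag, String clientId, String channelId) {
        if (tag == null || clientId == null || channelId == null) {
            return;
        }
        switch (tag) {
            case "connect":
                channelsByClient
                        .computeIfAbsent(clientId, k -> ConcurrentHashMap.newKeySet())
                        .add(channelId);
                break;
            case "disconnect":
            case "tcpclean":
                // Drop only the channel that closed; remove the client once no channel is left
                channelsByClient.computeIfPresent(clientId, (k, channels) -> {
                    channels.remove(channelId);
                    return channels.isEmpty() ? null : channels;
                });
                break;
            default:
                break;
        }
    }

    /** A device counts as online while at least one of its channels is open. */
    boolean isOnline(String clientId) {
        return channelsByClient.containsKey(clientId);
    }
}
```

The null check matters here because ConnectHandler can raise a tcpclean event for a channel that failed before the MQTT CONNECT packet set a clientId. A production version would keep this state in shared storage rather than in memory; the sketch also does not compare the event `time` field.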

