rocketmq 之 阿里云转本地部署实践总结
前言
本地部署参考 这里
MQ部分
代码举例
阿里云
依赖
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.4.Final</version>
</dependency>
阿里云配置类(自定义的)
/**
 * Per-topic RocketMQ settings (custom configuration holder).
 */
public class RmqTopicVO {

    /** Topic type. */
    private String type;

    /** Topic name. */
    private String topic;

    /** Message tag. */
    private String tag;

    /** Whether messages are sent with a delay. */
    private Boolean delayed;

    /** Delay expressed in hours. */
    private List<Long> hours;

    /** Delay expressed in minutes. */
    private List<Long> minutes;

    /** Delay expressed in seconds. */
    private List<Long> seconds;

    /** Delay expressed in milliseconds. */
    private List<Long> millis;

    /** Number of retries. */
    private Integer retryTimes;
}
生产者代码
/**
 * Creates the Aliyun ONS producer bean, configured from {@link #getMqPropertie()}.
 * The bean declares {@code start} as its init method and {@code shutdown} as its destroy method.
 *
 * @return the configured, not yet started, producer bean
 */
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {
    ProducerBean producerBean = new ProducerBean();
    Properties mqProperties = getMqPropertie();
    producerBean.setProperties(mqProperties);
    return producerBean;
}
消费者代码
public class RmqServerConfig {private String accessKey;private String secretKey;private String nameSrvAddr;private String groupId;private int threadNums = 20;private List<RmqTopicVO> topics;@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean buildConsumer() {ConsumerBean consumerBean = new ConsumerBean();//配置文件Properties properties = getMqPropertie();properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);//将消费者线程数固定为20个 20为默认值properties.setProperty(PropertyKeyConst.ConsumeThreadNums, threadNums+"");consumerBean.setProperties(properties);//订阅关系if (topics == null || topics.isEmpty()){log.error("没有配置RMQ订阅主题");return null;}Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>(topics.size());//订阅多个设置topictopics.forEach(topic ->{Subscription subscription = new Subscription();subscription.setTopic(topic.getTopic());subscription.setExpression(topic.getTag());//这里将注入到spring的消息监听处理类获取到,并放到topic订阅列表中subscriptionTable.put(subscription, SpringContextUtil.getBean(topic.getType()));log.info("注册RMQ:{}", topic.getType());});consumerBean.setSubscriptionTable(subscriptionTable);return consumerBean;}public Properties getMqPropertie() {Properties properties = new Properties();properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);return properties;}
}
消息监听类(举例)
@Slf4j
@Component
@RequiredArgsConstructor
public class MyListener implements MessageListener {private final MyService myService;@Overridepublic Action consume(Message message, ConsumeContext consumeContext) {try {String body = new String(message.getBody(), StandardCharsets.UTF_8);log.info("收到通知, body:{}", body);myService.test(body);return Action.CommitMessage;} catch (Exception e) {log.error("mq发生异常", e);return Action.ReconsumeLater;}}
}
本地部署版
依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.8</version>
</dependency>
生产者代码
@Value("${rocketmq.name-server}")private String myNameSrvAddr;@Value("${rocketmq.producer.access-key}")private String myAccessKey;@Value("${rocketmq.producer.secret-key}")private String mySecretKey;@Value("${spring.application.name}")private String projectName;@Bean( initMethod = "start", destroyMethod = "shutdown")public DefaultMQProducer mqProducer() {// 1. 创建带认证信息的 RPCHookAclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(myAccessKey, mySecretKey));DefaultMQProducer producer = new DefaultMQProducer(null, aclHook, true, null);// 本地部署客户端要求每个producer有唯一的producerGroupString suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 5);producer.setProducerGroup(projectName + "_" + suffix);producer.setNamesrvAddr(myNameSrvAddr);producer.setVipChannelEnabled(false);return producer;}
消费者代码
@Value("${rocketmq.name-server}")private String myNameSrvAddr;@Value("${rocketmq.consumer.access-key}")private String myAccessKey;@Value("${rocketmq.consumer.secret-key}")private String mySecretKey;private List<DefaultMQPushConsumer> consumers;@Value("${spring.application.name}")private String projectName;@Beanpublic List<DefaultMQPushConsumer> buildConsumer() {List<DefaultMQPushConsumer> consumers = new ArrayList<>(topics.size());for (RmqTopicVO topicVO : topics) {String topic = topicVO.getTopic();if (!StringUtils.hasText(topic)){continue;}MessageListenerConcurrently topicListenerBean = SpringContextUtil.getBean(topicVO.getType());if (Objects.isNull(topicListenerBean)){log.error("topic:[ {} ] no listener object", topic);continue;}String tag = StringUtils.hasText(topicVO.getTag()) ? topicVO.getTag() : "*";try {AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(myAccessKey, mySecretKey));String groupName = String.join("_", topic, projectName, "group");DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName, aclHook, new AllocateMessageQueueAveragely(), true, null);consumer.setNamesrvAddr(myNameSrvAddr);consumer.subscribe(topic, tag);consumer.setConsumeMessageBatchMaxSize(1);consumer.setConsumeThreadMin(threadNums);consumer.setConsumeThreadMax(threadNums);//最大重试次数consumer.setMaxReconsumeTimes(3);consumer.registerMessageListener(topicListenerBean);consumer.start();consumers.add(consumer);}catch (Exception e){log.error("创建topic:[ {} ]消费者异常",topic, e);}}// 保存引用this.consumers = consumers;return consumers;}@PreDestroypublic void destroy() {log.info("进入销毁消费者组流程");if (consumers != null) {for (DefaultMQPushConsumer consumer : consumers) {try {consumer.shutdown();} catch (Exception e) {// 日志记录log.info("关闭消费者组异常", e);}}}}
消息监听类(举例)
@Slf4j
@Component
@RequiredArgsConstructor
public class MyListener implements MessageListenerConcurrently {private final MyService myService;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : list) {boolean success = onMessage(msg);if (!success) {// 只要有一条消息处理失败,整个批次返回RECONSUME_LATER// 这样这一批消息会被整体重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 所有消息都成功处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}private boolean onMessage(MessageExt message) {try {String body = new String(message.getBody(), StandardCharsets.UTF_8);log.info("收到通知,body:{}", body);myService.test(body);} catch (Exception e) {log.error("mq发生异常, topic: {}, msgKey: {}", message.getTopic(), message.getKeys(), e);// 抛出异常,RocketMQ会自动重试return false;}return true;}
}
问题点
- 生产者组和消费者组一定不能共用,否则会出现各种问题;如果你没遇到问题,那可能是测试得还不够多
- 多个服务都需要消费同一条消息时,要为它们设置不同的消费者组:MQ 默认是集群消费(同一个组内每条消息只会被一个实例消费),不是广播消费。
- 消费时,为了避免一批消息中有一条失败导致整批消息都被重试,可以设置每批次只消费一条消息(即 consumeMessageBatchMaxSize 设为 1)
- 消息延迟问题:RocketMQ 4 系列只支持固定的延迟级别,不能设置具体的延迟时间(5 系列支持)
RocketMQ-MQTT部分
由于系统需要知道设备上下线,处理业务,所以基于1.0.1版本修改了代码
修改部分
1.心跳检测日志输出到指定文件(这个自己看着改就好,应该很简单)
2.给某个topic发送设备上下线消息
3.记录设备连接日志,方便运维
给某个topic发送设备上下线消息
增加参数
在 service.conf 配置文件中添加设备上下线通知 topic:`connectionNotifyTopic=`,不配置时默认为 iot(具体见下面的相关代码)
消息类:
/**
 * Client status event (connect / disconnect / tcpclean) describing one device connection.
 */
public class ClientStatusEventVO implements Serializable {

    public static final String CONNECT_TAG = "connect";
    public static final String DISCONNECT_TAG = "disconnect";
    public static final String TCP_CLEAN_TAG = "tcpclean";

    /** Unique identifier of the TCP connection. */
    private String channelId;

    /** Identifier of the device. */
    private String clientId;

    /** Event type: connect / disconnect / tcpclean. */
    private String eventType;

    /** Event time. */
    private Long time;

    /** Public egress IP address used by the client. */
    private String clientIp;

    public String getChannelId() {
        return this.channelId;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    public String getClientId() {
        return this.clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getEventType() {
        return this.eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public Long getTime() {
        return this.time;
    }

    public void setTime(Long time) {
        this.time = time;
    }

    public String getClientIp() {
        return this.clientIp;
    }

    public void setClientIp(String clientIp) {
        this.clientIp = clientIp;
    }

    @Override
    public String toString() {
        return String.format(
                "ClientStatusEventVO{channelId='%s', clientId='%s', eventType='%s', time=%s, clientIp='%s'}",
                channelId, clientId, eventType, time, clientIp);
    }
}
发送消息类
在mqtt-ds中创建
@Component
public class NotifyConnectionManager {private static final Logger logger = LoggerFactory.getLogger(NotifyConnectionManager.class);private static final String PRODUCER_GROUP_NAME = "iot_producer_group";@Resourceprivate ServiceConf serviceConf;private DefaultMQProducer defaultMQProducer;@PostConstructpublic void init() throws MQClientException {String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8);String groupName = PRODUCER_GROUP_NAME + "_" + suffix;defaultMQProducer = MqFactory.buildDefaultMQProducer(groupName, serviceConf.getProperties());defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);defaultMQProducer.start();logger.info("NotifyConnectionManager initialize successfully. groupName: {}", groupName);}public void putMessageToMq(String tag, String msgStr){String notifyTopic = serviceConf.getProperties().getProperty("connectionNotifyTopic", "iot");String msgKey = UUID.randomUUID().toString().replace("-", "");Message rmqMsg = new Message(notifyTopic,tag,msgKey,// 消息体msgStr.getBytes(StandardCharsets.UTF_8));try {defaultMQProducer.send(rmqMsg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {logger.info("Send notify msg successfully, topic:{}, tag:{}, msgKey:{}, msg:{}, result:{}", notifyTopic, tag, msgKey, msgStr, sendResult);}@Overridepublic void onException(Throwable throwable) {logger.error("Send notify msg failed, topic:{}, msgKey:{}, msg:{}", notifyTopic, msgKey, msgStr, throwable);}});}catch (Exception e){logger.error("Send notify msg failed, topic:{}, msgKey:{}, msg:{}", notifyTopic, msgKey, msgStr, e);}}
}
下面三个类(ConnectHandler、MqttConnectHandler、MqttDisconnectHandler)分别处理连接异常、已连接、连接断开三种情况。
ConnectHandler:
/**
 * Handles an exception raised on the channel: publishes a {@code tcpclean} client-status
 * event, logs the failure unless its message is one of the known simple exceptions, and
 * closes the connection from the server side.
 *
 * @param ctx   channel handler context of the failing channel
 * @param cause the exception that was raised
 */
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    // The notification is best effort: whatever goes wrong here, the connection must still be closed below.
    try {
        String clientId = ChannelInfo.getClientId(ctx.channel());
        String channelId = ChannelInfo.getId(ctx.channel());
        // remoteAddress() can be null once the channel is no longer connected.
        String remoteIp = null;
        if (ctx.channel().remoteAddress() instanceof InetSocketAddress) {
            InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
            if (remote.getAddress() != null) {
                remoteIp = remote.getAddress().getHostAddress();
            }
        }
        long timeStamp = System.currentTimeMillis();

        ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();
        clientStatusEventVO.setChannelId(channelId);
        clientStatusEventVO.setClientId(clientId);
        clientStatusEventVO.setClientIp(remoteIp);
        clientStatusEventVO.setEventType(ClientStatusEventVO.TCP_CLEAN_TAG);
        clientStatusEventVO.setTime(timeStamp);
        String jsonString = JSON.toJSONString(clientStatusEventVO);

        // Send connection details to the notify topic (default is iot).
        notifyConnectionManager.putMessageToMq(ClientStatusEventVO.TCP_CLEAN_TAG, jsonString);
        logger.info(" [ConnectHandler] [tcpclean]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);
    } catch (Exception e) {
        logger.error("Failed to publish tcpclean event, channel: {}", ctx.channel(), e);
    }

    if (cause.getMessage() == null || !simpleExceptions.contains(cause.getMessage())) {
        logger.error("exceptionCaught {}", ctx.channel(), cause);
    }
    channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.SERVER, cause.getMessage());
}
MqttConnectHandler:
public void doHandler(ChannelHandlerContext ctx, MqttConnectMessage connectMessage, HookResult upstreamHookResult) {final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();final MqttConnectPayload payload = connectMessage.payload();Channel channel = ctx.channel();ChannelInfo.setKeepLive(channel, variableHeader.keepAliveTimeSeconds());ChannelInfo.setClientId(channel, connectMessage.payload().clientIdentifier());ChannelInfo.setCleanSessionFlag(channel, variableHeader.isCleanSession());String remark = upstreamHookResult.getRemark();if (!upstreamHookResult.isSuccess()) {byte connAckCode = (byte) upstreamHookResult.getSubCode();MqttConnectReturnCode mqttConnectReturnCode = MqttConnectReturnCode.valueOf(connAckCode);channel.writeAndFlush(MqttMessageFactory.buildConnAckMessage(mqttConnectReturnCode));channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);return;}CompletableFuture<Void> future = new CompletableFuture<>();ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);// use 'scheduler' to separate two i/o: 'ack to client' and 'session-load from rocketmq'scheduler.schedule(() -> {if (!future.isDone()) {future.complete(null);}}, 1, TimeUnit.SECONDS);try {MqttConnAckMessage mqttConnAckMessage = MqttMessageFactory.buildConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED);future.thenAccept(aVoid -> {if (!channel.isActive()) {return;}ChannelInfo.removeFuture(channel, ChannelInfo.FUTURE_CONNECT);channel.writeAndFlush(mqttConnAckMessage);});sessionLoop.loadSession(ChannelInfo.getClientId(channel), channel);// save will messageWillMessage willMessage = null;if (variableHeader.isWillFlag()) {if (payload.willTopic() == null || payload.willMessageInBytes() == null) {logger.error("Will message and will topic can not be empty");channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "Will message and will topic can not be empty");return;}willMessage = new WillMessage(payload.willTopic(), payload.willMessageInBytes(), 
variableHeader.isWillRetain(), variableHeader.willQos());sessionLoop.addWillMessage(channel, willMessage);}// create link successfully , send message to mqString clientId = ChannelInfo.getClientId(ctx.channel());String channelId = ChannelInfo.getId(ctx.channel());String remoteIp = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();long timeStamp = System.currentTimeMillis();ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();clientStatusEventVO.setChannelId(channelId);clientStatusEventVO.setClientId(clientId);clientStatusEventVO.setClientIp(remoteIp);clientStatusEventVO.setEventType(ClientStatusEventVO.CONNECT_TAG);clientStatusEventVO.setTime(timeStamp);String jsonString = JSON.toJSONString(clientStatusEventVO);// send connection details to topic, default is iotnotifyConnectionManager.putMessageToMq(ClientStatusEventVO.CONNECT_TAG, jsonString);logger.info(" [MqttConnectHandler] [Connect]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);} catch (Exception e) {logger.error("Connect:{}", payload.clientIdentifier(), e);channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "ConnectException");}
}
MqttDisconnectHandler:
/**
 * Handles an MQTT DISCONNECT packet: publishes a {@code disconnect} client-status event and
 * closes the connection as a client-initiated close.
 *
 * @param ctx                channel handler context of the disconnecting client
 * @param mqttMessage        the DISCONNECT packet
 * @param upstreamHookResult result of the upstream hook
 */
public void doHandler(ChannelHandlerContext ctx, MqttMessage mqttMessage, HookResult upstreamHookResult) {
    // The notification is best effort: whatever goes wrong here, the connection must still be closed below.
    try {
        String clientId = ChannelInfo.getClientId(ctx.channel());
        String channelId = ChannelInfo.getId(ctx.channel());
        // remoteAddress() can be null once the channel is no longer connected.
        String remoteIp = null;
        if (ctx.channel().remoteAddress() instanceof InetSocketAddress) {
            InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
            if (remote.getAddress() != null) {
                remoteIp = remote.getAddress().getHostAddress();
            }
        }
        long timeStamp = System.currentTimeMillis();

        ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();
        clientStatusEventVO.setChannelId(channelId);
        clientStatusEventVO.setClientId(clientId);
        clientStatusEventVO.setClientIp(remoteIp);
        clientStatusEventVO.setEventType(ClientStatusEventVO.DISCONNECT_TAG);
        clientStatusEventVO.setTime(timeStamp);
        String jsonString = JSON.toJSONString(clientStatusEventVO);

        // Send connection details to the notify topic (default is iot).
        notifyConnectionManager.putMessageToMq(ClientStatusEventVO.DISCONNECT_TAG, jsonString);
        logger.info(" [MqttDisconnectHandler] [Disconnect]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);
    } catch (Exception e) {
        logger.error("Failed to publish disconnect event, channel: {}", ctx.channel(), e);
    }

    channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.CLIENT, "disconnect");
}
这样,就实现了与阿里云 MQTT 对应的 disconnect、connect、tcpclean 三种设备上下线事件通知。
效果展示: