当前位置: 首页 > news >正文

实战案例:使用C#实现高效MQTT消息发布系统

1. 引言

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,广泛应用于物联网(IoT)和实时数据传输场景。在工业自动化、智能家居等领域,我们经常需要将大量设备数据上传至云端或本地服务器。

本文将分享一个基于C#的MQTT消息发布系统,该系统具备以下特点:

  • 异步编程:使用async/await提高吞吐量

  • 多线程优化:并行发送消息,提高效率

  • 并发控制:使用SemaphoreSlim防止服务器过载

  • 错误处理:自动记录日志并邮件通知异常

2. 开发环境与依赖

  • 开发语言:C# (.NET 4.8)

  • MQTT库MQTTnet(通过NuGet安装)

  • 数据库:Oracle 11g(存储待发送数据)

  • 日志系统:自定义日志记录

3. 核心代码实现

3.1 发布和订阅

在MQTT消息系统中,仅依赖PublishAsync的返回结果并不能100%保证消息已被Broker正确处理。我们曾遇到以下问题场景:

  • Broker返回Success,MQTTX订阅相关的主题,但是没有收到我们发布的消息。(发布消息和订阅消息都使用了Qos 1)

可能的原因:

 1)Qos 1的确认范围

  • PUBACK仅表示Broker接收成功,不保证:

    • 消息已持久化到磁盘(若Broker崩溃)

    • 消息已传递给订阅者

    • 订阅者已成功处理消息

2)网络连接不稳定等原因

解决方案:在发布消息后,主动订阅自身发布的消息,通过双重确认机制确保消息可靠投递。

private async Task<string> SendMQData(string testData, string testId, string topic_Data, IMqttClient mqttClient){//防御性编程:发布消息后,自己订阅topic,确保broker有收到消息再更新数据库。//原因:就算用Qos1去发布消息后,即使publishResult.ReasonCode返回Success//      MQTTX有时候仍然无法收到消息(尤其并发量高的时候),如果此时更新数据库,将无法得知是哪笔数据没传输成功。#region 异步方式执行if (!mqttClient.IsConnected){await mqttClient.ReconnectAsync();}// 1. 创建一个“任务完成源”(用于等待异步事件)var receivedSignal = new TaskCompletionSource<bool>();// 2. 订阅主题await mqttClient.SubscribeAsync(topic_Data, MqttQualityOfServiceLevel.AtLeastOnce);mqttClient.ApplicationMessageReceivedAsync += e =>{if (e.ApplicationMessage.Topic == topic_Data){if (Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.ToArray()) == testData){receivedSignal.TrySetResult(true); // 通知“已收到正确消息”}}return Task.CompletedTask;};// 3. 发布消息var message = new MqttApplicationMessageBuilder().WithTopic(topic_Data).WithPayload(testData).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce).Build();var publishResult = await mqttClient.PublishAsync(message);if (publishResult.ReasonCode != MqttClientPublishReasonCode.Success){UpdateLog("发布失败(Broker拒绝), Id:" + testId + "\r\n");log.SaveMsg("SendDataLog", $"发布失败(Broker拒绝), Id: {testId}", DateTime.Now, false);return "NG";}// 4. 等待最多 2 秒,看是否能收到消息var timeoutTask = Task.Delay(2000);var completedTask = await Task.WhenAny(receivedSignal.Task, timeoutTask);// 5. 判断结果if (completedTask == receivedSignal.Task && await receivedSignal.Task){updateData(testId);UpdateLog("上传成功, Id:" + testId + "\r\n");log.SaveMsg("SendDataLog", $"上传成功, Id: {testId}", DateTime.Now, true);return "OK";}else{UpdateLog("上传失败(未收到确认), Id:" + testId + "\r\n");log.SaveMsg("SendDataLog", $"上传失败(未收到确认), Id: {testId}", DateTime.Now, true);return "NG";}#endregion}

3.2 MQTT连接配置和异步并发

特点:

1)一次连接,多次发布消息(减少连接开销)

2)使用Task并行发送消息,并通过SemaphoreSlim控制最大并发数。多次测试表明,当并发数大于18时,会显著出现3.1描述的情况(即:PublishAsync的返回结果是Success,但是MQTTX作为订阅者并不能收到消息,发布方和接收方都是用Qos 1 的情况下)

public async Task<string> AutoTaskUploadAsync(DataTable allData,(string Address, string Account, string Pwd, string Port) Config){StringBuilder resId = new StringBuilder();List<Task> tasks = new List<Task>();object lockObj = new object();string broker = Config.Address;int port = int.Parse(Config.Port);string clientId = Guid.NewGuid().ToString();//string topic = topic_Data;string username = Config.Account;string password = Config.Pwd;//多次测试证明12个并发数比较安全(不同服务器性能不一致,这里大家可以按需修改)SemaphoreSlim _semaphore = new SemaphoreSlim(12);var factory = new MqttFactory();var mqttClient = factory.CreateMqttClient();var options = new MqttClientOptionsBuilder().WithTcpServer(broker, port).WithCredentials(username, password).WithClientId(clientId).WithCleanSession().Build();var connectResult = await mqttClient.ConnectAsync(options);if (connectResult.ResultCode == MqttClientConnectResultCode.Success){for (int i = 0; i < allData.Rows.Count; i++){int currentRow = i;tasks.Add(Task.Run(async () =>{//等待信号量许可await _semaphore.WaitAsync().ConfigureAwait(false);try{string res = await SendMQDataTo(Config.Address,Config.Account,Config.Pwd,Config.Port,allData.Rows[currentRow]["test_data"].ToString(),allData.Rows[currentRow]["tpi_id"].ToString(),allData.Rows[currentRow]["topic"].ToString(),mqttClient).ConfigureAwait(false);//lock (lockObj)//{//    resId.Append(allData.Rows[currentRow]["tpi_id"].ToString() + ",");//}}catch (Exception ex){string errorMsg = $"发送数据异常--{ex}";log.SaveMsg("SendDataLog", errorMsg, DateTime.Now, false);common.SendEmail("exampleMES@example.COM",$"MQTT上传失败,ID:{allData.Rows[currentRow][0]}",ex.ToString());}finally{// 释放信号量_semaphore.Release();}}));}}else{string strres = common.SendEmail("exampleMES@example.COM", string.Format(@"连接MQ Broker失败,原因:" + connectResult.ResultCode.ToString()), connectResult.ResultCode.ToString());}// Unsubscribe and disconnectawait Task.WhenAll(tasks);//mqttClient.ApplicationMessageReceivedAsync -= OnMessageReceived;await mqttClient.DisconnectAsync();// 移除末尾逗号if (resId.Length > 0){resId.Length--;resId.Append(")");}return resId.ToString();}

3.3 记录日志、定时执行和异步显示

日志部分:

特点:操作前显示做到哪一步,方便后续维护时排查问题和分析性能瓶颈。

如本次项目中,本人一直以为多线程的速度有瓶颈,加了日志后,发现是读取数据库表时存在瓶颈。

private async Task SendData(string getDate){string Addres = string.Empty; // MQ服务器地址    string Account = string.Empty; // MQ连接账号  string Pwd = string.Empty;   // MQ连接密码string Port = string.Empty; // 端口StringBuilder result = new StringBuilder();StringBuilder resId = new StringBuilder();string ngMsg = string.Empty;Addres = txtAddress.Text;Account = txtAccount.Text;Pwd = txtPassWord.Text;Port = txtPort.Text;var Config = (Addres, Account, Pwd, Port);if (string.IsNullOrEmpty(Addres)){MessageBox.Show("请输入MQ服务器地址");return;}if (string.IsNullOrEmpty(Account)){MessageBox.Show("请输入MQ账号");return;}if (string.IsNullOrEmpty(Pwd)){MessageBox.Show("请输入MQ密码");return;}if (string.IsNullOrEmpty(Port)){MessageBox.Show("请输入MQ端口号");return;}string strDate = getDate;//获取全部没上传的记录UpdateLog("开始读取T_TABLE信息"+"\r\n");DataTable allData = GetDataToSend();UpdateLog("读取完成,准备上传" + "\r\n");///如果有输入日期,则只获取日期内没上传的记录if (!string.IsNullOrEmpty(strDate)){allData = GetDataToSend(strDate);}if (allData.Rows.Count == 0 || allData is null){UpdateLog(string.Format(@"----------暂无数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");log.SaveMsg("SendDataLog", string.Format(@"----------暂无数据,时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true);  //插入日志return;}result.Append(string.Format(@"本次上传共{0}条数据,ID(", allData.Rows.Count));UpdateLog(string.Format(@"----------开始上传数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");log.SaveMsg("SendDataLog", string.Format(@"----------开始上传数据时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true);  //插入日志#region 异步调用(更稳定)string asyncResult = await AutoTaskUploadAsync(allData, Config);result.Append(asyncResult);#endregionUpdateLog(result.ToString() + "\r\n");log.SaveMsg("SendDataLog", result.ToString(), DateTime.Now, true);UpdateLog(string.Format(@"----------完成本次上传数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");log.SaveMsg("SendDataLog", string.Format(@"----------完成本次上传数据时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true);}

定时执行部分

特点:

利用定时器,根据前端设定的时间间隔,定时执行

private void xcButton3_Click(object sender, EventArgs e){int seconds = int.Parse(this.txt_Seconds.Text.ToString());this.timer1.Interval = seconds * 1000;this.btnStart.Enabled = false;this.btnStop.Enabled = true;点击启动时,立刻运行一次this.BeginInvoke(new Action(() => timer1_Tick(null, EventArgs.Empty)));this.timer1.Start();}private void btnStop_Click(object sender, EventArgs e){this.btnStart.Enabled = true;this.btnStop.Enabled = false;this.timer1.Stop();}private async void timer1_Tick(object sender, EventArgs e){LogClear();UpdateLog("\r\n" + "本次上传时间:" + DateTime.Now.ToString() + "\r\n");int seconds = int.Parse(this.txt_Seconds.Text.ToString());await SendData("");UpdateLog("下次上传时间:" + DateTime.Now.AddSeconds(seconds).ToString() + "\r\n");}

异步显示部分

特点:

1)如果当前线程不在主线程,利用委托,将日志显示在控件上

private void UpdateLog(string message){if (txt_SendData_Log.InvokeRequired){txt_SendData_Log.Invoke((MethodInvoker)delegate{txt_SendData_Log.AppendText(message + "\r\n");});}else{txt_SendData_Log.AppendText(message + "\r\n");}}

2)当txtbox超过100行日志时,清楚控件上的记录,释放内存,防止内存溢出。

private void LogClear(){int loglines = this.txt_SendData_Log.Lines.Length;if (loglines >= 100){this.txt_SendData_Log.Clear();}}

3)将日志写入到txt文档中,做完整的日志记录。

Loger类

public class Loger{private object LockObj = new object();private string ExePath;public Loger(){ExePath = Path.GetDirectoryName(System.Diagnostics.Process.GetCurrentProcess().MainModule.FileName);}private StreamWriter Create(string fullPath){StreamWriter sr;if (File.Exists(fullPath)){sr = File.AppendText(fullPath);}else{sr = File.CreateText(fullPath);}return sr;}public void SaveMsg(string cataLog, string message, DateTime dt, bool isLoger){string fileName = string.Format("{0}.log", dt.ToString("yyyyMMdd"));Save(cataLog, fileName, message, isLoger);}public void Save(string cataLog, string fileName, string msg, bool isLoger){lock (LockObj){if (isLoger){try{string fullPath = Path.Combine(ExePath + "\\" + cataLog, fileName);EnsureDirectory(fullPath);using (StreamWriter sw = Create(fullPath)){sw.WriteLine(string.Format("{0}-->:{1}\r\n", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msg));sw.Close();}}catch { }}}}private void EnsureDirectory(string fullPath){string path = Path.GetDirectoryName(fullPath);if (!Directory.Exists(path))Directory.CreateDirectory(path);}public void Remove(string fileName){lock (LockObj){if (File.Exists(fileName)){File.Delete(fileName);}}}

调用方式


private Loger log = new Loger();log.SaveMsg("SendDataLog", $"上传成功, Id: {testId}", DateTime.Now, true);

http://www.lqws.cn/news/587647.html

相关文章:

  • w-笔记:uni-app的H5平台和非H5平台的拍照识别功能:
  • Python 库 包 软件开发工具包(SDK) openai
  • AlpineLinux安装docker
  • STM32——DAP下载程序和程序调试
  • 初始化挂载Linux数据盘
  • Android 中 使用 ProgressBar 实现进度显示
  • Intel oneAPI工具集全面解析:从环境配置到流体动力学模拟优化
  • try-catch-finally 如何使用?
  • 《JMS 消息重试机制与死信队列配置指南:以 IBM MQ 与 TongLinkQ 为例》
  • 大模型在多发性硬化预测及治疗方案制定中的应用研究
  • 选择 PDF 转 HTML 转换器的 5 个关键特性
  • MySQL:CRUD操作
  • uniapp小程序蓝牙打印通用版(集成二维码打印)
  • 在vue当中使用动画
  • Oracle 树形统计再进阶:类型多样性与高频类型分析(第三课)
  • Monad:函数式编程中的 “容器模式”
  • 六自由度按摩机器人 MATLAB 仿真
  • Openssl升级
  • SQL规范
  • FastAPI 学习(二)
  • 在Flutter中生成App Bundle并上架Google Play
  • android BottomSheet及AlertDialog的几种material3 常见ui的用法
  • WSL/Linux 常用命令速查
  • Windows 11 安装 Linux 系统详细教程
  • docker安装RabbitMQ,创建RabbitMQ容器以及docker-compose.yml配置
  • 博图SCL编程:结构体(STRUCT)使用详解与实战案例
  • 英国研究团队启动合成完整人类基因组的前沿项目
  • 解决VSCode打开最近项目后终端shell不正常的问题
  • 数据结构入门-图的基本概念与存储结构
  • 数据结构与算法分析课设:一元多项式求值