实战案例:使用C#实现高效MQTT消息发布系统
1. 引言
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,广泛应用于物联网(IoT)和实时数据传输场景。在工业自动化、智能家居等领域,我们经常需要将大量设备数据上传至云端或本地服务器。
本文将分享一个基于C#的MQTT消息发布系统,该系统具备以下特点:
-
异步编程:使用
async/await
提高吞吐量 -
多线程优化:并行发送消息,提高效率
-
并发控制:使用
SemaphoreSlim
防止服务器过载 -
错误处理:自动记录日志并邮件通知异常
2. 开发环境与依赖
-
开发语言:C# (.NET 4.8)
-
MQTT库:
MQTTnet
(通过NuGet安装) -
数据库:Oracle 11g(存储待发送数据)
-
日志系统:自定义日志记录
3. 核心代码实现
3.1 发布和订阅
在MQTT消息系统中,仅依赖PublishAsync
的返回结果并不能100%保证消息已被Broker正确处理。我们曾遇到以下问题场景:
-
Broker返回
Success
,MQTTX订阅相关的主题,但是没有收到我们发布的消息。(发布消息和订阅消息都使用了Qos 1)
可能的原因:
1)Qos 1的确认范围
-
PUBACK仅表示Broker接收成功,不保证:
-
消息已持久化到磁盘(若Broker崩溃)
-
消息已传递给订阅者
-
订阅者已成功处理消息
-
2)网络连接不稳定等原因
解决方案:在发布消息后,主动订阅自身发布的消息,通过双重确认机制确保消息可靠投递。
private async Task<string> SendMQData(string testData, string testId, string topic_Data, IMqttClient mqttClient){//防御性编程:发布消息后,自己订阅topic,确保broker有收到消息再更新数据库。//原因:就算用Qos1去发布消息后,即使publishResult.ReasonCode返回Success// MQTTX有时候仍然无法收到消息(尤其并发量高的时候),如果此时更新数据库,将无法得知是哪笔数据没传输成功。#region 异步方式执行if (!mqttClient.IsConnected){await mqttClient.ReconnectAsync();}// 1. 创建一个“任务完成源”(用于等待异步事件)var receivedSignal = new TaskCompletionSource<bool>();// 2. 订阅主题await mqttClient.SubscribeAsync(topic_Data, MqttQualityOfServiceLevel.AtLeastOnce);mqttClient.ApplicationMessageReceivedAsync += e =>{if (e.ApplicationMessage.Topic == topic_Data){if (Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.ToArray()) == testData){receivedSignal.TrySetResult(true); // 通知“已收到正确消息”}}return Task.CompletedTask;};// 3. 发布消息var message = new MqttApplicationMessageBuilder().WithTopic(topic_Data).WithPayload(testData).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce).Build();var publishResult = await mqttClient.PublishAsync(message);if (publishResult.ReasonCode != MqttClientPublishReasonCode.Success){UpdateLog("发布失败(Broker拒绝), Id:" + testId + "\r\n");log.SaveMsg("SendDataLog", $"发布失败(Broker拒绝), Id: {testId}", DateTime.Now, false);return "NG";}// 4. 等待最多 2 秒,看是否能收到消息var timeoutTask = Task.Delay(2000);var completedTask = await Task.WhenAny(receivedSignal.Task, timeoutTask);// 5. 判断结果if (completedTask == receivedSignal.Task && await receivedSignal.Task){updateData(testId);UpdateLog("上传成功, Id:" + testId + "\r\n");log.SaveMsg("SendDataLog", $"上传成功, Id: {testId}", DateTime.Now, true);return "OK";}else{UpdateLog("上传失败(未收到确认), Id:" + testId + "\r\n");log.SaveMsg("SendDataLog", $"上传失败(未收到确认), Id: {testId}", DateTime.Now, true);return "NG";}#endregion}
3.2 MQTT连接配置和异步并发
特点:
1)一次连接,多次发布消息(减少连接开销)
2)使用Task
并行发送消息,并通过SemaphoreSlim
控制最大并发数。多次测试表明,当并发数大于18时,会显著出现3.1描述的情况(即:PublishAsync
的返回结果是Success,但是MQTTX作为订阅者并不能收到消息,发布方和接收方都是用Qos 1 的情况下)
public async Task<string> AutoTaskUploadAsync(DataTable allData,(string Address, string Account, string Pwd, string Port) Config){StringBuilder resId = new StringBuilder();List<Task> tasks = new List<Task>();object lockObj = new object();string broker = Config.Address;int port = int.Parse(Config.Port);string clientId = Guid.NewGuid().ToString();//string topic = topic_Data;string username = Config.Account;string password = Config.Pwd;//多次测试证明12个并发数比较安全(不同服务器性能不一致,这里大家可以按需修改)SemaphoreSlim _semaphore = new SemaphoreSlim(12);var factory = new MqttFactory();var mqttClient = factory.CreateMqttClient();var options = new MqttClientOptionsBuilder().WithTcpServer(broker, port).WithCredentials(username, password).WithClientId(clientId).WithCleanSession().Build();var connectResult = await mqttClient.ConnectAsync(options);if (connectResult.ResultCode == MqttClientConnectResultCode.Success){for (int i = 0; i < allData.Rows.Count; i++){int currentRow = i;tasks.Add(Task.Run(async () =>{//等待信号量许可await _semaphore.WaitAsync().ConfigureAwait(false);try{string res = await SendMQDataTo(Config.Address,Config.Account,Config.Pwd,Config.Port,allData.Rows[currentRow]["test_data"].ToString(),allData.Rows[currentRow]["tpi_id"].ToString(),allData.Rows[currentRow]["topic"].ToString(),mqttClient).ConfigureAwait(false);//lock (lockObj)//{// resId.Append(allData.Rows[currentRow]["tpi_id"].ToString() + ",");//}}catch (Exception ex){string errorMsg = $"发送数据异常--{ex}";log.SaveMsg("SendDataLog", errorMsg, DateTime.Now, false);common.SendEmail("exampleMES@example.COM",$"MQTT上传失败,ID:{allData.Rows[currentRow][0]}",ex.ToString());}finally{// 释放信号量_semaphore.Release();}}));}}else{string strres = common.SendEmail("exampleMES@example.COM", string.Format(@"连接MQ Broker失败,原因:" + connectResult.ResultCode.ToString()), connectResult.ResultCode.ToString());}// Unsubscribe and disconnectawait Task.WhenAll(tasks);//mqttClient.ApplicationMessageReceivedAsync -= OnMessageReceived;await mqttClient.DisconnectAsync();// 移除末尾逗号if (resId.Length > 0){resId.Length--;resId.Append(")");}return resId.ToString();}
3.3 记录日志、定时执行和异步显示
日志部分:
特点:操作前显示做到哪一步,方便后续维护时排查问题和分析性能瓶颈。
如本次项目中,本人一直以为多线程的速度有瓶颈,加了日志后,发现是读取数据库表时存在瓶颈。
private async Task SendData(string getDate){string Addres = string.Empty; // MQ服务器地址 string Account = string.Empty; // MQ连接账号 string Pwd = string.Empty; // MQ连接密码string Port = string.Empty; // 端口StringBuilder result = new StringBuilder();StringBuilder resId = new StringBuilder();string ngMsg = string.Empty;Addres = txtAddress.Text;Account = txtAccount.Text;Pwd = txtPassWord.Text;Port = txtPort.Text;var Config = (Addres, Account, Pwd, Port);if (string.IsNullOrEmpty(Addres)){MessageBox.Show("请输入MQ服务器地址");return;}if (string.IsNullOrEmpty(Account)){MessageBox.Show("请输入MQ账号");return;}if (string.IsNullOrEmpty(Pwd)){MessageBox.Show("请输入MQ密码");return;}if (string.IsNullOrEmpty(Port)){MessageBox.Show("请输入MQ端口号");return;}string strDate = getDate;//获取全部没上传的记录UpdateLog("开始读取T_TABLE信息"+"\r\n");DataTable allData = GetDataToSend();UpdateLog("读取完成,准备上传" + "\r\n");///如果有输入日期,则只获取日期内没上传的记录if (!string.IsNullOrEmpty(strDate)){allData = GetDataToSend(strDate);}if (allData.Rows.Count == 0 || allData is null){UpdateLog(string.Format(@"----------暂无数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");log.SaveMsg("SendDataLog", string.Format(@"----------暂无数据,时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true); //插入日志return;}result.Append(string.Format(@"本次上传共{0}条数据,ID(", allData.Rows.Count));UpdateLog(string.Format(@"----------开始上传数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");log.SaveMsg("SendDataLog", string.Format(@"----------开始上传数据时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true); //插入日志#region 异步调用(更稳定)string asyncResult = await AutoTaskUploadAsync(allData, Config);result.Append(asyncResult);#endregionUpdateLog(result.ToString() + "\r\n");log.SaveMsg("SendDataLog", result.ToString(), DateTime.Now, true);UpdateLog(string.Format(@"----------完成本次上传数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");log.SaveMsg("SendDataLog", string.Format(@"----------完成本次上传数据时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true);}
定时执行部分
特点:
利用定时器,根据前端设定的时间间隔,定时执行
private void xcButton3_Click(object sender, EventArgs e){int seconds = int.Parse(this.txt_Seconds.Text.ToString());this.timer1.Interval = seconds * 1000;this.btnStart.Enabled = false;this.btnStop.Enabled = true;点击启动时,立刻运行一次this.BeginInvoke(new Action(() => timer1_Tick(null, EventArgs.Empty)));this.timer1.Start();}private void btnStop_Click(object sender, EventArgs e){this.btnStart.Enabled = true;this.btnStop.Enabled = false;this.timer1.Stop();}private async void timer1_Tick(object sender, EventArgs e){LogClear();UpdateLog("\r\n" + "本次上传时间:" + DateTime.Now.ToString() + "\r\n");int seconds = int.Parse(this.txt_Seconds.Text.ToString());await SendData("");UpdateLog("下次上传时间:" + DateTime.Now.AddSeconds(seconds).ToString() + "\r\n");}
异步显示部分
特点:
1)如果当前线程不在主线程,利用委托,将日志显示在控件上
private void UpdateLog(string message){if (txt_SendData_Log.InvokeRequired){txt_SendData_Log.Invoke((MethodInvoker)delegate{txt_SendData_Log.AppendText(message + "\r\n");});}else{txt_SendData_Log.AppendText(message + "\r\n");}}
2)当txtbox超过100行日志时,清楚控件上的记录,释放内存,防止内存溢出。
private void LogClear(){int loglines = this.txt_SendData_Log.Lines.Length;if (loglines >= 100){this.txt_SendData_Log.Clear();}}
3)将日志写入到txt文档中,做完整的日志记录。
Loger类
public class Loger{private object LockObj = new object();private string ExePath;public Loger(){ExePath = Path.GetDirectoryName(System.Diagnostics.Process.GetCurrentProcess().MainModule.FileName);}private StreamWriter Create(string fullPath){StreamWriter sr;if (File.Exists(fullPath)){sr = File.AppendText(fullPath);}else{sr = File.CreateText(fullPath);}return sr;}public void SaveMsg(string cataLog, string message, DateTime dt, bool isLoger){string fileName = string.Format("{0}.log", dt.ToString("yyyyMMdd"));Save(cataLog, fileName, message, isLoger);}public void Save(string cataLog, string fileName, string msg, bool isLoger){lock (LockObj){if (isLoger){try{string fullPath = Path.Combine(ExePath + "\\" + cataLog, fileName);EnsureDirectory(fullPath);using (StreamWriter sw = Create(fullPath)){sw.WriteLine(string.Format("{0}-->:{1}\r\n", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msg));sw.Close();}}catch { }}}}private void EnsureDirectory(string fullPath){string path = Path.GetDirectoryName(fullPath);if (!Directory.Exists(path))Directory.CreateDirectory(path);}public void Remove(string fileName){lock (LockObj){if (File.Exists(fileName)){File.Delete(fileName);}}}
调用方式
private Loger log = new Loger();log.SaveMsg("SendDataLog", $"上传成功, Id: {testId}", DateTime.Now, true);