当前位置: 首页 > news >正文

WevServer实现:异步日志写与HTTP连接

前言

  核心部分的主丛Reactor实现了之后,这里就要实现它的配套了,比如clientfd可读的时候,具体的可读操作是什么,当aceept监听到的时候,如何初始化一个对象来表示这个链接,这里我的实现都是很简单的实现,网上有一些更完善的实现,如果感兴趣可以自行参考,首先来说一下,这些实现基本都是参考了github的这个WebServer项目:WberServer项目

异步日志写入

  为什么要异步写入呢?这是因为IO操作是一个很费事的事情,如果我们每次的日志都由我们当前的线程写入的话,是要占用不少时间的,所以我们可以让一个线程专门负责写入的事情。当然muduo库再次基础上做了更多的优化:没必要每条日志都写入,可以先放入缓存区,缓存区满了或者定一个超时时间到了在写入也可以,这里就做一个简单实现。
  异步日志的写入是一个经典的生产者–消费者模型,所以这里现成交互的核心:阻塞队列的核心代码也遵循相关的写法:

template<class T>
void BlockDeque<T>::push_back(const T &item) {// unique_lock而不用lock_guard是因为wait的时候要释放锁std::unique_lock<std::mutex> locker(mtx_);while(deq_.size() >= capacity_) {condProducer_.wait(locker);	/*不过日志产生量大的时候,阻塞当前线程也不太好*/}deq_.push_back(item);condConsumer_.notify_one();
}template<class T>
void BlockDeque<T>::push_front(const T &item) {std::unique_lock<std::mutex> locker(mtx_);while(deq_.size() >= capacity_) {condProducer_.wait(locker);}deq_.push_front(item);condConsumer_.notify_one();
}

  在了解了阻塞队列后,如何实现一个Log类呢,首先思考它得有什么:(1) 一个写入线程,这个线程不断的从队列取出消息执行写的任务 (2)封装API,供其余Reactor调用,用来向队列中塞入信息(3)整个日志系统应该保证只有一个实例,需要用到设计模式中的单例模式

/*一个专门负责写入的线程*/
void Log::AsyncWrite_() {string str = "";while(deque_->pop(str)) {lock_guard<mutex> locker(mtx_);// 因为两个线程共享了fp_这个变量 所以要小心fputs(str.c_str(), fp_);}
}
/*根据要写入的内容,生成对应的日志信息,核心还是push_back,把日志放入阻塞队列中*/
void Log::write(int level, const char *format, ...) {struct timeval now = {0,0};gettimeofday(&now, nullptr);time_t tSec = now.tv_sec;struct tm *sysTime = localtime(&tSec);struct tm t = *sysTime;va_list vaList;if(toDay_ != t.tm_mday || (lineCount_ && (lineCount_  %  MAX_LINES == 0))){unique_lock<mutex> locker(mtx_);locker.unlock();char newFile[LOG_NAME_LEN];char tail[36] = {0};snprintf(tail, 36, "%04d_%02d_%02d", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday);if (toDay_ != t.tm_mday){snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s%s", path_, tail, suffix_);toDay_ = t.tm_mday;lineCount_ = 0;}else {snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s-%d%s", path_, tail, (lineCount_  / MAX_LINES), suffix_);}locker.lock();flush();fclose(fp_);fp_ = fopen(newFile, "a");assert(fp_ != nullptr);}{unique_lock<mutex> locker(mtx_);lineCount_++;int n = snprintf(buff_.BeginWrite(), 128, "%d-%02d-%02d %02d:%02d:%02d.%06ld ",t.tm_year + 1900, t.tm_mon + 1, t.tm_mday,t.tm_hour, t.tm_min, t.tm_sec, now.tv_usec);buff_.HasWritten(n);AppendLogLevelTitle_(level);/*把具体日志的信息给写入进去*/va_start(vaList, format);int m = vsnprintf(buff_.BeginWrite(), buff_.WritableBytes(), format, vaList);va_end(vaList);/*进行换行*/buff_.HasWritten(m);buff_.Append("\n\0", 2);/*看是同步还是异步 同步就是自己fputs 异步就是放入队列中让另一个线程写*/if(isAsync_ && deque_ && !deque_->full()) {// 深拷贝了一下deque_->push_back(buff_.RetrieveAllToStr());} else {fputs(buff_.Peek(), fp_);}/*再清空一次是吗*/buff_.RetrieveAll();}
}

HTTP连接的抽象—HTTP COON

  在使用webbench测试的时候,需要我们去处理HTTP连接,关于HTTP连接的处理主要是解析HTTP的请求报文和生成HTTP的应答报文,这一块可以参考博客TinyWebServer:HTTP连接
  在这里的话,主要是讲讲clientfd对应的channel,到底应该怎么写它的回调函数。
  在这里的话,我曾犯过的错误就是对于对端关闭连接后处理不及时,什么意思呢?当客户端发送了FIN包之后,接收方的socket.read()操作会返回 0(表示 EOF,文件结束),这时候应该及时关闭连接,否则对于水平触发的epoll来说,如果你不关闭连接,epoll监听的这个fd就会一直返回可读(感兴趣可以搜一下源码),导致你webbench不能正常结束一次测试。
  客观来说这个实现有待改进的点还非常多:(1)假设一次读不能保证接收完所有的http报文应该怎么办呢?(2)对于写入,如果一次性需要写入大量的数据该怎么做呢?这些都是后续可以考虑的点

#include "httpconn.h"
#include <memory>using namespace std;/*因为静态成员变量是所有类共享的*/
/*所以需要单独拿出来初始化*/
/*准确来说 静态成员变量是属于类的 而不是属于类初始化的对象的*/
const char* HttpConn::srcDir;
std::atomic<int> HttpConn::userCount;// 默认初始化值为false 表示水平触发 也就是一次就读一下
bool HttpConn::isET = true;
/*静态成员函数没有 this 指针,只能访问静态成员(包括静态成员变量和静态成员函数)*/static EventLoop *CheckLoopNotNull(EventLoop *loop)
{if (loop == nullptr){LOG_ERROR(" mainLoop is null!");}return loop;
}HttpConn::HttpConn(EventLoop *loop,                  int sockfd,const std::string &nameArg,InetAddress &localAddr,const InetAddress &peerAddr)    : loop_(CheckLoopNotNull(loop)), socket_(new Socket(sockfd)), channel_(new Channel(loop, sockfd))   // channel绑定了loop和fd, name_(nameArg), localAddr_(localAddr), peerAddr_(peerAddr)
{// 下面给channel设置相应的回调函数 poller给channel通知感兴趣的事件发生了 channel会回调相应的回调函数channel_->setReadCallback(std::bind(&HttpConn::handleRead, this,std::placeholders::_1));channel_->setWriteCallback(std::bind(&HttpConn::handleWrite, this));channel_->setCloseCallback(std::bind(&HttpConn::handleClose, this));channel_->setErrorCallback(std::bind(&HttpConn::handleError, this));//LOG_INFO("TcpConnection:ctor:at fd=[%d]",sockfd);socket_->setKeepAlive(true);userCount++;writeBuff_.RetrieveAll();readBuff_.RetrieveAll();isClose_ = false;
}HttpConn::~HttpConn() { Close(); 
};void HttpConn::Close() {response_.UnmapFile();if(isClose_ == false) {isClose_ = true;userCount--;LOG_INFO("Client[%d](%s:%d) quit, UserCount:%d", socket_->fd(), GetIP(), GetPort(), (int)userCount);}
}int HttpConn::GetFd() const {return socket_->fd();
};struct sockaddr_in HttpConn::GetAddr() const {return *localAddr_.getSockAddr();
}const char* HttpConn::GetIP() const {return inet_ntoa(localAddr_.getSockAddr()->sin_addr);
}int HttpConn::GetPort() const {return localAddr_.getSockAddr()->sin_port;
}
/*包括两部分 read和Process */
/*问题是如果是水平触发 得等读完了才能操作*/
/*Process做判断了*/ssize_t HttpConn::read(int* saveErrno) {ssize_t len = -1;do {// 把数据读出来 第一次读就是0 是不应该的len = readBuff_.ReadFd(channel_->fd(), saveErrno);if (len <= 0) {break;}}while(isET);return len;
}ssize_t HttpConn::write(int* saveErrno) {ssize_t len = -1;do {len = writev(socket_->fd(), iov_, iovCnt_);if(len <= 0) {*saveErrno = errno;break;}if(iov_[0].iov_len + iov_[1].iov_len  == 0) { break; } /* 传输结束 */else if(static_cast<size_t>(len) > iov_[0].iov_len) {// 说明第一块已经被写入了iov_[1].iov_base = (uint8_t*) iov_[1].iov_base + (len - iov_[0].iov_len);iov_[1].iov_len -= (len - iov_[0].iov_len);if(iov_[0].iov_len) {/*为什么这里要清空呢*/writeBuff_.RetrieveAll();iov_[0].iov_len = 0;}} else {/*第一块还没写完呢*/iov_[0].iov_base = (uint8_t*)iov_[0].iov_base + len; iov_[0].iov_len -= len; writeBuff_.Retrieve(len);}} while(isET || ToWriteBytes() > 10240);return len;
}void HttpConn::OnProcess() {if(process()) {/*处理成功了就是可写了*/channel_->setWriting();} else {/*要不然还保持原样*/channel_->setReading();}
}
// 如果真有ERROR 那么主线程应该会通过错误回调函数关闭这个fd
void HttpConn::handleRead(Timestamp stamp) {//LOG_INFO("handle Read");ssize_t len = -1;int readErrno = 0;len = read(&readErrno);         /*如果一次性没有读完呢?*//*把数据放入到readBuff中了*/if(len <= 0 && readErrno != EAGAIN) {//LOG_ERROR("client fd : [%d] read Error len :[%d] errno : [%d]",channel_->fd(),len,readErrno);handleClose();return;}//LOG_INFO("client fd : [%d] read Success len :[%d]",channel_->fd(),len);OnProcess();
}// 把iov的数据写到fd_中
/*这里和writeBuff_的联动有待商榷*/
void HttpConn::handleWrite() {//LOG_INFO("handle Write");ssize_t len = -1;int writeErrno = 0;len = write(&writeErrno);if(ToWriteBytes() == 0) {/*表示写完了 水平模式下可能只写一次写不完*//* 传输完成 */if(IsKeepAlive()) {OnProcess(); /*改为只是可读了*/return;}}    /*可能是数据量太大了 就等epoller通知下次可写*/else if(len < 0) {if(writeErrno == EAGAIN) {channel_->setWriting();/* 继续传输保持可写就行 */return;}}//LOG_ERROR("client fd : [%d] write Error len :[%d] errno : [%d]",channel_->fd(),len,writeErrno);handleClose();
}void HttpConn::handleClose()
{//LOG_INFO("TcpConnection fd=[%d] close",channel_->fd());channel_->disableAll();channel_->remove();Close();HttpConnPtr connPtr(shared_from_this());if(connectionCallback_)connectionCallback_(connPtr); // 连接回调if(closeCallback_)closeCallback_(connPtr);      // 执行关闭连接的回调 执行的是TcpServer::removeConnection回调方法   // must be the last line
}
// 在这里 发生Error只是记录了一下吗
// 还是说主线程会关闭的呢void HttpConn::handleError()
{int optval;socklen_t optlen = sizeof optval;int err = 0;if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0){err = errno;}else{err = optval;}//LOG_ERROR("TcpConnection::handleError name:[%s]- SO_ERROR:%d",name_.c_str(),err);
}/*要绑定谁呢*/
bool HttpConn::process() {request_.Init();if(readBuff_.ReadableBytes() <= 0) {return false;}else if(request_.parse(readBuff_)) {LOG_DEBUG("%s", request_.path().c_str());response_.Init(srcDir, request_.path(), request_.IsKeepAlive(), 200);} else {response_.Init(srcDir, request_.path(), false, 400);}/*把响应头写到buffer里面*//*避免频繁的malloc系统调用*/response_.MakeResponse(writeBuff_);/* 响应头 */iov_[0].iov_base = const_cast<char*>(writeBuff_.Peek());iov_[0].iov_len = writeBuff_.ReadableBytes();iovCnt_ = 1;/* 文件 */if(response_.FileLen() > 0  && response_.File()) {iov_[1].iov_base = response_.File();iov_[1].iov_len = response_.FileLen();iovCnt_ = 2;}//LOG_DEBUG("filesize:%d, %d  to %d", response_.FileLen() , iovCnt_, ToWriteBytes());return true;
}
/*这两个都是由主线程调用的 不是httpconn所属的线程*/
// 连接建立
void HttpConn::connectEstablished()
{channel_->tie(shared_from_this());//LOG_INFO("channel_fd:[%d] enable Reading",channel_->fd());channel_->enableReading(); // 向poller注册channel的EPOLLIN读事件(此时就该调用updte了)// 新连接建立 执行回调if(connectionCallback_)connectionCallback_(shared_from_this());
}
// 连接销毁
void HttpConn::connectDestroyed()
{channel_->disableAll(); // 把channel的所有感兴趣的事件从poller中删除掉if(connectionCallback_)connectionCallback_(shared_from_this());    /*这个回调就没执行过 空的*/channel_->remove(); // 把channel从poller中删除掉HttpConn::Close();
}
http://www.lqws.cn/news/464347.html

相关文章:

  • Zephyr 调试实用指南:日志系统、Shell CLI 与 GDB 全面解析
  • CLion开发Qt桌面程序_git的简单使用_小团体
  • 闲庭信步使用SV搭建图像测试平台:第五课——使用task
  • pyqt logger类与界面分开
  • 从语义到推荐:大语言模型(LLM)如何驱动智能选车系统?
  • Mac电脑-Markdown编辑器-Typora
  • c++26新功能—hive容器
  • 税务 VR 虚拟体验,带来全新办税感受
  • 【软考高级系统架构论文】论NoSQL数据库技术及其应用
  • HarmonyOS 5的分布式通信矩阵是如何工作的?
  • 鸿蒙NEXT-鸿蒙三层架构搭建,嵌入HMRouter,实现便捷跳转,新手攻略。(1/3)
  • 在AI普及的大环境下神经网络在新能源汽车热管理系统中的应用简介
  • 无线Debugger攻防全解:原理剖析与突破之道
  • Qt中的布局
  • 深入浅出:Go语言中的Cookie、Session和Token认证机制
  • 关于 Kyber:抗量子密码算法 Kyber 详解
  • 首页实现多级缓存
  • 【信号与系统四】采样和通信系统
  • rent8_wechat-最常用出租屋管理系统-微信小程序
  • Mac Parallels Desktop Kali 2025 代理设置
  • 外卖之后再度进军酒旅,京东多线出击的逻辑是什么?
  • Electron 开发桌面应用概述
  • 跟着AI学习C# Day27
  • Pytorch3D 中涉及的知识点汇总
  • 【Flutter】状态管理框架Provider和Get对比分析(面试常用)
  • Python内存使用分析工具深度解析与实践指南(下篇)
  • 江科大STM32入门:DMA传输数据
  • java 找出两个json文件的不同之处
  • 神经网络中的均方误差(Mean Squared Error)详解
  • 自定义OceanBase集群安装并使用OCP接管集群