简易服务器(TCP)
1.简单介绍以及项目技术和开发环境
本文将通过epoll完成对客户端请求的处理,通过多线程完成对客户端发送数据的处理,并提交到远端mysql
需要的使用到的一些技术有:socket网络套接字编程、IO多路转接的epoll、多线程(包括互斥锁和条件变量)
开发环境为:ubuntu20.04、C++11、Mysq5.7.42
2.模块开发之epoll_server
2.1 epoll接口封装
epoll的接口直接使用较为麻烦,所以这里博主将这块内容封装成一个内方便后续的调用
包括epoll_create、epoll_ctl、epoll_wait这三个接口,这些接口在博主前面的内容中详细讲过,所以这里就直接展示封装的类
#pragma once
#include <iostream>
#include <unistd.h>
#include <sys/epoll.h>class Epoll
{const int size = 128;public:Epoll() : _timeout(3000){_epfd = epoll_create(size);if (_epfd < 0){std::cout<<"epoll_create fail..."<<std::endl;close(_epfd);}}~Epoll() { close(_epfd); }public:void Epoll_Ctl(int op, int fd, uint32_t event){if (op == EPOLL_CTL_DEL){int n = epoll_ctl(_epfd, op, fd, nullptr);if (n != 0)std::cout << "epoll_ctl delete fail..." << std::endl;}else{struct epoll_event ev;ev.data.fd = fd;ev.events = event;int n = epoll_ctl(_epfd, op, fd, &ev);if (n != 0)std::cout << "epoll_ctl add epoll_event fail..." << std::endl;}}int Epoll_Wait(struct epoll_event *events, int maxevents){int n = epoll_wait(_epfd, events, maxevents, _timeout);if (n == 0)std::cout << "epoll_wait timeout..." << std::endl;else if (n < 0)std::cout << "epoll_wait fail..." << std::endl;return n;}private:int _epfd;int _timeout;
};
2.2 socket相关接口的封装
包括创建套接字、绑定、监听、请求连接、接受连接这五个主要的接口。
相关的接口在前面的文件讲过,并进行了封装这里就容易忘记和容易搞错的地方讲。
对于socket接口的封装,我在最后使用了setsockopt这个接口目的是为了让服务器能够快速重启,如果没有这个接口的处理,在tcp协议中如果服务端主动关闭,套接字会进入TIEM_WAIT状态等待(2*MSL),但是在这段时间内服务器依旧占据着这个端口号,如果在2*MSL这段时间内,尝试重启服务器会提示对应的端口并绑定了。
bind没什么好说的主要就是注意在绑定端口号的时候将主机字节序转换成网络字节序,在绑定ip地址的时候注意将字符串类型ip地址转成二进制类型的ip地址(我这里采用INADDR_ANY来绑定任意可用的ip地址)
listen直接调用就可以了,第二个参数时最大的连接数
accpet的封装需要我们将等到的新的文件描述符传出所以返回值类型时int,此外我们还需要将客户端的port和ip传出,我们可以通过传址的方式将他们传出,这里也需要注意传出来的ip需要将二进制格式转成字符串格式,传出来的port需要将网络字节序转成主机字节序
connect我们需要传入需要连接服务端的ip地址和端口号port,并返回连接成功还是失败
#pragma once#include <iostream>
#include <string>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "Log.hpp"enum
{SocketErr = 2,BindErr,ListenErr,
};// TODO
const int backlog = 10;
Log lg;class Sock
{
public:Sock(){}~Sock(){}public:void Socket(){sockfd_ = socket(AF_INET, SOCK_STREAM, 0);if (sockfd_ < 0){lg(Fatal, "socker error, %s: %d", strerror(errno), errno);exit(SocketErr);}int opt = 1;setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));}void Bind(uint16_t port){struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;if (bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0){lg(Fatal, "bind error, %s: %d", strerror(errno), errno);exit(BindErr);}}void Listen(){if (listen(sockfd_, backlog) < 0){lg(Fatal, "listen error, %s: %d", strerror(errno), errno);exit(ListenErr);}}int Accept(std::string *clientip, uint16_t *clientport){struct sockaddr_in peer;socklen_t len = sizeof(peer);int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);if(newfd < 0){lg(Warning, "accept error, %s: %d", strerror(errno), errno);return -1;}char ipstr[64];inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));*clientip = ipstr;*clientport = ntohs(peer.sin_port);return newfd;}bool Connect(const std::string &ip, const uint16_t &port){struct sockaddr_in peer;memset(&peer, 0, sizeof(peer));peer.sin_family = AF_INET;peer.sin_port = htons(port);inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr));int n = connect(sockfd_, (struct sockaddr*)&peer, sizeof(peer));if(n == -1) {std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;return false;}return true;}void Close(){close(sockfd_);}int Fd(){return sockfd_;}private:int sockfd_;
};
2.3 epoll_server的设计
在epoll_server中我们需要使用socket和epoll相关接口,所以我们epoll_server这个类中的成员就有这两个类(因为我们并不希望类外有人访问这两个类,所以将这两个成员设置为私有的)
在这个类中除了构造函数和析构函数这两个共有函数,还额外提供init和start这两个函数,init主要完成套接字的创建、绑定、监听,而start中首先将监听套接字设置到epoll的关心事件中,然后进入循环不断的epoll_wait并处理有响应的文件描述符,在处理中有分为连接事件和读取事件,这里start的参数是我在整合代码的时候加入的目的其实是在有数据的时候唤醒消费线程(这里就是线程池)
#pragma once
#include <iostream>
#include <memory>
#include <vector>
#include <string>
#include "epoll.hpp"
#include "Socket.hpp"
#include "task.hpp"
#include "many_thread.hpp"// std::vector<std::string> strs;const uint16_t default_port = 8888;
const int max_event = 64;
const int max_buffer = 1024;class epoll_server
{
public:epoll_server() :_listen_fd(new Sock),_epoll_fd(new Epoll){}~epoll_server(){}
public:void init(uint16_t port = default_port){_listen_fd->Socket();_listen_fd->Bind(port);_listen_fd->Listen();}void start(zkj::many_pthreads* mp){_epoll_fd->Epoll_Ctl(EPOLL_CTL_ADD,_listen_fd->Fd(),EPOLLIN);for(;;){struct epoll_event events[max_event];int n = _epoll_fd->Epoll_Wait(events,max_event);for(int i = 0;i<n;i++){if(events[i].data.fd == _listen_fd->Fd()){//客户端请求连接事件uint16_t client_port;std::string client_ip;int client_fd = _listen_fd->Accept(&client_ip,&client_port);_epoll_fd->Epoll_Ctl(EPOLL_CTL_ADD,client_fd,EPOLLIN);}else{char buffer[max_buffer];memset(buffer,0,sizeof buffer);int len = read(events[i].data.fd,buffer,sizeof buffer);if(len < 0){std::cout<<"read fail..."<<std::endl;return ;}else if(len == 0){std::cout<<"read content is 0..."<<std::endl;_epoll_fd->Epoll_Ctl(EPOLL_CTL_DEL,events[i].data.fd,0);}else{// std::cout<<"client talk#"<<buffer<<std::endl;//将从客户端获取的数据存储到tasks中std::string buf_str = buffer;// std::cout<<buf_str/*<<std::endl*/;tasks.push_back(buf_str);mp->Cond_Signal();}}}}}
private:std::shared_ptr<Sock> _listen_fd;std::shared_ptr<Epoll> _epoll_fd;
};
3.模块开发之many_thread
3.1 mysql接口的封装
这个就比较简单,就是创建数据库、连接数据库、向数据库输入语句这三个接口
如果你们的数据库端口号没有改过,在创建自己的用户并设置密码就可以使用了
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <mysql/mysql.h>
#include "task.hpp"namespace zkj
{const std::string host = "localhost";const std::string user = "zkj";const std::string passwd = "123456";const std::string db = "text1";const unsigned int port = 3306;class Mysql_Class{public:Mysql_Class() : my(mysql_init(nullptr)) {}~Mysql_Class() { mysql_close(my); }public:void Mysql_Real_Connect(){if (my == nullptr){std::cout << "mysql init fail..." << std::endl;return;}if (mysql_real_connect(my, host.c_str(), user.c_str(), passwd.c_str(), db.c_str(), port, nullptr, 0) == nullptr){std::cerr << "connect MYSQL error" << std::endl;return;}}void Mysql_Query(const std::string &sqlstr){int n = mysql_query(my, sqlstr.c_str());if (n == 0)std::cout << sqlstr << " success" << std::endl;elsestd::cout << sqlstr << " fail" << std::endl;}private:MYSQL *my;};
}
3.2 task任务类设计
task内的接口包括解析获得的字符串,并将解析得到的属性组织成sql语句并返回,用到的操作就是substr和find
#pragma once
#include <iostream>
#include <vector>namespace zkj{class task{public:task(){}~task(){}public:task(const std::string &str) :_taskstr(str){}void Analysis_Str(){//1 zhangsan 99 99 99 str的格式auto pos = _taskstr.find(" ");id = std::stoi(_taskstr.substr(0,pos));auto pos2 = _taskstr.find(" ",pos+1);name = _taskstr.substr(pos+1,pos2-pos-1);auto pos3 = _taskstr.find(" ",pos2+1);chinese = std::stoi(_taskstr.substr(pos2+1,pos3-pos2-1));auto pos4 = _taskstr.find(" ",pos3+1);math = std::stoi(_taskstr.substr(pos3+1,pos4-pos3-1));auto pos5 = _taskstr.find(" ",pos4+1);englisg = std::stoi(_taskstr.substr(pos4+1));//"insert into exam_result values (11,'zhangsan',8,9,8)" 这是向数据库插入的语句sql_insert_str = "insert into exam_result values (" + std::to_string(id) + "," + "'"+ name + "'" +"," +std::to_string(chinese)+ "," +std::to_string(math)+ "," +std::to_string(englisg)+")";}void Print_Analysis_Str(){std::cout<<sql_insert_str<<std::endl;}std::string Ret_Sql_str(){return sql_insert_str;}private:std::string _taskstr;int id;std::string name;int chinese;int math;int englisg;std::string sql_insert_str;};
}extern std::vector<zkj::task> tasks;
3.3 many_thread设计
在线程池中我设计的类内成员有mutex、cond(条件变量)、sql、以及用vector组织的线程属性pthreads
线程的属性通过pthread_info来记录
在线程池的构造函数中完成_pthread和sql的初始化,通过new为这两个智能指针初始化,并完成mutex和cond的初始化。
对于mutex和cond的相关接口我都进行了封装以方便后续的调用
在start中主要任务就是创建一批线程,每个线程的任务都是判断有没有内容需要处理,如果有就处理,没有就等待并释放锁。
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <map>
#include <memory>
#include <pthread.h>
#include <unistd.h>
#include "task.hpp"
#include "mysql.hpp"namespace zkj
{
#define PTHREAD_MAX 10
#define START_FLAG true
#define PRODUCE_THREAD true
#define MAX_STR 100000struct pthread_info{pthread_info(){}pthread_info(pthread_t thread_id) : _thread_id(thread_id) {std::string name = "thread - " + std::to_string(thread_id);_thread_name = name;}pthread_t _thread_id;std::string _thread_name;};class many_pthreads{public:many_pthreads() :sql(new zkj::Mysql_Class()){sql->Mysql_Real_Connect();_pthreads.resize(PTHREAD_MAX);pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}~many_pthreads(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);for(int i = 0;i<PTHREAD_MAX;i++)pthread_join(_pthreads[i]._thread_id,nullptr);}public:void Lock() { pthread_mutex_lock(&_mutex); }void UnLock() { pthread_mutex_unlock(&_mutex); }void Cond_Wait() { pthread_cond_wait(&_cond, &_mutex); }void Cond_Signal() { pthread_cond_signal(&_cond); }bool empty() { return tasks.empty(); }void Push_Task(std::string str) { tasks.push_back(str); }int Get_Pthread_id(pthread_t pid){for (int i = 0; i < PTHREAD_MAX; i++){if (_pthreads[i]._thread_id == pid)return i;}return -1;}zkj::task Get_Task_Front(){zkj::task ret = tasks.front();tasks.erase(tasks.begin());return ret;}std::shared_ptr<zkj::Mysql_Class>& Get_Sql_ptr(){return sql;}public:static void *thread_test(void *argc){many_pthreads *pthreads = static_cast<many_pthreads *>(argc);while (1){pthreads->Lock();while (pthreads->empty()){pthreads->Cond_Wait();}zkj::task client_I = pthreads->Get_Task_Front();// std::cout << "pthread_t : " << pthreads->Get_Pthread_id(pthread_self()) << "server say -> " << client_I << std::endl;client_I.Analysis_Str();client_I.Print_Analysis_Str();pthreads->Get_Sql_ptr()->Mysql_Query(client_I.Ret_Sql_str());pthreads->UnLock();sleep(1);}}void start(){std::cout<<"GDB--"<<"many thread stary chenge"<<std::endl;for (int i = 0; i < PTHREAD_MAX; i++){pthread_t t;pthread_info t_info(t);_pthreads[i] = t_info;pthread_create(&t, nullptr, thread_test, this);}}private:std::vector<pthread_info> _pthreads;// std::vector<std::string> tasks;std::shared_ptr<zkj::Mysql_Class> sql;pthread_cond_t _cond;pthread_mutex_t _mutex;};}
4.测试程序
#include <pthread.h>
#include "epoll_server.hpp"
#include "many_thread.hpp"std::vector<zkj::task> tasks;void *Server_Thread(void *argc)
{// std::cout<<"GDB--"<<"epoll server create 成功"<<std::endl;zkj::many_pthreads* mp = static_cast<zkj::many_pthreads*>(argc);std::shared_ptr<epoll_server> ev(new epoll_server());ev->init();ev->start(mp);
}void *Many_Thread(void* argc)
{// std::shared_ptr<zkj::many_pthreads> mp(new zkj::many_pthreads());zkj::many_pthreads* mp = static_cast<zkj::many_pthreads*>(argc);mp->start();
}int main()
{zkj::many_pthreads mp;pthread_t server_id;pthread_create(&server_id,nullptr,Server_Thread,&mp);pthread_t many_threads_id;pthread_create(&many_threads_id,nullptr,Many_Thread,&mp);pthread_join(server_id,nullptr);pthread_join(many_threads_id,nullptr);return 0;
}
到这里整个小项目就结束了,总体来说难度不大,用到的东西比较多,环境方面可能配置比较麻烦。