轻量级Web服务器学习笔记
学习时间:2023年4月12日
项目地址:https://github.com/qinguoyi/TinyWebServer
1 框架
2 源码结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| |---CGImysql 数据库 | |---sql_connection_pool.h | |---sql_connection_pool.cpp |---http http连接封装 | |---http_conn.cpp | |---http_conn.h |---lock 锁封装 | |---locker.h |---log 日志 | |---block_queue.h | |---log.cpp | |---log.h |---threadpool 线程池封装 | |---threadpool.h |---timer 定时器和工具类封装 | |---lst_timer.cpp | |---lst_timer.h |---build.sh |---config.cpp 配置类实现 |---config.h 配置类 |---main.cpp 入口 |---makefile |---webserver.cpp 服务器类的实现 |---webserver.h 服务器类的定义
|
3 线程同步机制封装类
线程同步机制详见[[Linux系统编程学习笔记#8.5 线程同步]],可以使用信号量,互斥量和条件变量三种机制实现线程同步。
3.1 基础知识
3.1.1 RAII
- RAII全称是
Resource Acquisition is Initialization
,直译过来是“资源获取即初始化”.
- 在构造函数中申请分配资源,在析构函数中释放资源。因为C++的语言机制保证了,当一个对象创建的时候,自动调用构造函数,当对象超出作用域的时候会自动调用析构函数。所以,在RAII的指导下,我们应该使用类来管理资源,将资源和对象的生命周期绑定
- RAII的核心思想是将资源或者状态与对象的生命周期绑定,通过C++的语言机制,实现资源和状态的安全管理,智能指针是RAII最好的例子
3.1.2 信号量
表示信号量的数据结构为sem_t
,本质是一个长整数:
1 2 3 4 5
| #include <semaphore.h> typedef union { char __size[__SIZEOF_SEM_T]; long int __align; } sem_t;
|
相关系统调用:
1 2 3
|
int sem_init(sem_t *sem, int pshared, unsigned int value);
|
pshared
参数控制着信号量的类型。如果 pshared的值是0,就表示它是当前进程的局部信号量;否则,其它进程就能够共享这个信号量。一般设置为0;
value
参数指定信号量的初始值
1 2 3 4
| int sem_wait(sem_t * sem);
int sem_post(sem_t * sem);
|
1 2
| int sem_destroy (sem_t *sem);
|
3.1.3 互斥量
互斥变量是用 pthread_mutex_t
数据类型表示的。详见[[Linux系统编程学习笔记#8.5.2 互斥量]]。
3.1.4 条件变量
条件变量由pthread_cond_t
数据类型表示。详见[[Linux系统编程学习笔记#8.5.5 条件变量]]。
3.2 源码解析
源码:lock/locker.cpp
,是对三种同步机制的封装。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| #ifndef LOCKER_H #define LOCKER_H
#include <exception> #include <pthread.h> #include <semaphore.h>
class sem { public: sem() { if (sem_init(&m_sem, 0, 0) != 0) { throw std::exception(); } } sem(int num) { if (sem_init(&m_sem, 0, num) != 0) { throw std::exception(); } } ~sem() { sem_destroy(&m_sem); } bool wait() { return sem_wait(&m_sem) == 0; } bool post() { return sem_post(&m_sem) == 0; }
private: sem_t m_sem; };
class locker { public: locker() { if (pthread_mutex_init(&m_mutex, NULL) != 0) { throw std::exception(); } } ~locker() { pthread_mutex_destroy(&m_mutex); } bool lock() { return pthread_mutex_lock(&m_mutex) == 0; } bool unlock() { return pthread_mutex_unlock(&m_mutex) == 0; } pthread_mutex_t *get() { return &m_mutex; }
private: pthread_mutex_t m_mutex; };
class cond { public: cond() { if (pthread_cond_init(&m_cond, NULL) != 0) { throw std::exception(); } } ~cond() { pthread_cond_destroy(&m_cond); } bool wait(pthread_mutex_t *m_mutex) { int ret = 0; ret = pthread_cond_wait(&m_cond, m_mutex); return ret == 0; } bool timewait(pthread_mutex_t *m_mutex, struct timespec t) { int ret = 0; ret = pthread_cond_timedwait(&m_cond, m_mutex, &t); return ret == 0; } bool signal() { return pthread_cond_signal(&m_cond) == 0; } bool broadcast() { return pthread_cond_broadcast(&m_cond) == 0; }
private: pthread_cond_t m_cond; }; #endif
|
4 半同步反应堆线程池
4.1 事件处理模式
- reactor模式中,主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话立即通知工作线程(逻辑单元 ),读写数据、接受新连接及处理客户请求均在工作线程中完成。通常由同步I/O实现。
- proactor模式中,主线程和内核负责处理读写数据、接受新连接等I/O操作,工作线程仅负责业务逻辑,如处理客户请求。通常由异步I/O实现。
4.2 同步I/O模拟proactor模式
由于异步I/O并不成熟,实际中使用较少,这里将使用同步I/O模拟实现proactor模式。
同步I/O模型的工作流程如下(epoll_wait为例):
主线程往epoll内核事件表注册socket上的读就绪事件。
主线程调用epoll_wait等待socket上有数据可读或可写,其中可读的条件满足下面之一即可:
- 新的连接到来
- 异常
- 副套接字可读(处理客户连接上接收到的数据)
- 定时器信号
当socket上有数据可读,epoll_wait通知主线程,主线程从socket循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列。
睡眠在请求队列上某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪事件
主线程调用epoll_wait等待socket可写。
当socket上有数据可写,epoll_wait通知主线程。主线程往socket上写入服务器处理客户请求的结果。
4.3 源码解析
源码文件:threadpool.h
,类的定义和实现写在同一个文件里。
线程池的设计模式为半同步/半反应堆,其中反应堆具体为Proactor事件处理模式。
具体的,主线程为异步线程,负责监听文件描述符,接收socket新连接,若当前监听的socket发生了读写事件,然后将任务插入到请求队列。工作线程从请求队列中取出任务,完成读写数据的处理。
4.3.1 线程池类定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| template <typename T> class threadpool { public: threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000); ~threadpool(); bool append(T *request, int state); bool append_p(T *request);
private: static void *worker(void *arg); void run();
private: int m_thread_number; int m_max_requests; pthread_t *m_threads; std::list<T *> m_workqueue; locker m_queuelocker; sem m_queuestat; connection_pool *m_connPool; int m_actor_model; };
|
4.3.2 线程池创建与销毁
构造函数中创建线程池,pthread_create
函数中将类的对象作为参数传递给静态函数(worker),在静态函数中引用这个对象,并调用其动态方法(run)。
具体的,类对象传递时用this指针,传递给静态函数后,将其转换为线程池类,并调用私有成员函数run。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| template <typename T> threadpool<T>::threadpool(int actor_model, connection_pool *connPool, int thread_number, int max_requests) : m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL),m_connPool(connPool) {
if(thread_number <= 0 || max_requests <= 0) throw std::exception(); m_threads = new pthread_t[m_thread_number]; if(!m_threads) throw std::exception(); for(int i = 0; i < thread_number; ++i) { if (pthread_create(m_threads[i], NULL, worker, this) != 0) { delete[] m_threads; throw std::exception(); } if (pthread_detach(m_threads[i])) { delete[] m_threads; throw std::exception(); } } }
template <typename T> threadpool<T>::~threadpool() { delete[] m_threads; }
|
4.3.3 向请求队列中添加任务
通过list容器创建请求队列,向队列中添加时,通过互斥锁保证线程安全,添加完成后通过信号量提醒有任务要处理,最后注意线程同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| template <typename T> bool threadpool<T>::append_p(T *request) { m_queuelocker.lock(); if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } m_workqueue.push_back(request); m_queuelocker.unlock(); m_queuestat.post(); return true; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| template <typename T> bool threadpool<T>::append(T *request, int state) { m_queuelocker.lock(); if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } request->m_state = state; m_workqueue.push_back(request); m_queuelocker.unlock(); m_queuestat.post(); return true; }
|
4.3.4 线程池启动函数
内部访问私有成员函数run,完成线程处理要求。
1 2 3 4 5 6 7 8 9
| template <typename T> void *threadpool<T>::worker(void *arg) { threadpool *pool = (threadpool *)arg; pool->run(); return pool; }
|
4.3.5 线程池执行函数
主要实现,工作线程从请求队列中取出某个任务进行处理,注意线程同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| template <typename T> void threadpool<T>::run() { while (true) { m_queuestat.wait(); m_queuelocker.lock(); if (m_workqueue.empty()) { m_queuelocker.unlock(); continue; } T *request = m_workqueue.front(); m_workqueue.pop_front(); m_queuelocker.unlock(); if (!request) continue; if (1 == m_actor_model) { if (0 == request->m_state) { if (request->read_once()) { request->improv = 1; connectionRAII mysqlcon(&request->mysql, m_connPool); request->process(); } else { request->improv = 1; request->timer_flag = 1; } } else { if (request->write()) { request->improv = 1; } else { request->improv = 1; request->timer_flag = 1; } } } else { connectionRAII mysqlcon(&request->mysql, m_connPool); request->process(); } } }
|
5 HTTP连接处理
5.1 触发模式
LT水平触发模式
- epoll_wait检测到文件描述符有事件发生,则将其通知给应用程序,应用程序可以不立即处理该事件。
- 当下一次调用epoll_wait时,epoll_wait还会再次向应用程序报告此事件,直至被处理
ET边缘触发模式
- epoll_wait检测到文件描述符有事件发生,则将其通知给应用程序,应用程序必须立即处理该事件
- 必须要一次性将数据读取完,使用非阻塞I/O,读取到出现
eagain
5.2 HTTP报文格式
HTTP报文分为请求报文和响应报文两种,每种报文必须按照特有格式生成,才能被浏览器端识别。
其中,浏览器端向服务器发送的为请求报文,服务器处理后返回给浏览器端的为响应报文。
5.2.1 请求报文
HTTP请求报文由请求行(request line)、请求头部(header)、空行和请求数据四个部分组成。
其中,请求分为两种,GET和POST,具体的:
1 2 3 4 5 6 7 8 9 10
| GET /562f25980001b1b106000338.jpg HTTP/1.1\r\n Host:img.mukewang.com\r\n User-Agent:Mozilla/5.0 (Windows NT 10.0; WOW64)\r\n AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.106 Safari/537.36\r\n Accept:image/webp,image/*,*/*;q=0.8\r\n Referer:http://www.imooc.com/\r\n Accept-Encoding:gzip, deflate, sdch\r\n Accept-Language:zh-CN,zh;q=0.8\r\n 空行\r\n 请求数据为空
|
1 2 3 4 5 6 7 8
| POST / HTTP1.1\r\n Host:www.wrox.com\r\n User-Agent:Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727; .NET CLR 3.0.04506.648; .NET CLR 3.5.21022)\r\n Content-Type:application/x-www-form-urlencoded\r\n Content-Length:40\r\n Connection: Keep-Alive\r\n 空行\r\n name=Professional%20Ajax&publisher=Wiley\r\n
|
各字段含义
请求行,用来说明请求类型,要访问的资源以及所使用的HTTP版本。
GET说明请求类型为GET,/562f25980001b1b106000338.jpg(URL)为要访问的资源,该行的最后一部分说明使用的是HTTP1.1版本。
请求头部,紧接着请求行(即第一行)之后的部分,用来说明服务器要使用的附加信息。
- HOST,给出请求资源所在服务器的域名。
- User-Agent,HTTP客户端程序的信息,该信息由你发出请求使用的浏览器来定义,并且在每个请求中自动发送等。
- Accept,说明用户代理可处理的媒体类型。
- Accept-Encoding,说明用户代理支持的内容编码。
- Accept-Language,说明用户代理能够处理的自然语言集。
- Content-Type,说明实现主体的媒体类型。
- Content-Length,说明实现主体的大小。
- Connection,连接管理,可以是Keep-Alive或close。
空行,请求头部后面的空行是必须的即使第四部分的请求数据为空,也必须有空行。
请求数据也叫主体,可以添加任意的其他数据。
5.2.2 响应报文
HTTP响应也由四个部分组成,分别是:状态行、消息报头、空行和响应正文。
1 2 3 4 5 6 7 8 9 10
| HTTP/1.1 200 OK\r\n Date: Fri, 22 May 2009 06:07:21 GMT\r\n Content-Type: text/html; charset=UTF-8\r\n 空行\r\n <html>\r\n <head></head>\r\n <body>\r\n <!--body goes here-->\r\n </body>\r\n </html>\r\n
|
各字段含义
- 状态行,由HTTP协议版本号, 状态码, 状态消息 三部分组成。
第一行为状态行,(HTTP/1.1)表明HTTP版本为1.1版本,状态码为200,状态消息为OK。
- 消息报头,用来说明客户端要使用的一些附加信息。
第二行和第三行为消息报头,Date:生成响应的日期和时间;Content-Type:指定了MIME类型的HTML(text/html),编码类型是UTF-8。
- 空行,消息报头后面的空行是必须的。
- 响应正文,服务器返回给客户端的文本信息。空行后面的html部分为响应正文。
5.3 状态机
5.3.1 http报文处理流程
浏览器端发出http连接请求,主线程创建http对象接收请求并将所有数据读入对应buffer,将该对象插入任务队列,工作线程从任务队列中取出一个任务进行处理。
工作线程取出任务后,调用process_read
函数,通过主、从状态机对请求报文进行解析。
解析完之后,跳转do_request
函数生成响应报文,通过process_write
写入buffer,返回给浏览器端。
5.3.2 状态机和服务器解析请求报文
从状态机负责读取报文的一行,主状态机负责对该行数据进行解析,主状态机内部调用从状态机,从状态机驱动主状态机。
主状态机
三种状态,标识解析位置。
- CHECK_STATE_REQUESTLINE,解析请求行
- CHECK_STATE_HEADER,解析请求头
- CHECK_STATE_CONTENT,解析消息体,仅用于解析POST请求
主状态机运行逻辑:
主状态机初始状态是CHECK_STATE_REQUESTLINE,通过调用从状态机来驱动主状态机,在主状态机进行解析前,从状态机已经将每一行的末尾\r\n
符号改为\0\0
,以便于主状态机直接取出对应字符串进行处理。
CHECK_STATE_REQUESTLINE
- 主状态机的初始状态,调用parse_request_line函数解析请求行
- 解析函数从m_read_buf中解析HTTP请求行,获得请求方法、目标URL及HTTP版本号
- 解析完成后主状态机的状态变为CHECK_STATE_HEADER
- 解析完请求行后,主状态机继续分析请求头。在报文中,请求头和空行的处理使用的同一个函数,这里通过判断当前的text首位是不是\0字符,若是,则表示当前处理的是空行,若不是,则表示当前处理的是请求头。
CHECK_STATE_HEADER
- 调用parse_headers函数解析请求头部信息
- 判断是空行还是请求头,若是空行,进而判断content-length是否为0,如果不是0,表明是POST请求,则状态转移到CHECK_STATE_CONTENT,否则说明是GET请求,则报文解析结束。
- 若解析的是请求头部字段,则主要分析connection字段,content-length字段,其他字段可以直接跳过,各位也可以根据需求继续分析。
- connection字段判断是keep-alive还是close,决定是长连接还是短连接
- content-length字段,这里用于读取post请求的消息体长度
- 如果仅仅是GET请求
- CHECK_STATE_CONTENT
- 仅用于解析POST请求,调用parse_content函数解析消息体
- 用于保存post请求消息体,为后面的登录和注册做准备
从状态机
从状态机有三种状态,标识解析一行的读取状态。
- LINE_OK,完整读取一行
- LINE_BAD,报文语法有误
- LINE_OPEN,读取的行不完整
从状态机运行逻辑:
在HTTP报文中,每一行的数据由\r\n
作为结束字符,空行则是仅仅是字符\r\n
。因此,可以通过查找\r\n
将报文拆解成单独的行进行解析,项目中便是利用了这一点。
从状态机负责读取buffer中的数据,将每行数据末尾的\r\n置为\0\0,并更新从状态机在buffer中读取的位置m_checked_idx
,以此来驱动主状态机解析。
- 从状态机从m_read_buf中逐字节读取,判断当前字节是否为\r
- 接下来的字符是\n,将\r\n修改成\0\0,将m_checked_idx指向下一行的开头,则返回LINE_OK,表示读取了一行数据
- 接下来达到了buffer末尾,表示buffer还需要继续接收,返回LINE_OPEN
- 否则,表示语法错误,返回LINE_BAD
- 当前字节不是\r,判断是否是\n(一般是上次读取到\r就到了buffer末尾,没有接收完整,再次接收时会出现这种情况)
HTTP状态码
5.3.3 状态机和服务器响应请求报文
浏览器端发出HTTP请求报文,服务器端接收该报文并调用process_read
对其进行解析,根据解析结果HTTP_CODE
,进入相应的逻辑和模块。
其中,服务器子线程完成报文的解析与响应;主线程监测读写事件,调用read_once
和http_conn::write
完成数据的读取与发送。
HTTP_CODE含义
表示HTTP请求的处理结果,在头文件中初始化了八种情形,在报文解析与响应中只用到了七种。
NO_REQUEST
-
GET_REQUEST
-
NO_RESOURCE
-
BAD_REQUEST
-
FORBIDDEN_REQUEST
-
FILE_REQUEST
-
INTERNAL_ERROR
- 服务器内部错误,该结果在主状态机逻辑switch的default下,一般不会触发
5.4 源码分析
5.4.1 http类定义
源码:http_conn.h
5.4.2 epoll相关代码
项目中epoll相关代码部分包括非阻塞模式、内核事件表注册事件、删除事件、重置EPOLLONESHOT事件四种。
1 2 3 4 5 6 7 8 9 10
| int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; }
|
- 内核事件表注册新事件,开启
EPOLLONESHOT
,针对客户端连接的描述符,listenfd不用开启
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| void addfd(int epollfd, int fd, bool one_shot, int TRIGMode) { epoll_event event; event.data.fd = fd; if (1 == TRIGMode) event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; else event.events = EPOLLIN | EPOLLRDHUP;
if (one_shot) event.events |= EPOLLONESHOT; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); }
|
1 2 3 4 5
| void removefd(int epollfd, int fd) { epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0); close(fd); }
|
1 2 3 4 5 6 7 8 9 10 11 12
| void modfd(int epollfd, int fd, int ev, int TRIGMode) { epoll_event event; event.data.fd = fd;
if (1 == TRIGMode) event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP; else event.events = ev | EPOLLONESHOT | EPOLLRDHUP;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event); }
|
5.4.3 服务器接收http请求
该部分的实现源码位于webserver.cpp
。详见7.2.1.④和⑤节
5.4.4 读取请求报文
read_once
读取浏览器端发送来的请求报文,直到无数据可读或对方关闭连接,读取到m_read_buffer
中,并更新m_read_idx
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
bool http_conn::read_once() { if (m_read_idx >= READ_BUFFER_SIZE) { return false; } int bytes_read = 0;
if (0 == m_TRIGMode) { bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0); m_read_idx += bytes_read; if (bytes_read <= 0) { return false; } return true; } else { while (true) { bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0); if (bytes_read == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; return false; } else if (bytes_read == 0) { return false; } m_read_idx += bytes_read; } return true; } }
|
5.4.5 http请求报文解析
各子线程通过process
函数对任务进行处理,调用process_read
函数和process_write
函数分别完成报文解析与报文响应两个任务(驱动主从状态机)。
process_read
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| http_conn::HTTP_CODE http_conn::process_read() { LINE_STATUS line_status = LINE_OK; HTTP_CODE ret = NO_REQUEST; char *text = 0; while ((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK) || ((line_status = parse_line()) == LINE_OK)) { text = get_line(); m_start_line = m_checked_idx; LOG_INFO("%s", text); switch (m_check_state){ case CHECK_STATE_REQUESTLINE: { ret = parse_request_line(text); if (ret == BAD_REQUEST) return BAD_REQUEST; break; } case CHECK_STATE_HEADER: { ret = parse_headers(text); if (ret == BAD_REQUEST) return BAD_REQUEST; else if (ret == GET_REQUEST) { return do_request(); } break; } case CHECK_STATE_CONTENT: { ret = parse_content(text); if (ret == GET_REQUEST) return do_request(); line_status = LINE_OPEN; break; } default: return INTERNAL_ERROR; } } return NO_REQUEST; }
|
解析一行的函数parse_line
,用于驱动从状态机:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
http_conn::LINE_STATUS http_conn::parse_line() { char temp; for (; m_checked_idx < m_read_idx; ++m_checked_idx) { temp = m_read_buf[m_checked_idx]; if (temp == '\r') { if ((m_checked_idx + 1) == m_read_idx) return LINE_OPEN; else if (m_read_buf[m_checked_idx + 1] == '\n') { m_read_buf[m_checked_idx++] = '\0'; m_read_buf[m_checked_idx++] = '\0'; return LINE_OK; } return LINE_BAD; } else if (temp == '\n') { if (m_checked_idx > 1 && m_read_buf[m_checked_idx - 1] == '\r') { m_read_buf[m_checked_idx - 1] = '\0'; m_read_buf[m_checked_idx++] = '\0'; return LINE_OK; } return LINE_BAD; } } return LINE_OPEN; }
|
解析请求行、头、体的函数分别为parse_request_line
,parse_headers
,parse_content
,用于驱动主状态机。
初始状态为解析请求行。请求行各个选项之间用空格或\t
分隔。
相关库函数
1
| char *strpbrk(const char *str1, const char *str2)
|
- str1 – 要被检索的 C 字符串。
- str2 – 该字符串包含了要在 str1 中进行匹配的字符列表。
- 在源字符串(s1)中找出最先含有搜索字符串(s2)中任一字符的位置并返回,若找不到则返回空指针。
1
| int strcasecmp(const char *s1, const char *s2);
|
- 用来比较参数s1和s2字符串,比较时会自动忽略大小写的差异。
- 若参数s1和s2字符串相等则返回0。s1大于s2则返回大于0 的值,s1 小于s2 则返回小于0的值。
1
| size_t strspn(const char *str1, const char *str2)
|
- str1 – 要被检索的 C 字符串。
- str2 – 该字符串包含了要在 str1 中进行匹配的字符列表。
- 该函数返回 str1 中第一个不在字符串 str2 中出现的字符下标。
解析请求行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| http_conn::HTTP_CODE http_conn::parse_request_line(char *text) { m_url = strpbrk(text, " \t"); if (!m_url) { return BAD_REQUEST; }
*m_url++ = '\0'; char *method = text; if (strcasecmp(method, "GET") == 0) m_method = GET; else if (strcasecmp(method, "POST") == 0) { m_method = POST; cgi = 1; } else return BAD_REQUEST; m_url += strspn(m_url, " \t"); m_version = strpbrk(m_url, " \t"); if (!m_version) return BAD_REQUEST; *m_version++ = '\0'; m_version += strspn(m_version, " \t"); if (strcasecmp(m_version, "HTTP/1.1") != 0) return BAD_REQUEST; if (strncasecmp(m_url, "http://", 7) == 0) { m_url += 7; m_url = strchr(m_url, '/'); }
if (strncasecmp(m_url, "https://", 8) == 0) { m_url += 8; m_url = strchr(m_url, '/'); }
if (!m_url || m_url[0] != '/') return BAD_REQUEST; if (strlen(m_url) == 1) strcat(m_url, "judge.html"); m_check_state = CHECK_STATE_HEADER; return NO_REQUEST; }
|
解析请求头
略
解析请求体
1 2 3 4 5 6 7 8 9 10
| http_conn::HTTP_CODE http_conn::parse_content(char *text) { if (m_read_idx >= (m_content_length + m_checked_idx)) { text[m_content_length] = '\0'; m_string = text; return GET_REQUEST; } return NO_REQUEST; }
|
5.4.6 响应http请求报文
浏览器端发出HTTP请求报文,服务器端接收该报文并调用process_read
对其进行解析,根据解析结果HTTP_CODE
,进入相应的逻辑和模块。
预备:相关API
mmap:用于将一个文件或其他对象映射到内存,提高文件的访问速度。
1 2
| void* mmap(void* start,size_t length,int prot,int flags,int fd,off_t offset); int munmap(void* start,size_t length);
|
start:映射区的开始地址,设置为0时表示由系统决定映射区的起始地址
length:映射区的长度
prot:期望的内存保护标志,不能与文件的打开模式冲突
-
flags:指定映射对象的类型,映射选项和映射页是否可以共享
- MAP_PRIVATE 建立一个写入时拷贝的私有映射,内存区域的写入不会影响到原文件
fd:有效的文件描述符,一般是由open()函数返回
off_toffset:被映射对象内容的起点
iovec:定义了一个向量元素,通常,这个结构用作一个多元素的数组。
1 2 3 4
| struct iovec { void *iov_base; size_t iov_len; };
|
- iov_base指向数据的地址
- iov_len表示数据的长度
writev:用于在一次函数调用中写多个非连续缓冲区,有时也将这该函数称为聚集写。
1 2
| #include <sys/uio.h> ssize_t writev(int filedes, const struct iovec *iov, int iovcnt);
|
- filedes表示文件描述符
- iov为前述io向量机制结构体iovec
- iovcnt为结构体的个数
若成功则返回已写的字节数,若出错则返回-1。writev
以顺序iov[0]
,iov[1]
至iov[iovcnt-1]
从缓冲区中聚集输出数据。writev
返回输出的字节总数,通常,它应等于所有缓冲区长度之和。
do_request函数
process_read
函数的返回值是对请求的文件分析后的结果,一部分是语法错误导致的BAD_REQUEST
,一部分是do_request
的返回结果,该函数将网站根目录和url
文件拼接,然后通过stat判断该文件属性。另外,为了提高访问速度,通过mmap进行映射,将普通文件映射到内存逻辑地址。
process_write函数
根据do_request
的返回状态,服务器子线程调用向m_write_buf
中写入响应报文。
add_status_line函数,添加状态行:http/1.1 状态码 状态消息
add_headers函数添加消息报头,内部调用add_content_length和add_linger函数
content-length记录响应报文长度,用于浏览器端判断服务器是否发送完数据
- connection记录连接状态,用于告诉浏览器端保持长连接
add_blank_line添加空行
上述涉及的5个函数,均是内部调用add_response
函数更新m_write_idx
指针和缓冲区m_write_buf
中的内容。
6 定时器处理非活动连接
6.1 概述
非活跃
,是指客户端(这里是浏览器)与服务器端建立连接后,长时间不交换数据,一直占用服务器端的文件描述符,导致连接资源的浪费。
定时事件
,是指固定一段时间之后触发某段代码,由该段代码处理一个事件,如从内核事件表删除事件,并关闭文件描述符,释放连接资源。
定时器
,是指利用结构体或其他形式,将多种定时事件进行封装起来。具体的,这里只涉及一种定时事件,即定期检测非活跃连接,这里将该定时事件与连接资源封装为一个结构体定时器。
定时器容器
,是指使用某种容器类数据结构,将上述多个定时器组合起来,便于对定时事件统一管理。具体的,项目中使用升序链表将所有定时器串联组织起来。
本项目中,服务器主循环为每一个连接创建一个定时器,并对每个连接进行定时。另外,利用升序时间链表容器将所有定时器串联起来,若主循环接收到定时通知,则在链表中依次执行定时任务。
Linux
下提供了三种定时的方法:
- socket选项SO_RECVTIMEO和SO_SNDTIMEO
- SIGALRM信号
- I/O复用系统调用的超时参数
具体的,利用alarm
函数周期性地触发SIGALRM
信号,信号处理函数利用管道通知主循环,主循环接收到该信号后对升序链表上所有定时器进行处理,若该段时间内没有交换数据,则将该连接关闭,释放所占用的资源。
6.2 统一事件源
统一事件源,是指将信号事件与其他事件一样被处理。
具体的,信号处理函数使用管道将信号传递给主循环,信号处理函数往管道的写端写入信号值,主循环则从管道的读端读出信号值,使用I/O复用系统调用来监听管道读端的可读事件,这样信号事件与其他文件描述符都可以通过epoll来监测,从而实现统一处理。
6.3 源码分析
6.3.1 信号处理函数
源码位于lst_timer.cpp
中
1 2 3 4 5 6 7 8 9 10 11
| void Utils::sig_handler(int sig) { int save_errno = errno; int msg = sig; send(u_pipefd[1], (char *)&msg, 1, 0); errno = save_errno; }
|
信号处理函数中仅仅通过管道发送信号值,不处理信号对应的逻辑,缩短异步执行时间,减少对主程序的影响。
1 2 3 4 5 6 7 8 9 10 11 12
| void Utils::addsig(int sig, void(handler)(int), bool restart) { struct sigaction sa; memset(&sa, '\0', sizeof(sa)); sa.sa_handler = handler; if (restart) sa.sa_flags |= SA_RESTART; sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL) != -1); }
|
6.3.2 信号通知逻辑
创建管道,其中管道写端写入信号值,管道读端通过I/O复用系统监测读事件
设置信号处理函数SIGALRM(时间到了触发)和SIGTERM(kill会触发,Ctrl+C)
通过struct sigaction结构体和sigaction函数注册信号捕捉函数
- 在结构体的handler参数设置信号处理函数,具体的,从管道写端写入信号的名字
利用I/O复用系统监听管道读端文件描述符的可读事件
信息值传递给主循环,主循环再根据接收到的信号值执行目标信号对应的逻辑代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| bool WebServer::dealwithsignal(bool &timeout, bool &stop_server) { int ret = 0; int sig; char signals[1024]; ret = recv(m_pipefd[0], signals, sizeof(signals), 0); if (ret == -1) { return false; } else if (ret == 0) { return false; } else { for (int i = 0; i < ret; ++i) { switch (signals[i]) { case SIGALRM: { timeout = true; break; } case SIGTERM: { stop_server = true; break; } } } } return true; }
|
为什么管道写端要非阻塞?
send是将信息发送给套接字缓冲区,如果缓冲区满了,则会阻塞,这时候会进一步增加信号处理函数的执行时间,为此,将其修改为非阻塞。
没有对非阻塞返回值处理,如果阻塞是不是意味着这一次定时事件失效了?
是的,但定时事件是非必须立即处理的事件,可以允许这样的情况发生。
管道传递的是什么类型?switch-case的变量冲突?
信号本身是整型数值,管道中传递的是ASCII码表中整型数值对应的字符。
switch的变量一般为字符或整型,当switch的变量为字符时,case中可以是字符,也可以是字符对应的ASCII码。
6.3.3 定时器设计
项目中将连接资源、定时事件和超时时间封装为定时器类,具体的,
- 连接资源包括客户端套接字地址、文件描述符和定时器
- 定时事件为回调函数,将其封装起来由用户自定义,这里是删除非活动socket上的注册事件,并关闭
- 定时器超时时间 = 浏览器和服务器连接时刻 + 固定时间(TIMESLOT),可以看出,定时器使用绝对时间作为超时值,这里alarm设置为5秒,连接超时为15秒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
class util_timer;
struct client_data { sockaddr_in address; int sockfd; util_timer *timer; };
class util_timer { public: util_timer() : prev(NULL), next(NULL) {}
public: time_t expire; void (* cb_func)(client_data *); client_data *user_data; util_timer *prev; util_timer *next; };
|
定时事件,具体的,从内核事件表删除事件,关闭文件描述符,释放连接资源。
1 2 3 4 5 6 7 8 9 10 11
|
void cb_func(client_data *user_data) { epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); assert(user_data); close(user_data->sockfd); http_conn::m_user_count--; }
|
6.3.4 定时器容器设置
项目中的定时器容器为带头尾结点的按过期时间升序排列的双向链表(越靠后的定时器,过期时间越大),具体的为每个连接创建一个定时器,将其添加到链表中,并按照超时时间升序排列。执行定时任务时,将到期的定时器从链表中删除。
从实现上看,主要涉及双向链表的插入,删除操作,其中添加定时器的事件复杂度是O(n),删除定时器的事件复杂度是O(1)。
升序双向链表主要逻辑如下,具体的,
创建头尾节点,其中头尾节点没有意义,仅仅统一方便调整
add_timer函数,将目标定时器添加到链表中,添加时按照升序添加
若当前链表中只有头尾节点,直接插入
adjust_timer函数,当定时任务发生变化,调整对应定时器在链表中的位置
客户端在设定时间内有数据收发,则当前时刻对该定时器重新设定时间,这里只是往后延长超时时间
- 被调整的目标定时器在尾部,或定时器新的超时值仍然小于下一个定时器的超时,不用调整
- 否则先将定时器从链表取出,重新插入链表
del_timer函数将超时的定时器从链表中删除
常规双向链表删除结点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
class sort_timer_lst { public: sort_timer_lst(); ~sort_timer_lst(); void add_timer(util_timer *timer); void adjust_timer(util_timer *timer); void del_timer(util_timer *timer); void tick();
private: void add_timer(util_timer *timer, util_timer *lst_head);
util_timer *head; util_timer *tail; };
|
实现:lst_timer.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| void sort_timer_lst::add_timer(util_timer *timer) { if (!timer) { return; } if (!head) { head = tail = timer; return; } if (timer->expire < head->expire) { timer->next = head; head->prev = timer; head = timer; return; } add_timer(timer, head); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| void sort_timer_lst::adjust_timer(util_timer *timer) { if (!timer) { return; } util_timer *tmp = timer->next; if (!tmp || (timer->expire < tmp->expire)) { return; } if (timer == head) { head = head->next; head->prev = NULL; timer->next = NULL; add_timer(timer, head); } else { timer->prev->next = timer->next; timer->next->prev = timer->prev; add_timer(timer, timer->next); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| void sort_timer_lst::del_timer(util_timer *timer) { if (!timer) { return; } if ((timer == head) && (timer == tail)) { delete timer; head = NULL; tail = NULL; return; } if (timer == head) { head = head->next; head->prev = NULL; delete timer; return; } if (timer == tail) { tail = tail->prev; tail->next = NULL; delete timer; return; } timer->prev->next = timer->next; timer->next->prev = timer->prev; delete timer; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
void sort_timer_lst::add_timer(util_timer *timer, util_timer *lst_head) { util_timer *prev = lst_head; util_timer *tmp = prev->next; while (tmp) { if (timer->expire < tmp->expire) { prev->next = timer; timer->next = tmp; tmp->prev = timer; timer->prev = prev; break; } prev = tmp; tmp = tmp->next; } if (!tmp) { prev->next = timer; timer->prev = prev; timer->next = NULL; tail = timer; } }
|
6.3.5 定时任务处理函数
使用统一事件源,SIGALRM信号每次被触发,主循环中调用一次定时任务处理函数,处理链表容器中到期的定时器。
具体的逻辑如下,
- 遍历定时器升序链表容器,从头结点开始依次处理每个定时器,直到遇到尚未到期的定时器
- 若当前时间小于定时器超时时间,跳出循环,即未找到到期的定时器
- 若当前时间大于定时器超时时间,即找到了到期的定时器,执行回调函数,然后将它从链表中删除,然后继续遍历
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| void sort_timer_lst::tick() { if (!head) { return; } time_t cur = time(NULL); util_timer *tmp = head; while (tmp) { if (cur < tmp->expire) { break; } tmp->cb_func(tmp->user_data); head = tmp->next; if (head) { head->prev = NULL; } delete tmp; tmp = head; } }
|
6.3.6 定时器的使用
服务器首先创建定时器容器链表,然后用统一事件源将异常事件,读写事件和信号事件统一处理,根据不同事件的对应逻辑使用定时器。
具体的,
- 浏览器与服务器连接时,创建该连接对应的定时器,并将该定时器添加到链表上
- 处理异常事件时,执行定时事件,服务器关闭连接,从链表上移除对应定时器
- 处理定时信号时,将定时标志设置为true
- 处理读事件时,若某连接上发生读事件,将对应定时器向后移动,否则,执行定时事件
- 处理写事件时,若服务器通过某连接给浏览器发送数据,将对应定时器向后移动,否则,执行定时事件
代码在整体流程
中分析,详见7.2.1.⑥节
及以后
7 整体流程和补充
7.0 程序入口函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| #include "config.h"
int main(int argc, char *argv[]) { string user = "root"; string passwd = "root"; string databasename = "qgydb";
Config config; config.parse_arg(argc, argv); WebServer server;
server.init(config.PORT, user, passwd, databasename, config.LOGWrite, config.OPT_LINGER, config.TRIGMode, config.sql_num, config.thread_num, config.close_log, config.actor_model);
server.log_write();
server.sql_pool();
server.thread_pool();
server.trig_mode();
server.eventListen();
server.eventLoop(); return 0; }
|
7.1 配置类
对一些常量的设置,以及解析命令行参数。
定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| #ifndef CONFIG_H #define CONFIG_H
#include "webserver.h"
using namespace std;
class Config { public: Config(); ~Config(){};
void parse_arg(int argc, char*argv[]);
int PORT;
int LOGWrite;
int TRIGMode;
int LISTENTrigmode;
int CONNTrigmode;
int OPT_LINGER;
int sql_num;
int thread_num;
int close_log;
int actor_model; };
#endif
|
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| #include "config.h"
Config::Config(){ PORT = 9006;
LOGWrite = 0;
TRIGMode = 0;
LISTENTrigmode = 0;
CONNTrigmode = 0;
OPT_LINGER = 0;
sql_num = 8;
thread_num = 8;
close_log = 0;
actor_model = 0; }
void Config::parse_arg(int argc, char*argv[]){ int opt; const char *str = "p:l:m:o:s:t:c:a:"; while ((opt = getopt(argc, argv, str)) != -1) { switch (opt) { case 'p': { PORT = atoi(optarg); break; } case 'l': { LOGWrite = atoi(optarg); break; } case 'm': { TRIGMode = atoi(optarg); break; } case 'o': { OPT_LINGER = atoi(optarg); break; } case 's': { sql_num = atoi(optarg); break; } case 't': { thread_num = atoi(optarg); break; } case 'c': { close_log = atoi(optarg); break; } case 'a': { actor_model = atoi(optarg); break; } default: break; } } }
|
7.2 服务器类
7.2.1 定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| #ifndef WEBSERVER_H #define WEBSERVER_H
#include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <fcntl.h> #include <stdlib.h> #include <cassert> #include <sys/epoll.h>
#include "./threadpool/threadpool.h" #include "./http/http_conn.h"
const int MAX_FD = 65536; const int MAX_EVENT_NUMBER = 10000; const int TIMESLOT = 5;
class WebServer { public: WebServer(); ~WebServer(); void init(int port , string user, string passWord, string databaseName, int log_write , int opt_linger, int trigmode, int sql_num, int thread_num, int close_log, int actor_model); void thread_pool(); void sql_pool(); void log_write(); void trig_mode(); void eventListen(); void eventLoop(); void timer(int connfd, struct sockaddr_in client_address); void adjust_timer(util_timer *timer); void deal_timer(util_timer *timer, int sockfd); bool dealclinetdata(); bool dealwithsignal(bool& timeout, bool& stop_server); void dealwithread(int sockfd); void dealwithwrite(int sockfd);
public: int m_port; char *m_root; int m_log_write; int m_close_log; int m_actormodel;
int m_pipefd[2]; int m_epollfd; http_conn *users;
connection_pool *m_connPool; string m_user; string m_passWord; string m_databaseName; int m_sql_num;
threadpool<http_conn> *m_pool; int m_thread_num;
epoll_event events[MAX_EVENT_NUMBER]; int m_listenfd; int m_OPT_LINGER; int m_TRIGMode; int m_LISTENTrigmode; int m_CONNTrigmode;
client_data *users_timer; Utils utils; }; #endif
|
7.2.1 主线程
① 构造和析构
对应程序入口函数(7.0节
)的WebServer server;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| WebServer::WebServer() { users = new http_conn[MAX_FD];
char server_path[200]; getcwd(server_path, 200); char root[6] = "/root"; m_root = (char *)malloc(strlen(server_path) + strlen(root) + 1); strcpy(m_root, server_path); strcat(m_root, root);
users_timer = new client_data[MAX_FD]; }
WebServer::~WebServer() { close(m_epollfd); close(m_listenfd); close(m_pipefd[1]); close(m_pipefd[0]); delete[] users; delete[] users_timer; delete m_pool; }
void WebServer::init(int port, string user, string passWord, string databaseName, int log_write, int opt_linger, int trigmode, int sql_num, int thread_num, int close_log, int actor_model) { m_port = port; m_user = user; m_passWord = passWord; m_databaseName = databaseName; m_sql_num = sql_num; m_thread_num = thread_num; m_log_write = log_write; m_OPT_LINGER = opt_linger; m_TRIGMode = trigmode; m_close_log = close_log; m_actormodel = actor_model; }
|
② 线程池
1 2 3 4 5
| void WebServer::thread_pool() { m_pool = new threadpool<http_conn>(m_actormodel, m_connPool, m_thread_num); }
|
其中调用了threadpool
类的构造函数:(详见4.3.2节
)
1 2
| threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
|
该函数会预先创建m_thread_num
个线程。
③ 触发模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
void WebServer::trig_mode() { if (0 == m_TRIGMode) { m_LISTENTrigmode = 0; m_CONNTrigmode = 0; } else if (1 == m_TRIGMode) { m_LISTENTrigmode = 0; m_CONNTrigmode = 1; } else if (2 == m_TRIGMode) { m_LISTENTrigmode = 1; m_CONNTrigmode = 0; } else if (3 == m_TRIGMode) { m_LISTENTrigmode = 1; m_CONNTrigmode = 1; } }
|
④ 事件监听
- 创建监听套接字,设置套接字选项(
SO_LINGER
),绑定和监听
- epoll设置
- 定时器设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| void WebServer::eventListen() { m_listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(m_listenfd >= 0);
if (0 == m_OPT_LINGER) { struct linger tmp = {0, 1}; setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp)); } else if (1 == m_OPT_LINGER) { struct linger tmp = {1, 1}; setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp)); }
int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; address.sin_addr.s_addr = htonl(INADDR_ANY); address.sin_port = htons(m_port);
int flag = 1; setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)); ret = bind(m_listenfd, (struct sockaddr *)&address, sizeof(address)); assert(ret >= 0); ret = listen(m_listenfd, 5); assert(ret >= 0);
utils.init(TIMESLOT);
epoll_event events[MAX_EVENT_NUMBER]; m_epollfd = epoll_create(5); assert(m_epollfd != -1); utils.addfd(m_epollfd, m_listenfd, false, m_LISTENTrigmode); http_conn::m_epollfd = m_epollfd;
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_pipefd); assert(ret != -1); utils.setnonblocking(m_pipefd[1]); utils.addfd(m_epollfd, m_pipefd[0], false, 0); utils.addsig(SIGPIPE, SIG_IGN); utils.addsig(SIGALRM, utils.sig_handler, false); utils.addsig(SIGTERM, utils.sig_handler, false);
alarm(TIMESLOT);
Utils::u_pipefd = m_pipefd; Utils::u_epollfd = m_epollfd; }
|
信号处理函数sig_handler
向u_piped[1]
写入信号值。
⑤ 事件循环
这是入口函数的倒数第二句:server.eventLoop()
,正常运行情况下是一个死循环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| void WebServer::eventLoop() { bool timeout = false; bool stop_server = false; while (!stop_server) { int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1); if (number < 0 && errno != EINTR) { LOG_ERROR("%s", "epoll failure"); break; } for (int i = 0; i < number; i++) { int sockfd = events[i].data.fd;
if (sockfd == m_listenfd) { bool flag = dealclinetdata(); if (false == flag) continue; } else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) { util_timer *timer = users_timer[sockfd].timer; deal_timer(timer, sockfd); } else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN)) { bool flag = dealwithsignal(timeout, stop_server); if (false == flag) LOG_ERROR("%s", "dealclientdata failure"); } else if (events[i].events & EPOLLIN) { dealwithread(sockfd); } else if (events[i].events & EPOLLOUT) { dealwithwrite(sockfd); } } if (timeout) { utils.timer_handler();
LOG_INFO("%s", "timer tick");
timeout = false; } } }
|
主线程在eventLoop
处死循环,阻塞在epoll_wait
处,等待一个或多个监听的套接字准备就绪,根据套接字和就绪类型,处理不同的事件。
超时代码解析:
timeout
标志位由dealwithsignal
修改。系统在④
设置了5s的alarm闹钟,5s后产生一个SIGALRM
信号,修改timeout为true;
- 进入if,调用
utils.timer_handler();
1 2 3 4 5
| void Utils::timer_handler() { m_timer_lst.tick(); alarm(m_TIMESLOT); }
|
tick
函数详见6.3.5节
,它将寻找过期的定时器并调用回调函数cb_func
,删除定时器和套接字。
各种事件处理的逻辑在下面列出。
⑥ 处理新到的连接
由函数dealclinetdata
完成,注:这里clinet
应该为client
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| bool WebServer::dealclinetdata() { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); if (0 == m_LISTENTrigmode) { int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength); if (connfd < 0) { LOG_ERROR("%s:errno is:%d", "accept error", errno); return false; } if (http_conn::m_user_count >= MAX_FD) { utils.show_error(connfd, "Internal server busy"); LOG_ERROR("%s", "Internal server busy"); return false; } timer(connfd, client_address); } else { while (1) { int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength); if (connfd < 0) { LOG_ERROR("%s:errno is:%d", "accept error", errno); break; } if (http_conn::m_user_count >= MAX_FD) { utils.show_error(connfd, "Internal server busy"); LOG_ERROR("%s", "Internal server busy"); break; } timer(connfd, client_address); } return false; } return true; }
|
分为水平触发和边沿触发,对于前者,每次eventLoop
的循环,在dealclinetdata
里处理一个连接;对于后者,在eventLoop
的一次循环中,就在dealclinetdata
处理完所有连接(使用while(1)
),当已完成连接队列为空后,accept立即返回-1(listenfd为非阻塞,在utils.addfd(m_epollfd, m_listenfd, false, m_LISTENTrigmode);
中设置),跳出while循环。
此外,主线程对每个连接进行了初始化和设置了定时器:
1
| timer(connfd, client_address);
|
WebServer::timer
函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| void WebServer::timer(int connfd, struct sockaddr_in client_address) { users[connfd].init(connfd, client_address, m_root, m_CONNTrigmode, m_close_log, m_user, m_passWord, m_databaseName);
users_timer[connfd].address = client_address; users_timer[connfd].sockfd = connfd;
util_timer *timer = new util_timer; timer->user_data = &users_timer[connfd]; timer->cb_func = cb_func; time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; users_timer[connfd].timer = timer; utils.m_timer_lst.add_timer(timer); }
|
http_conn::init
函数如下:为副套接字注册读事件;主从状态机初始化,缓冲区下标初始化等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| void http_conn::init(int sockfd, const sockaddr_in &addr, char *root, int TRIGMode, int close_log, string user, string passwd, string sqlname) { m_sockfd = sockfd; m_address = addr; addfd(m_epollfd, sockfd, true, m_TRIGMode); m_user_count++;
doc_root = root; m_TRIGMode = TRIGMode; m_close_log = close_log;
strcpy(sql_user, user.c_str()); strcpy(sql_passwd, passwd.c_str()); strcpy(sql_name, sqlname.c_str()); init(); }
void http_conn::init() { mysql = NULL; bytes_to_send = 0; bytes_have_send = 0; m_check_state = CHECK_STATE_REQUESTLINE; m_linger = false; m_method = GET; m_url = 0; m_version = 0; m_content_length = 0; m_host = 0; m_start_line = 0; m_checked_idx = 0; m_read_idx = 0; m_write_idx = 0; cgi = 0; m_state = 0; timer_flag = 0; improv = 0; memset(m_read_buf, '\0', READ_BUFFER_SIZE); memset(m_write_buf, '\0', WRITE_BUFFER_SIZE); memset(m_real_file, '\0', FILENAME_LEN); }
|
⑦ 处理异常事件
由函数WebServer::deal_timer
完成,它将关闭定时器。
1 2 3 4 5 6 7 8 9 10
| void WebServer::deal_timer(util_timer *timer, int sockfd) { timer->cb_func(&users_timer[sockfd]); if (timer){ utils.m_timer_lst.del_timer(timer); } LOG_INFO("close fd %d", users_timer[sockfd].sockfd); }
|
回调函数cb_func
详见6.3.3节
。del_timer
详见6.3.4节
。
⑧ 处理定时器信号
在④
事件监听函数中,对SIGALRM
信号进行了处理函数的注册,并设置了定时alarm
:
1 2 3 4 5
| utils.addsig(SIGALRM, utils.sig_handler, false);
alarm(TIMESLOT);
|
信号处理函数sig_handler
:向u_piped[1]
写入信号值。
处理定时器信号由函数WebServer::dealwithsignal
函数完成,详见6.3.2节
。这里的接收到的信号只能是SIGALRM
或SIGTERM
。
⑨ 副套接字可读
由函数WebServer::dealwithread
完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
void WebServer::dealwithread(int sockfd) { util_timer *timer = users_timer[sockfd].timer;
if (1 == m_actormodel) { if (timer) { adjust_timer(timer); }
m_pool->append(users[sockfd], 0);
while (true) { if (1 == users[sockfd].improv) { if (1 == users[sockfd].timer_flag) { deal_timer(timer, sockfd); users[sockfd].timer_flag = 0; } users[sockfd].improv = 0; break; } } } else { if (users[sockfd].read_once()) { LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
m_pool->append_p(users[sockfd]); if (timer) { adjust_timer(timer); } } else { deal_timer(timer, sockfd); } } }
|
reactor
模型:主线程不读取已就绪的套接字,而是将连接加入到队列中(调用append
),由子线程读取并处理。然后主线程循环检测子线程是否读完数据(while(true)
),等待子线程读完之后(improv
被置为1),才能退出循环。
proactor
模型:主线程读取数据后,将该连接放入请求队列中(调用append_p
),然后调整定时器(adjust_timer
),如果没有数据传输,则删除定时器,关闭套接字。主线程不必等待子线程。
append_p
和append
函数详见4.3.3节
。
WebServer::adjust_timer
函数如下:
1 2 3 4 5 6 7 8 9 10
|
void WebServer::adjust_timer(util_timer *timer) { time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; utils.m_timer_lst.adjust_timer(timer);
LOG_INFO("%s", "adjust timer once"); }
|
其中又调用了工具类的adjust_timer
函数。
⑩ 副套接字可写
由函数WebServer::dealwithwrite
完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
void WebServer::dealwithwrite(int sockfd) { util_timer *timer = users_timer[sockfd].timer; if (1 == m_actormodel) { if (timer) { adjust_timer(timer); } m_pool->append(users[sockfd], 1); while (true) { if (1 == users[sockfd].improv) { if (1 == users[sockfd].timer_flag) { deal_timer(timer, sockfd); users[sockfd].timer_flag = 0; } users[sockfd].improv = 0; break; } } } else { if (users[sockfd].write()) { LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr)); if (timer) { adjust_timer(timer); } } else { deal_timer(timer, sockfd); } } }
|
⑪ 总结
这里再次分析一下reactor和proactor模型,在4.1
和4.2
节中已经分析过。
- reactor模式中,主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话立即通知工作线程(逻辑单元 ),读写数据、接受新连接及处理客户请求均在工作线程中完成。通常由同步I/O实现。
- proactor模式中,主线程和内核负责处理读写数据、接受新连接等I/O操作,工作线程仅负责业务逻辑,如处理客户请求。通常由异步I/O实现。
区别在于处理副套接字可读和可写时的分支上,函数为dealwithread
,dealwithwrite
。
7.2.2 线程池中的线程
根据模型的不同,子线程完成的任务不同。
reactor
:需要先读取/写入数据,然后处理
proactor
:只需要处理
线程执行的函数为threadpool<T>::worker
,其中由调用了threadpool<T>::run
,详见4.3.4
和4.3.5
节。