轻量级Web服务器学习笔记

学习时间:2023年4月12日

项目地址:https://github.com/qinguoyi/TinyWebServer

1 框架

image-20230412175617219

2 源码结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|---CGImysql 数据库
| |---sql_connection_pool.h
| |---sql_connection_pool.cpp
|---http http连接封装
| |---http_conn.cpp
| |---http_conn.h
|---lock 锁封装
| |---locker.h
|---log 日志
| |---block_queue.h
| |---log.cpp
| |---log.h
|---threadpool 线程池封装
| |---threadpool.h
|---timer 定时器和工具类封装
| |---lst_timer.cpp
| |---lst_timer.h
|---build.sh
|---config.cpp 配置类实现
|---config.h 配置类
|---main.cpp 入口
|---makefile
|---webserver.cpp 服务器类的实现
|---webserver.h 服务器类的定义

3 线程同步机制封装类

线程同步机制详见[[Linux系统编程学习笔记#8.5 线程同步]],可以使用信号量,互斥量和条件变量三种机制实现线程同步。

3.1 基础知识

3.1.1 RAII

  • RAII全称是Resource Acquisition is Initialization,直译过来是“资源获取即初始化”.
  • 在构造函数中申请分配资源,在析构函数中释放资源。因为C++的语言机制保证了,当一个对象创建的时候,自动调用构造函数,当对象超出作用域的时候会自动调用析构函数。所以,在RAII的指导下,我们应该使用类来管理资源,将资源和对象的生命周期绑定
  • RAII的核心思想是将资源或者状态与对象的生命周期绑定,通过C++的语言机制,实现资源和状态的安全管理,智能指针是RAII最好的例子

3.1.2 信号量

表示信号量的数据结构为sem_t,本质是一个长整数:

1
2
3
4
5
#include <semaphore.h>
typedef union {
char __size[__SIZEOF_SEM_T];
long int __align;
} sem_t;

相关系统调用:

1
2
3
// 创建
// 0, -1,下同
int sem_init(sem_t *sem, int pshared, unsigned int value);
  • pshared参数控制着信号量的类型。如果 pshared的值是0,就表示它是当前进程的局部信号量;否则,其它进程就能够共享这个信号量。一般设置为0;
  • value参数指定信号量的初始值
1
2
3
4
// 减少一个信号量
int sem_wait(sem_t * sem);
// 增加一个信号量
int sem_post(sem_t * sem);
1
2
// 销毁
int sem_destroy (sem_t *sem);

3.1.3 互斥量

互斥变量是用 pthread_mutex_t 数据类型表示的。详见[[Linux系统编程学习笔记#8.5.2 互斥量]]。

3.1.4 条件变量

条件变量由pthread_cond_t数据类型表示。详见[[Linux系统编程学习笔记#8.5.5 条件变量]]。

3.2 源码解析

源码:lock/locker.cpp,是对三种同步机制的封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#ifndef LOCKER_H
#define LOCKER_H

#include <exception>
#include <pthread.h>
#include <semaphore.h>

// 信号量类
class sem {
public:
sem() {
if (sem_init(&m_sem, 0, 0) != 0) {
throw std::exception();
}
}
// 指定信号量的资源总数为num
sem(int num) {
if (sem_init(&m_sem, 0, num) != 0) {
throw std::exception();
}
}
~sem() {
sem_destroy(&m_sem);
}
bool wait() {
return sem_wait(&m_sem) == 0;
}
bool post() {
return sem_post(&m_sem) == 0;
}

private:
sem_t m_sem; // 信号量
};

// 互斥量
class locker {
public:
locker() {
if (pthread_mutex_init(&m_mutex, NULL) != 0) {
throw std::exception();
}
}
~locker() {
pthread_mutex_destroy(&m_mutex);
}
bool lock() {
return pthread_mutex_lock(&m_mutex) == 0;
}
bool unlock() {
return pthread_mutex_unlock(&m_mutex) == 0;
}
pthread_mutex_t *get() {
return &m_mutex;
}

private:
pthread_mutex_t m_mutex;
};

// 条件变量
class cond {
public:
cond() {
if (pthread_cond_init(&m_cond, NULL) != 0) {
throw std::exception();
}
}
~cond() {
pthread_cond_destroy(&m_cond);
}
// 条件等待
bool wait(pthread_mutex_t *m_mutex) {
int ret = 0;
ret = pthread_cond_wait(&m_cond, m_mutex);
return ret == 0;
}
// 有时间的条件等待
bool timewait(pthread_mutex_t *m_mutex, struct timespec t) {
int ret = 0;
ret = pthread_cond_timedwait(&m_cond, m_mutex, &t);
return ret == 0;
}
// 唤醒所有正在的至少一个线程
bool signal() {
return pthread_cond_signal(&m_cond) == 0;
}
// 唤醒所有的阻塞线程
bool broadcast() {
return pthread_cond_broadcast(&m_cond) == 0;
}

private:
pthread_cond_t m_cond;
};
#endif

4 半同步反应堆线程池

4.1 事件处理模式

  • reactor模式中,主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话立即通知工作线程(逻辑单元 ),读写数据、接受新连接及处理客户请求均在工作线程中完成。通常由同步I/O实现。
  • proactor模式中,主线程和内核负责处理读写数据、接受新连接等I/O操作,工作线程仅负责业务逻辑,如处理客户请求。通常由异步I/O实现。

4.2 同步I/O模拟proactor模式

由于异步I/O并不成熟,实际中使用较少,这里将使用同步I/O模拟实现proactor模式。

同步I/O模型的工作流程如下(epoll_wait为例):

  • 主线程往epoll内核事件表注册socket上的读就绪事件。
  • 主线程调用epoll_wait等待socket上有数据可读或可写,其中可读的条件满足下面之一即可:

    • 新的连接到来
    • 异常
    • 副套接字可读(处理客户连接上接收到的数据)
    • 定时器信号
  • 当socket上有数据可读,epoll_wait通知主线程,主线程从socket循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列。

  • 睡眠在请求队列上某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪事件
  • 主线程调用epoll_wait等待socket可写。
  • 当socket上有数据可写,epoll_wait通知主线程。主线程往socket上写入服务器处理客户请求的结果。

4.3 源码解析

源码文件:threadpool.h类的定义和实现写在同一个文件里

线程池的设计模式为半同步/半反应堆,其中反应堆具体为Proactor事件处理模式。

具体的,主线程为异步线程,负责监听文件描述符,接收socket新连接,若当前监听的socket发生了读写事件,然后将任务插入到请求队列。工作线程从请求队列中取出任务,完成读写数据的处理。

4.3.1 线程池类定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
template <typename T>
class threadpool {
public:
/*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
~threadpool();
// 向请求队列中插入任务请求
// state: 0读1写
bool append(T *request, int state);
bool append_p(T *request);

private:
/*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/
// 作为pthread_create的参数start_routine的传入函数
static void *worker(void *arg);
// 运行线程池
void run();

private:
int m_thread_number; // 线程池中的线程数
int m_max_requests; // 请求队列中允许的最大请求数
pthread_t *m_threads; // 描述线程池的数组,其大小为m_thread_number
std::list<T *> m_workqueue; // 请求队列,双向链表存储
locker m_queuelocker; // 保护请求队列的互斥锁
sem m_queuestat; // 是否有任务需要处理
connection_pool *m_connPool; // 数据库
int m_actor_model; // 模型切换
};

4.3.2 线程池创建与销毁

构造函数中创建线程池,pthread_create函数中将类的对象作为参数传递给静态函数(worker),在静态函数中引用这个对象,并调用其动态方法(run)。

具体的,类对象传递时用this指针,传递给静态函数后,将其转换为线程池类,并调用私有成员函数run。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
template <typename T>
threadpool<T>::threadpool(int actor_model, connection_pool *connPool,
int thread_number, int max_requests) : m_actor_model(actor_model),
m_thread_number(thread_number), m_max_requests(max_requests),
m_threads(NULL),m_connPool(connPool) { // 成员列表初始化

if(thread_number <= 0 || max_requests <= 0)
throw std::exception();
// 线程数组
m_threads = new pthread_t[m_thread_number];
if(!m_threads)
throw std::exception();
// 创建thread_number个线程
for(int i = 0; i < thread_number; ++i) {
// this(线程池类的对象)作为worker函数的void *arg参数
if (pthread_create(m_threads[i], NULL, worker, this) != 0) {
delete[] m_threads;
throw std::exception();
}
// 将线程分离,之后主线程不用收尸
if (pthread_detach(m_threads[i])) {
delete[] m_threads;
throw std::exception();
}
}
}

// 析构函数
template <typename T>
threadpool<T>::~threadpool() {
// 销毁线程数组的空间
delete[] m_threads;
}

4.3.3 向请求队列中添加任务

通过list容器创建请求队列,向队列中添加时,通过互斥锁保证线程安全,添加完成后通过信号量提醒有任务要处理,最后注意线程同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
template <typename T>
bool threadpool<T>::append_p(T *request) {
// 对请求队列加锁
m_queuelocker.lock();
// 请求队列中的任务数量大于上限
if (m_workqueue.size() >= m_max_requests) {
// 解锁
m_queuelocker.unlock();
return false;
}
// 添加在末尾
m_workqueue.push_back(request);
// 解锁
m_queuelocker.unlock();
// 增加一个任务的信号量
m_queuestat.post();
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template <typename T>
bool threadpool<T>::append(T *request, int state) {
// 对请求队列加锁
m_queuelocker.lock();
if (m_workqueue.size() >= m_max_requests) {
m_queuelocker.unlock();
return false;
}
request->m_state = state;
// 加入请求链表
m_workqueue.push_back(request);
// 解锁
m_queuelocker.unlock();
// 增加一个信号量
m_queuestat.post();
return true;
}

4.3.4 线程池启动函数

内部访问私有成员函数run,完成线程处理要求。

1
2
3
4
5
6
7
8
9
// 线程执行函数
template <typename T>
void *threadpool<T>::worker(void *arg) {
// 这里的arg传入的就是threadpool类对象,因此可以强转
// pthread_create(m_threads + i, NULL, worker, this)
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}

4.3.5 线程池执行函数

主要实现,工作线程从请求队列中取出某个任务进行处理,注意线程同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
template <typename T>
void threadpool<T>::run() {
while (true) {
// 取出一个任务,没有任务将阻塞在这里
m_queuestat.wait();
// 请求队列加锁
m_queuelocker.lock();
if (m_workqueue.empty()) { // 请求队列中没有任务
m_queuelocker.unlock();
continue;
}
// 取出一个
T *request = m_workqueue.front();
m_workqueue.pop_front();
// 解锁
m_queuelocker.unlock();
if (!request)
continue;
// reator: 需要子线程读取套接字然后处理解析数据
if (1 == m_actor_model) {
if (0 == request->m_state) { // 读
if (request->read_once()) { // 读取数据
request->improv = 1; // 已经读取完数据,此时如果调度到主线程,主线程可以退出while(1)循环,下同
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process(); // 处理解析
}
else { // 没有数据可读(断开连接)
request->improv = 1; // 已经读取完数据
request->timer_flag = 1; // 删除定时器标志
}
}
else { // 写
if (request->write()) {
request->improv = 1;
}
else {
request->improv = 1;
request->timer_flag = 1;
}
}
}
// proactor: 主线程已经读取好了数据,子线程只需处理解析数据
else {
connectionRAII mysqlcon(&request->mysql, m_connPool);
// 解析
request->process();
}
}
}

5 HTTP连接处理

5.1 触发模式

  • LT水平触发模式

    • epoll_wait检测到文件描述符有事件发生,则将其通知给应用程序,应用程序可以不立即处理该事件。
    • 当下一次调用epoll_wait时,epoll_wait还会再次向应用程序报告此事件,直至被处理
  • ET边缘触发模式

    • epoll_wait检测到文件描述符有事件发生,则将其通知给应用程序,应用程序必须立即处理该事件
    • 必须要一次性将数据读取完,使用非阻塞I/O,读取到出现eagain

5.2 HTTP报文格式

HTTP报文分为请求报文和响应报文两种,每种报文必须按照特有格式生成,才能被浏览器端识别。

其中,浏览器端向服务器发送的为请求报文,服务器处理后返回给浏览器端的为响应报文。

5.2.1 请求报文

HTTP请求报文由请求行(request line)、请求头部(header)、空行和请求数据四个部分组成。

其中,请求分为两种,GET和POST,具体的:

  • GET
1
2
3
4
5
6
7
8
9
10
GET /562f25980001b1b106000338.jpg HTTP/1.1\r\n
Host:img.mukewang.com\r\n
User-Agent:Mozilla/5.0 (Windows NT 10.0; WOW64)\r\n
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.106 Safari/537.36\r\n
Accept:image/webp,image/*,*/*;q=0.8\r\n
Referer:http://www.imooc.com/\r\n
Accept-Encoding:gzip, deflate, sdch\r\n
Accept-Language:zh-CN,zh;q=0.8\r\n
空行\r\n
请求数据为空
  • POST
1
2
3
4
5
6
7
8
POST / HTTP1.1\r\n
Host:www.wrox.com\r\n
User-Agent:Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 2.0.50727; .NET CLR 3.0.04506.648; .NET CLR 3.5.21022)\r\n
Content-Type:application/x-www-form-urlencoded\r\n
Content-Length:40\r\n
Connection: Keep-Alive\r\n
空行\r\n
name=Professional%20Ajax&publisher=Wiley\r\n

各字段含义

  • 请求行,用来说明请求类型,要访问的资源以及所使用的HTTP版本。
    GET说明请求类型为GET,/562f25980001b1b106000338.jpg(URL)为要访问的资源,该行的最后一部分说明使用的是HTTP1.1版本。

  • 请求头部,紧接着请求行(即第一行)之后的部分,用来说明服务器要使用的附加信息。

    • HOST,给出请求资源所在服务器的域名。
    • User-Agent,HTTP客户端程序的信息,该信息由你发出请求使用的浏览器来定义,并且在每个请求中自动发送等。
    • Accept,说明用户代理可处理的媒体类型。
    • Accept-Encoding,说明用户代理支持的内容编码。
    • Accept-Language,说明用户代理能够处理的自然语言集。
    • Content-Type,说明实现主体的媒体类型。
    • Content-Length,说明实现主体的大小。
    • Connection,连接管理,可以是Keep-Alive或close。
  • 空行,请求头部后面的空行是必须的即使第四部分的请求数据为空,也必须有空行。

  • 请求数据也叫主体,可以添加任意的其他数据。

5.2.2 响应报文

HTTP响应也由四个部分组成,分别是:状态行、消息报头、空行和响应正文。

1
2
3
4
5
6
7
8
9
10
HTTP/1.1 200 OK\r\n
Date: Fri, 22 May 2009 06:07:21 GMT\r\n
Content-Type: text/html; charset=UTF-8\r\n
空行\r\n
<html>\r\n
<head></head>\r\n
<body>\r\n
<!--body goes here-->\r\n
</body>\r\n
</html>\r\n

各字段含义

  • 状态行,由HTTP协议版本号, 状态码, 状态消息 三部分组成。
    第一行为状态行,(HTTP/1.1)表明HTTP版本为1.1版本,状态码为200,状态消息为OK。
  • 消息报头,用来说明客户端要使用的一些附加信息。
    第二行和第三行为消息报头,Date:生成响应的日期和时间;Content-Type:指定了MIME类型的HTML(text/html),编码类型是UTF-8。
  • 空行,消息报头后面的空行是必须的。
  • 响应正文,服务器返回给客户端的文本信息。空行后面的html部分为响应正文。

5.3 状态机

5.3.1 http报文处理流程

  • 浏览器端发出http连接请求,主线程创建http对象接收请求并将所有数据读入对应buffer,将该对象插入任务队列,工作线程从任务队列中取出一个任务进行处理。

  • 工作线程取出任务后,调用process_read函数,通过主、从状态机对请求报文进行解析。

  • 解析完之后,跳转do_request函数生成响应报文,通过process_write写入buffer,返回给浏览器端。

5.3.2 状态机和服务器解析请求报文

从状态机负责读取报文的一行,主状态机负责对该行数据进行解析,主状态机内部调用从状态机,从状态机驱动主状态机。

image-20230412231008658

主状态机

三种状态,标识解析位置。

  • CHECK_STATE_REQUESTLINE,解析请求行
  • CHECK_STATE_HEADER,解析请求头
  • CHECK_STATE_CONTENT,解析消息体,仅用于解析POST请求

主状态机运行逻辑:

主状态机初始状态是CHECK_STATE_REQUESTLINE,通过调用从状态机来驱动主状态机,在主状态机进行解析前,从状态机已经将每一行的末尾\r\n符号改为\0\0,以便于主状态机直接取出对应字符串进行处理。

  • CHECK_STATE_REQUESTLINE

    • 主状态机的初始状态,调用parse_request_line函数解析请求行
    • 解析函数从m_read_buf中解析HTTP请求行,获得请求方法、目标URL及HTTP版本号
    • 解析完成后主状态机的状态变为CHECK_STATE_HEADER
    • 解析完请求行后,主状态机继续分析请求头。在报文中,请求头和空行的处理使用的同一个函数,这里通过判断当前的text首位是不是\0字符,若是,则表示当前处理的是空行,若不是,则表示当前处理的是请求头。
  • CHECK_STATE_HEADER

    • 调用parse_headers函数解析请求头部信息
    • 判断是空行还是请求头,若是空行,进而判断content-length是否为0,如果不是0,表明是POST请求,则状态转移到CHECK_STATE_CONTENT,否则说明是GET请求,则报文解析结束。
    • 若解析的是请求头部字段,则主要分析connection字段,content-length字段,其他字段可以直接跳过,各位也可以根据需求继续分析。
    • connection字段判断是keep-alive还是close,决定是长连接还是短连接
    • content-length字段,这里用于读取post请求的消息体长度
    • 如果仅仅是GET请求
  • CHECK_STATE_CONTENT
    • 仅用于解析POST请求,调用parse_content函数解析消息体
    • 用于保存post请求消息体,为后面的登录和注册做准备

从状态机

从状态机有三种状态,标识解析一行的读取状态。

  • LINE_OK,完整读取一行
  • LINE_BAD,报文语法有误
  • LINE_OPEN,读取的行不完整

从状态机运行逻辑:

在HTTP报文中,每一行的数据由\r\n作为结束字符,空行则是仅仅是字符\r\n。因此,可以通过查找\r\n将报文拆解成单独的行进行解析,项目中便是利用了这一点。

从状态机负责读取buffer中的数据,将每行数据末尾的\r\n置为\0\0,并更新从状态机在buffer中读取的位置m_checked_idx,以此来驱动主状态机解析。

  • 从状态机从m_read_buf中逐字节读取,判断当前字节是否为\r
    • 接下来的字符是\n,将\r\n修改成\0\0,将m_checked_idx指向下一行的开头,则返回LINE_OK,表示读取了一行数据
      • 接下来达到了buffer末尾,表示buffer还需要继续接收,返回LINE_OPEN
      • 否则,表示语法错误,返回LINE_BAD
  • 当前字节不是\r,判断是否是\n(一般是上次读取到\r就到了buffer末尾,没有接收完整,再次接收时会出现这种情况

    • 如果前一个字符是\r,则将\r\n修改成\0\0,将m_checked_idx指向下一行的开头,则返回LINE_OK

    • 当前字节既不是\r,也不是\n

    • 表示接收不完整,需要继续接收,返回LINE_OPEN

HTTP状态码

5.3.3 状态机和服务器响应请求报文

浏览器端发出HTTP请求报文,服务器端接收该报文并调用process_read对其进行解析,根据解析结果HTTP_CODE,进入相应的逻辑和模块。

其中,服务器子线程完成报文的解析与响应;主线程监测读写事件,调用read_oncehttp_conn::write完成数据的读取与发送。

image-20230412233056029

HTTP_CODE含义

表示HTTP请求的处理结果,在头文件中初始化了八种情形,在报文解析与响应中只用到了七种。

  • NO_REQUEST

    • 请求不完整,需要继续读取请求报文数据
      • 跳转主线程继续监测读事件
  • GET_REQUEST

    • 获得了完整的HTTP请求
      • 调用do_request完成请求资源映射
  • NO_RESOURCE

    • 请求资源不存在
      • 跳转process_write完成响应报文
  • BAD_REQUEST

    • HTTP请求报文有语法错误或请求资源为目录
      • 跳转process_write完成响应报文
  • FORBIDDEN_REQUEST

    • 请求资源禁止访问,没有读取权限
      • 跳转process_write完成响应报文
  • FILE_REQUEST

    • 请求资源可以正常访问
      • 跳转process_write完成响应报文
  • INTERNAL_ERROR

    • 服务器内部错误,该结果在主状态机逻辑switch的default下,一般不会触发

5.4 源码分析

5.4.1 http类定义

源码:http_conn.h

5.4.2 epoll相关代码

项目中epoll相关代码部分包括非阻塞模式、内核事件表注册事件、删除事件、重置EPOLLONESHOT事件四种。

  • 非阻塞模式
1
2
3
4
5
6
7
8
9
10
//对文件描述符设置非阻塞
int setnonblocking(int fd) {
// 获取原来的文件描述符的选项设置
int old_option = fcntl(fd, F_GETFL);
// 或上一个非阻塞模式
int new_option = old_option | O_NONBLOCK;
// 为fd设置新的选项
fcntl(fd, F_SETFL, new_option);
return old_option;
}
  • 内核事件表注册新事件,开启EPOLLONESHOT,针对客户端连接的描述符,listenfd不用开启
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//将内核事件表注册读事件,ET模式,选择开启EPOLLONESHOT
void addfd(int epollfd, int fd, bool one_shot, int TRIGMode) {
epoll_event event;
event.data.fd = fd;
// 边沿触发
if (1 == TRIGMode)
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
else // 水平触发
event.events = EPOLLIN | EPOLLRDHUP;

if (one_shot)
event.events |= EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd); // 非阻塞
}
  • 内核事件表删除事件
1
2
3
4
5
//从内核事件表删除描述符
void removefd(int epollfd, int fd) {
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
close(fd);
}
  • 重置EPOLLONESHOT事件
1
2
3
4
5
6
7
8
9
10
11
12
//将事件重置为EPOLLONESHOT
void modfd(int epollfd, int fd, int ev, int TRIGMode) {
epoll_event event;
event.data.fd = fd;

if (1 == TRIGMode)
event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP;
else
event.events = ev | EPOLLONESHOT | EPOLLRDHUP;

epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}

5.4.3 服务器接收http请求

该部分的实现源码位于webserver.cpp。详见7.2.1.④和⑤节

5.4.4 读取请求报文

read_once读取浏览器端发送来的请求报文,直到无数据可读或对方关闭连接,读取到m_read_buffer中,并更新m_read_idx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 循环读取客户数据,直到无数据可读或对方关闭连接
// 非阻塞ET工作模式下,需要一次性将数据读完
bool http_conn::read_once() {
if (m_read_idx >= READ_BUFFER_SIZE) {
return false;
}
// 读到的字节数
int bytes_read = 0;

//LT读取数据 水平触发
if (0 == m_TRIGMode) {
// 返回读取到的字节数bytes_read
bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0);
// 更新读指针
m_read_idx += bytes_read;
if (bytes_read <= 0) {
return false;
}
return true;
}
//ET读数据 必须要一次性将数据读取完
else {
while (true) {
bytes_read = recv(m_sockfd, m_read_buf + m_read_idx, READ_BUFFER_SIZE - m_read_idx, 0);
if (bytes_read == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) // 数据已经读完
break;
return false;
} else if (bytes_read == 0) { // 断开了连接
return false;
}
m_read_idx += bytes_read;
}
return true;
}
}

5.4.5 http请求报文解析

各子线程通过process函数对任务进行处理,调用process_read函数和process_write函数分别完成报文解析与报文响应两个任务(驱动主从状态机)。

1

process_read代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
http_conn::HTTP_CODE http_conn::process_read() {
// 从状态机的初始状态
LINE_STATUS line_status = LINE_OK;
// 返回码
HTTP_CODE ret = NO_REQUEST;
char *text = 0;
// 前者用于解析POST的循环,后者用于解析GET的循环
while ((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK) || ((line_status = parse_line()) == LINE_OK)) {
text = get_line(); // 获取缓冲区中新起一行的首地址
m_start_line = m_checked_idx; // 记录新起一行的首地址
LOG_INFO("%s", text);
// 注状态机的状态转移
switch (m_check_state){
// 解析行
case CHECK_STATE_REQUESTLINE: {
ret = parse_request_line(text);
if (ret == BAD_REQUEST)
return BAD_REQUEST;
break;
}
// 解析头
case CHECK_STATE_HEADER: {
ret = parse_headers(text);
if (ret == BAD_REQUEST)
return BAD_REQUEST;
else if (ret == GET_REQUEST) {
// 对于GET请求,没有请求体
// 因此解析完请求头后,就可以处理请求了
return do_request();
}
break;
}
// 解析体
case CHECK_STATE_CONTENT: {
ret = parse_content(text);
if (ret == GET_REQUEST)
// 完整解析POST请求后,跳转到报文响应函数
return do_request();
line_status = LINE_OPEN;
break;
}
default:
return INTERNAL_ERROR;
}
}
return NO_REQUEST;
}

解析一行的函数parse_line,用于驱动从状态机:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 从状态机,用于分析出一行内容
// 返回值为行的读取状态,有LINE_OK,LINE_BAD,LINE_OPEN
http_conn::LINE_STATUS http_conn::parse_line() {
char temp;
for (; m_checked_idx < m_read_idx; ++m_checked_idx) {
temp = m_read_buf[m_checked_idx];
if (temp == '\r') { // 当前字符是\r
if ((m_checked_idx + 1) == m_read_idx) // \r是最后一个字符
return LINE_OPEN; // 读取的行不完整,需要继续读取
else if (m_read_buf[m_checked_idx + 1] == '\n') { // 下一个字符是\n
m_read_buf[m_checked_idx++] = '\0';
m_read_buf[m_checked_idx++] = '\0'; // 将\r\n替换为\0\0空字符
return LINE_OK; // 完整读取一行
}
return LINE_BAD; // 报文语法有误
} else if (temp == '\n') { // 当前字符是\n
if (m_checked_idx > 1 && m_read_buf[m_checked_idx - 1] == '\r') { // 上一个字符是\r
m_read_buf[m_checked_idx - 1] = '\0';
m_read_buf[m_checked_idx++] = '\0'; // 将\r\n替换为\0\0空字符
return LINE_OK; // 完整读取一行
}
return LINE_BAD;
}
}
return LINE_OPEN; // 读取的行不完整,需要继续读取
}

解析请求行、头、体的函数分别为parse_request_lineparse_headersparse_content,用于驱动主状态机。

初始状态为解析请求行。请求行各个选项之间用空格或\t分隔。

相关库函数

1
char *strpbrk(const char *str1, const char *str2)
  • str1 — 要被检索的 C 字符串。
  • str2 — 该字符串包含了要在 str1 中进行匹配的字符列表。
  • 在源字符串(s1)中找出最先含有搜索字符串(s2)中任一字符的位置并返回,若找不到则返回空指针。
1
int strcasecmp(const char *s1, const char *s2);
  • 用来比较参数s1和s2字符串,比较时会自动忽略大小写的差异。
  • 若参数s1和s2字符串相等则返回0。s1大于s2则返回大于0 的值,s1 小于s2 则返回小于0的值。
1
size_t strspn(const char *str1, const char *str2)
  • str1 — 要被检索的 C 字符串。
  • str2 — 该字符串包含了要在 str1 中进行匹配的字符列表。
  • 该函数返回 str1 中第一个不在字符串 str2 中出现的字符下标。

解析请求行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 解析http请求行,获得请求方法,目标url及http版本号
http_conn::HTTP_CODE http_conn::parse_request_line(char *text) {
// 找空格或\t
m_url = strpbrk(text, " \t");
// 如果没有空格或\t,则报文格式有误
if (!m_url) {
return BAD_REQUEST;
}

*m_url++ = '\0'; // 先将该位置赋值,指针再移动
char *method = text; // 取出请求方法
if (strcasecmp(method, "GET") == 0)
m_method = GET;
else if (strcasecmp(method, "POST") == 0) {
m_method = POST;
cgi = 1;
}
else
return BAD_REQUEST;

m_url += strspn(m_url, " \t");
m_version = strpbrk(m_url, " \t");
if (!m_version)
return BAD_REQUEST;
*m_version++ = '\0';
m_version += strspn(m_version, " \t");
if (strcasecmp(m_version, "HTTP/1.1") != 0)
return BAD_REQUEST;
if (strncasecmp(m_url, "http://", 7) == 0) {
m_url += 7;
m_url = strchr(m_url, '/');
}

if (strncasecmp(m_url, "https://", 8) == 0) {
m_url += 8;
m_url = strchr(m_url, '/');
}

if (!m_url || m_url[0] != '/')
return BAD_REQUEST;
//当url为/时,显示判断界面
if (strlen(m_url) == 1)
strcat(m_url, "judge.html");
m_check_state = CHECK_STATE_HEADER; // 更新主状态机
return NO_REQUEST; // 请求不完整,需要继续读取请求报文数据
}

解析请求头

解析请求体

1
2
3
4
5
6
7
8
9
10
//判断http请求是否被完整读入
http_conn::HTTP_CODE http_conn::parse_content(char *text) {
if (m_read_idx >= (m_content_length + m_checked_idx)) {
text[m_content_length] = '\0';
//POST请求中最后为输入的用户名和密码
m_string = text;
return GET_REQUEST;
}
return NO_REQUEST;
}

5.4.6 响应http请求报文

浏览器端发出HTTP请求报文,服务器端接收该报文并调用process_read对其进行解析,根据解析结果HTTP_CODE,进入相应的逻辑和模块。

预备:相关API

mmap:用于将一个文件或其他对象映射到内存,提高文件的访问速度。

1
2
void* mmap(void* start,size_t length,int prot,int flags,int fd,off_t offset);
int munmap(void* start,size_t length);
  • start:映射区的开始地址,设置为0时表示由系统决定映射区的起始地址

  • length:映射区的长度

  • prot:期望的内存保护标志,不能与文件的打开模式冲突

    • PROT_READ 表示页内容可以被读取
  • flags:指定映射对象的类型,映射选项和映射页是否可以共享

    • MAP_PRIVATE 建立一个写入时拷贝的私有映射,内存区域的写入不会影响到原文件
  • fd:有效的文件描述符,一般是由open()函数返回

  • off_toffset:被映射对象内容的起点

iovec:定义了一个向量元素,通常,这个结构用作一个多元素的数组。

1
2
3
4
struct iovec {
void *iov_base; /* starting address of buffer */
size_t iov_len; /* size of buffer */
};
  • iov_base指向数据的地址
  • iov_len表示数据的长度

writev:用于在一次函数调用中写多个非连续缓冲区,有时也将这该函数称为聚集写。

1
2
#include <sys/uio.h>
ssize_t writev(int filedes, const struct iovec *iov, int iovcnt);
  • filedes表示文件描述符
  • iov为前述io向量机制结构体iovec
  • iovcnt为结构体的个数

若成功则返回已写的字节数,若出错则返回-1。writev以顺序iov[0]iov[1]iov[iovcnt-1]从缓冲区中聚集输出数据。writev返回输出的字节总数,通常,它应等于所有缓冲区长度之和。

do_request函数

process_read函数的返回值是对请求的文件分析后的结果,一部分是语法错误导致的BAD_REQUEST,一部分是do_request的返回结果,该函数将网站根目录和url文件拼接,然后通过stat判断该文件属性。另外,为了提高访问速度,通过mmap进行映射,将普通文件映射到内存逻辑地址。

process_write函数

根据do_request的返回状态,服务器子线程调用向m_write_buf中写入响应报文。

  • add_status_line函数,添加状态行:http/1.1 状态码 状态消息

  • add_headers函数添加消息报头,内部调用add_content_length和add_linger函数

  • content-length记录响应报文长度,用于浏览器端判断服务器是否发送完数据

    • connection记录连接状态,用于告诉浏览器端保持长连接
  • add_blank_line添加空行

上述涉及的5个函数,均是内部调用add_response函数更新m_write_idx指针和缓冲区m_write_buf中的内容。

6 定时器处理非活动连接

6.1 概述

非活跃,是指客户端(这里是浏览器)与服务器端建立连接后,长时间不交换数据,一直占用服务器端的文件描述符,导致连接资源的浪费。

定时事件,是指固定一段时间之后触发某段代码,由该段代码处理一个事件,如从内核事件表删除事件,并关闭文件描述符,释放连接资源。

定时器,是指利用结构体或其他形式,将多种定时事件进行封装起来。具体的,这里只涉及一种定时事件,即定期检测非活跃连接,这里将该定时事件与连接资源封装为一个结构体定时器。

定时器容器,是指使用某种容器类数据结构,将上述多个定时器组合起来,便于对定时事件统一管理。具体的,项目中使用升序链表将所有定时器串联组织起来。

本项目中,服务器主循环为每一个连接创建一个定时器,并对每个连接进行定时。另外,利用升序时间链表容器将所有定时器串联起来,若主循环接收到定时通知,则在链表中依次执行定时任务。

Linux下提供了三种定时的方法:

  • socket选项SO_RECVTIMEO和SO_SNDTIMEO
  • SIGALRM信号
  • I/O复用系统调用的超时参数

具体的,利用alarm函数周期性地触发SIGALRM信号,信号处理函数利用管道通知主循环,主循环接收到该信号后对升序链表上所有定时器进行处理,若该段时间内没有交换数据,则将该连接关闭,释放所占用的资源。

6.2 统一事件源

统一事件源,是指将信号事件与其他事件一样被处理。

具体的,信号处理函数使用管道将信号传递给主循环,信号处理函数往管道的写端写入信号值,主循环则从管道的读端读出信号值,使用I/O复用系统调用来监听管道读端的可读事件,这样信号事件与其他文件描述符都可以通过epoll来监测,从而实现统一处理。

6.3 源码分析

6.3.1 信号处理函数

源码位于lst_timer.cpp

1
2
3
4
5
6
7
8
9
10
11
//信号处理函数
void Utils::sig_handler(int sig) {
//为保证函数的可重入性,保留原来的errno
//可重入性表示中断后再次进入该函数,环境变量与之前相同,不会丢失数据
int save_errno = errno;
int msg = sig;
// 将信号值从管道写端写入,传输字符类型,而非整型
send(u_pipefd[1], (char *)&msg, 1, 0);
// 恢复errno
errno = save_errno;
}

信号处理函数中仅仅通过管道发送信号值,不处理信号对应的逻辑,缩短异步执行时间,减少对主程序的影响。

1
2
3
4
5
6
7
8
9
10
11
12
//设置信号函数
void Utils::addsig(int sig, void(handler)(int), bool restart) {
struct sigaction sa;
// 初始化
memset(&sa, '\0', sizeof(sa));
// 信号处理函数中仅仅发送信号值,不做对应逻辑处理
sa.sa_handler = handler;
if (restart) // 如果需要重启
sa.sa_flags |= SA_RESTART; // 被信号打断后,重启系统调用
sigfillset(&sa.sa_mask); // 取消所有信号阻塞
assert(sigaction(sig, &sa, NULL) != -1); // 注册信号处理函数
}

6.3.2 信号通知逻辑

  • 创建管道,其中管道写端写入信号值,管道读端通过I/O复用系统监测读事件

  • 设置信号处理函数SIGALRM(时间到了触发)和SIGTERM(kill会触发,Ctrl+C)

  • 通过struct sigaction结构体和sigaction函数注册信号捕捉函数

    • 在结构体的handler参数设置信号处理函数,具体的,从管道写端写入信号的名字
  • 利用I/O复用系统监听管道读端文件描述符的可读事件

  • 信息值传递给主循环,主循环再根据接收到的信号值执行目标信号对应的逻辑代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 处理信号
bool WebServer::dealwithsignal(bool &timeout, bool &stop_server) {
int ret = 0;
int sig;
char signals[1024];
// 从管道读端读出信号值,成功返回字节数,失败返回-1
// 正常情况下,这里的ret返回值总是1,只有14和15两个ASCII码对应的字符
ret = recv(m_pipefd[0], signals, sizeof(signals), 0);
if (ret == -1) {
return false;
}
else if (ret == 0) {
return false;
}
else {
for (int i = 0; i < ret; ++i) {
switch (signals[i]) {
case SIGALRM: {
timeout = true;
break;
}
case SIGTERM: {
stop_server = true;
break;
}
}
}
}
return true;
}

为什么管道写端要非阻塞?

send是将信息发送给套接字缓冲区,如果缓冲区满了,则会阻塞,这时候会进一步增加信号处理函数的执行时间,为此,将其修改为非阻塞。

没有对非阻塞返回值处理,如果阻塞是不是意味着这一次定时事件失效了?

是的,但定时事件是非必须立即处理的事件,可以允许这样的情况发生。

管道传递的是什么类型?switch-case的变量冲突?

信号本身是整型数值,管道中传递的是ASCII码表中整型数值对应的字符。

switch的变量一般为字符或整型,当switch的变量为字符时,case中可以是字符,也可以是字符对应的ASCII码。

6.3.3 定时器设计

项目中将连接资源、定时事件和超时时间封装为定时器类,具体的,

  • 连接资源包括客户端套接字地址、文件描述符和定时器
  • 定时事件为回调函数,将其封装起来由用户自定义,这里是删除非活动socket上的注册事件,并关闭
  • 定时器超时时间 = 浏览器和服务器连接时刻 + 固定时间(TIMESLOT),可以看出,定时器使用绝对时间作为超时值,这里alarm设置为5秒,连接超时为15秒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// lst_timer.h

// 连接资源结构体成员需要用到定时器类
// 需要前向声明
class util_timer;

// 连接资源
struct client_data {
// 客户端地址
sockaddr_in address;
// 副套接字
int sockfd;
// 定时器
util_timer *timer;
};

// 定时器类
class util_timer {
public:
util_timer() : prev(NULL), next(NULL) {}

public:
// 超时时间
time_t expire;
// 回调函数
void (* cb_func)(client_data *);
// 连接资源
client_data *user_data;
// 上一个定时器
util_timer *prev;
// 下一个定时器
util_timer *next;
};

定时事件,具体的,从内核事件表删除事件,关闭文件描述符,释放连接资源。

1
2
3
4
5
6
7
8
9
10
11
// lst_timer.cpp
// 定时器回调函数
void cb_func(client_data *user_data) {
// 删除非活动连接在socket上的注册事件
epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
assert(user_data);
// 关闭套接字
close(user_data->sockfd);
// 减少连接数
http_conn::m_user_count--;
}

6.3.4 定时器容器设置

项目中的定时器容器为带头尾结点的按过期时间升序排列的双向链表(越靠后的定时器,过期时间越大),具体的为每个连接创建一个定时器,将其添加到链表中,并按照超时时间升序排列。执行定时任务时,将到期的定时器从链表中删除。

从实现上看,主要涉及双向链表的插入,删除操作,其中添加定时器的事件复杂度是O(n),删除定时器的事件复杂度是O(1)。

升序双向链表主要逻辑如下,具体的,

  • 创建头尾节点,其中头尾节点没有意义,仅仅统一方便调整

  • add_timer函数,将目标定时器添加到链表中,添加时按照升序添加

  • 若当前链表中只有头尾节点,直接插入

    • 否则,将定时器按升序插入
  • adjust_timer函数,当定时任务发生变化,调整对应定时器在链表中的位置

  • 客户端在设定时间内有数据收发,则当前时刻对该定时器重新设定时间,这里只是往后延长超时时间

    • 被调整的目标定时器在尾部,或定时器新的超时值仍然小于下一个定时器的超时,不用调整
    • 否则先将定时器从链表取出,重新插入链表
  • del_timer函数将超时的定时器从链表中删除

  • 常规双向链表删除结点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// lst_timer.h
// 定时器容器类
class sort_timer_lst {
public:
sort_timer_lst();
~sort_timer_lst();

void add_timer(util_timer *timer);
void adjust_timer(util_timer *timer);
void del_timer(util_timer *timer);
// 定时任务处理函数
void tick();

private:
// add_timer重载
void add_timer(util_timer *timer, util_timer *lst_head);

util_timer *head;
util_timer *tail;
};

实现:lst_timer.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 添加定时器
void sort_timer_lst::add_timer(util_timer *timer) {
// timer为空
if (!timer) {
return;
}
// 头结点不存在
if (!head) {
head = tail = timer; // 头尾节点均为timer
return;
}
// 如果新的定时器超时时间小于当前头部结点
// 直接将当前定时器结点作为头部结点
if (timer->expire < head->expire) {
timer->next = head;
head->prev = timer;
head = timer;
return;
}
// 否则调用私有成员,调整内部结点
add_timer(timer, head);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 调整定时器,任务发生变化时,调整定时器timer在链表中的位置
void sort_timer_lst::adjust_timer(util_timer *timer) {
if (!timer) {
return;
}
util_timer *tmp = timer->next;
// 被调整的定时器在链表尾部,不调整
// 定时器超时值仍然小于下一个定时器超时值,不调整
if (!tmp || (timer->expire < tmp->expire)) {
return;
}
// 被调整定时器是链表头结点,将定时器取出,重新插入
if (timer == head) {
head = head->next;
head->prev = NULL;
timer->next = NULL;
add_timer(timer, head);
}
// 被调整定时器在内部,将定时器取出,重新插入
else {
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
add_timer(timer, timer->next);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 删除定时器
void sort_timer_lst::del_timer(util_timer *timer) {
if (!timer) {
return;
}
// 只有timer一个节点
if ((timer == head) && (timer == tail)) {
delete timer;
head = NULL;
tail = NULL;
return;
}
// timer是头结点
if (timer == head) {
head = head->next;
head->prev = NULL;
delete timer;
return;
}
// timer是尾结点
if (timer == tail) {
tail = tail->prev;
tail->next = NULL;
delete timer;
return;
}
// 中间节点
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 添加定时器
* 私有成员,被公有成员add_timer和adjust_time调用
* 主要用于调整链表内部结点
* timer: 要添加的定时器
* lst_head: timer下一个定时器
*/
void sort_timer_lst::add_timer(util_timer *timer, util_timer *lst_head) {
util_timer *prev = lst_head;
util_timer *tmp = prev->next;
while (tmp) {
// 遍历当前结点之后的链表,按照超时时间找到目标定时器对应的位置,常规双向链表插入操作
if (timer->expire < tmp->expire) {
prev->next = timer;
timer->next = tmp;
tmp->prev = timer;
timer->prev = prev;
break;
}
prev = tmp;
tmp = tmp->next;
}
// 遍历完发现,目标定时器需要放到尾结点处
if (!tmp) {
prev->next = timer;
timer->prev = prev;
timer->next = NULL;
tail = timer;
}
}

6.3.5 定时任务处理函数

使用统一事件源,SIGALRM信号每次被触发,主循环中调用一次定时任务处理函数,处理链表容器中到期的定时器。

具体的逻辑如下,

  • 遍历定时器升序链表容器,从头结点开始依次处理每个定时器,直到遇到尚未到期的定时器
  • 若当前时间小于定时器超时时间,跳出循环,即未找到到期的定时器
  • 若当前时间大于定时器超时时间,即找到了到期的定时器,执行回调函数,然后将它从链表中删除,然后继续遍历
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 定时器任务处理函数
void sort_timer_lst::tick() {
if (!head) {
return;
}
// 当前时间
time_t cur = time(NULL);
util_timer *tmp = head;
while (tmp) {
// 找到未到期的定时器,结束循环
if (cur < tmp->expire) {
break;
}
// 找到过期定时器,执行回调函数
tmp->cb_func(tmp->user_data);
// 更新头结点
head = tmp->next;
if (head) {
head->prev = NULL;
}
// 删除过期定时器
delete tmp;
tmp = head;
}
}

6.3.6 定时器的使用

服务器首先创建定时器容器链表,然后用统一事件源将异常事件,读写事件和信号事件统一处理,根据不同事件的对应逻辑使用定时器。

具体的,

  • 浏览器与服务器连接时,创建该连接对应的定时器,并将该定时器添加到链表上
  • 处理异常事件时,执行定时事件,服务器关闭连接,从链表上移除对应定时器
  • 处理定时信号时,将定时标志设置为true
  • 处理读事件时,若某连接上发生读事件,将对应定时器向后移动,否则,执行定时事件
  • 处理写事件时,若服务器通过某连接给浏览器发送数据,将对应定时器向后移动,否则,执行定时事件

代码在整体流程中分析,详见7.2.1.⑥节及以后

7 整体流程和补充

7.0 程序入口函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include "config.h"

int main(int argc, char *argv[]) {
//需要修改的数据库信息,登录名,密码,库名
string user = "root";
string passwd = "root";
string databasename = "qgydb";

//命令行解析
Config config;
config.parse_arg(argc, argv);

// 先调用默认构造函数
WebServer server;

// 初始化server
server.init(config.PORT, user, passwd, databasename, config.LOGWrite,
config.OPT_LINGER, config.TRIGMode, config.sql_num, config.thread_num,
config.close_log, config.actor_model);


// 日志
server.log_write();

// 数据库
server.sql_pool();

// 线程池
server.thread_pool();

// 触发模式
server.trig_mode();

// 监听
server.eventListen();

// 运行,死循环
server.eventLoop();

// nerver be reached in general...
return 0;
}

7.1 配置类

对一些常量的设置,以及解析命令行参数。

定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#ifndef CONFIG_H
#define CONFIG_H

#include "webserver.h"

using namespace std;

class Config {
public:
Config();
~Config(){};

// 解析命令行参数
void parse_arg(int argc, char*argv[]);

//端口号
int PORT;

//日志写入方式
int LOGWrite;

//触发组合模式
int TRIGMode;

// listenfd触发模式
int LISTENTrigmode;

// connfd触发模式
int CONNTrigmode;

// 优雅关闭链接
int OPT_LINGER;

// 数据库连接池数量
int sql_num;

// 线程池内的线程数量
int thread_num;

// 是否关闭日志
int close_log;

// 并发模型选择
int actor_model;
};

#endif

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#include "config.h"

// 默认构造函数实现
Config::Config(){
// 端口号,默认9006
PORT = 9006;

// 日志写入方式,默认同步
LOGWrite = 0;

// 触发组合模式,默认listenfd LT + connfd LT
TRIGMode = 0;

// listenfd触发模式,默认LT
LISTENTrigmode = 0;

// connfd触发模式,默认LT
CONNTrigmode = 0;

// 优雅关闭链接,默认不使用
OPT_LINGER = 0;

// 数据库连接池数量,默认8
sql_num = 8;

// 线程池内的线程数量,默认8
thread_num = 8;

// 关闭日志,默认不关闭
close_log = 0;

// 并发模型,默认是proactor
actor_model = 0;
}

void Config::parse_arg(int argc, char*argv[]){
int opt;
// 单字符加冒号, 例如p:, 表示选项p有且必须加参数,例如-p 6379
const char *str = "p:l:m:o:s:t:c:a:";
// 如果选项成功找到,返回选项字母;如果所有命令行选项都解析完毕,返回-1;
while ((opt = getopt(argc, argv, str)) != -1) {
switch (opt)
{
case 'p':
{
PORT = atoi(optarg);
break;
}
case 'l':
{
LOGWrite = atoi(optarg);
break;
}
case 'm':
{
TRIGMode = atoi(optarg);
break;
}
case 'o':
{
OPT_LINGER = atoi(optarg);
break;
}
case 's':
{
sql_num = atoi(optarg);
break;
}
case 't':
{
thread_num = atoi(optarg);
break;
}
case 'c':
{
close_log = atoi(optarg);
break;
}
case 'a':
{
actor_model = atoi(optarg);
break;
}
default:
break;
}
}
}

7.2 服务器类

7.2.1 定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#ifndef WEBSERVER_H
#define WEBSERVER_H

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <cassert>
#include <sys/epoll.h>

#include "./threadpool/threadpool.h"
#include "./http/http_conn.h"

const int MAX_FD = 65536; //最大文件描述符
const int MAX_EVENT_NUMBER = 10000; //最大事件数
const int TIMESLOT = 5; //最小超时单位

class WebServer
{
public:
WebServer();
~WebServer();
// 初始化服务器
void init(int port , string user, string passWord, string databaseName,
int log_write , int opt_linger, int trigmode, int sql_num,
int thread_num, int close_log, int actor_model);
// 创建线程池
void thread_pool();
void sql_pool();
void log_write();
// 设置触发模式
void trig_mode();
// 监听
void eventListen();
// 事件循环
void eventLoop();
// 创建定时器
void timer(int connfd, struct sockaddr_in client_address);
// 调整定时器
void adjust_timer(util_timer *timer);
// 关闭定时器
void deal_timer(util_timer *timer, int sockfd);
// 处理用户数据
bool dealclinetdata();
// 处理信号
bool dealwithsignal(bool& timeout, bool& stop_server);
// 处理线程
void dealwithread(int sockfd);
// 处理写套接字
void dealwithwrite(int sockfd);

public:
//基础
int m_port;
// 服务器的根目录
char *m_root;
int m_log_write;
int m_close_log;
// 模式 0为proactor,1为reactor
int m_actormodel;

// 管道套接字
int m_pipefd[2];
// epoll对象
int m_epollfd;
// http连接
http_conn *users;

// 数据库相关
connection_pool *m_connPool;
string m_user; //登陆数据库用户名
string m_passWord; //登陆数据库密码
string m_databaseName; //使用数据库名
int m_sql_num;

// 线程池
threadpool<http_conn> *m_pool;
// 线程池的线程数量
int m_thread_num;

// epoll_event相关
// 监听事件
epoll_event events[MAX_EVENT_NUMBER];
// 监听套接字
int m_listenfd;
// 套接字选项SO_LINGER
int m_OPT_LINGER;
// epoll触发模式
int m_TRIGMode;
// 监听套接字的触发模式
int m_LISTENTrigmode;
// 副套接字的触发模式
int m_CONNTrigmode;

// 连接资源
client_data *users_timer;
// 工具类
Utils utils;
};
#endif

7.2.1 主线程

① 构造和析构

对应程序入口函数(7.0节)的WebServer server;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 默认构造器
WebServer::WebServer() {
// 创建http_conn类对象的数组,代表http连接
users = new http_conn[MAX_FD]; // 65536

// root文件夹路径
char server_path[200];
getcwd(server_path, 200);
char root[6] = "/root";
m_root = (char *)malloc(strlen(server_path) + strlen(root) + 1);
strcpy(m_root, server_path);
strcat(m_root, root);

// 连接资源,一个资源对应一个定时器
users_timer = new client_data[MAX_FD];
}

// 析构函数
WebServer::~WebServer() {
close(m_epollfd); // 关闭epoll对象
close(m_listenfd); // 关闭监听套接字
close(m_pipefd[1]);
close(m_pipefd[0]); // 关闭unix套接字
delete[] users; // 释放http连接数组的空间
delete[] users_timer; // 释放定时器数组的空间
delete m_pool; // 释放线程池空间
}

// 通常在在构造后进行初始化
void WebServer::init(int port, string user, string passWord, string databaseName, int log_write,
int opt_linger, int trigmode, int sql_num, int thread_num, int close_log, int actor_model) {
m_port = port;
m_user = user;
m_passWord = passWord;
m_databaseName = databaseName;
m_sql_num = sql_num;
m_thread_num = thread_num;
m_log_write = log_write;
m_OPT_LINGER = opt_linger;
m_TRIGMode = trigmode;
m_close_log = close_log;
m_actormodel = actor_model;
}
② 线程池
1
2
3
4
5
// 创建线程池
void WebServer::thread_pool() {
//线程池
m_pool = new threadpool<http_conn>(m_actormodel, m_connPool, m_thread_num);
}

其中调用了threadpool类的构造函数:(详见4.3.2节

1
2
/*actor_model是工作模式,thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/
threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);

该函数会预先创建m_thread_num个线程。

③ 触发模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 设置触发模式
// 针对监听套接字和副套接字有4中可能的触发模式
void WebServer::trig_mode() {
//LT + LT
if (0 == m_TRIGMode) {
m_LISTENTrigmode = 0;
m_CONNTrigmode = 0;
}
//LT + ET
else if (1 == m_TRIGMode) {
m_LISTENTrigmode = 0;
m_CONNTrigmode = 1;
}
//ET + LT
else if (2 == m_TRIGMode) {
m_LISTENTrigmode = 1;
m_CONNTrigmode = 0;
}
//ET + ET
else if (3 == m_TRIGMode) {
m_LISTENTrigmode = 1;
m_CONNTrigmode = 1;
}
}
④ 事件监听
  • 创建监听套接字,设置套接字选项(SO_LINGER),绑定和监听
  • epoll设置
  • 定时器设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// 监听
void WebServer::eventListen() {
// 获取套接字,对于POSIX是PF,对于BSD是AF
// 流式套接字,默认采用TCP协议
m_listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(m_listenfd >= 0);

// 优雅关闭连接
if (0 == m_OPT_LINGER) {
// 默认行为,应用层调用close后会立即返回,内核会将缓存数据发完,并正常挥手。
struct linger tmp = {0, 1};
// SO_LINGER等待套接字发送缓冲区中的数据发送完成
setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));
}
else if (1 == m_OPT_LINGER) {
// 实际上设置了一个超时时间l_linger,单位为秒。
// 如果在超时时间内,缓冲区的数据发送完成,则正常挥手;
// 如果超时还没发送完,就如2一样直接关闭,并发送RST。
struct linger tmp = {1, 1};
setsockopt(m_listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof(tmp));
}

int ret = 0;
// 本机地址
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY);
address.sin_port = htons(m_port);

int flag = 1;
// 设置套接字选项
// SO_REUSEADDR是让端口释放后立即就可以被再次使用,参考Unix网络编程,这是常用技巧
setsockopt(m_listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
// 绑定
ret = bind(m_listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret >= 0);
// 监听,两个队列长度之和最大值为5
ret = listen(m_listenfd, 5);
assert(ret >= 0);

// 设置定时时间为5s
utils.init(TIMESLOT);

// epoll创建内核事件表
epoll_event events[MAX_EVENT_NUMBER];
// 创建epoll实例对象,这个对象描述符只存在一个
m_epollfd = epoll_create(5);
assert(m_epollfd != -1);
// 为m_listenfd(监听套接字)注册读事件,ET模式,非阻塞
utils.addfd(m_epollfd, m_listenfd, false, m_LISTENTrigmode);
// 设置http_conn类中的epoll对象描述符
// 每个连接都需要这个epoll对象来注册读写事件
http_conn::m_epollfd = m_epollfd;

// 创建unix域套接字,用于进程通信
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_pipefd);
assert(ret != -1);
// 设置管道写端为非阻塞,为什么写端要非阻塞?
utils.setnonblocking(m_pipefd[1]);
// 为0号管道注册读事件
utils.addfd(m_epollfd, m_pipefd[0], false, 0);
// 注册信号
utils.addsig(SIGPIPE, SIG_IGN);
utils.addsig(SIGALRM, utils.sig_handler, false);
utils.addsig(SIGTERM, utils.sig_handler, false);

// 每隔TIMESLOT时间触发SIGALRM信号
// 作用:每隔TIMESLOT时间检查非活动连接并删除
alarm(TIMESLOT); // 5s

// 工具类, 信号和描述符基础操作
Utils::u_pipefd = m_pipefd; // 两个是一样的
Utils::u_epollfd = m_epollfd;
}

信号处理函数sig_handleru_piped[1]写入信号值。

⑤ 事件循环

这是入口函数的倒数第二句:server.eventLoop(),正常运行情况下是一个死循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
void WebServer::eventLoop() {
bool timeout = false;
bool stop_server = false;
// 死循环
while (!stop_server) {
// 阻塞监听
int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if (number < 0 && errno != EINTR) {
LOG_ERROR("%s", "epoll failure");
break;
}
// 遍历就绪事件
for (int i = 0; i < number; i++) {
// 获取准备就绪的套接字描述符
int sockfd = events[i].data.fd;

// 情况1:处理新到的客户连接
if (sockfd == m_listenfd) {
bool flag = dealclinetdata();
if (false == flag)
continue;
}
// 情况2:处理异常事件
else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
// 服务器端关闭sockfd对应的连接,移除对应的定时器
util_timer *timer = users_timer[sockfd].timer;
deal_timer(timer, sockfd);
}
// 情况3:处理定时器信号(m_pipefd[0]套接字可读)
else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN)) {
bool flag = dealwithsignal(timeout, stop_server);
if (false == flag)
LOG_ERROR("%s", "dealclientdata failure");
}
// 情况4:副套接字可读,处理客户连接上接收到的数据
else if (events[i].events & EPOLLIN) {
dealwithread(sockfd);
}
// 情况5:副套接字可写,处理向客户发送的数据
else if (events[i].events & EPOLLOUT) {
dealwithwrite(sockfd);
}
}
// 超时
if (timeout) {
// 重新计时
utils.timer_handler();

LOG_INFO("%s", "timer tick");

timeout = false;
}
}
}

主线程在eventLoop处死循环,阻塞在epoll_wait处,等待一个或多个监听的套接字准备就绪,根据套接字和就绪类型,处理不同的事件。

超时代码解析:

  • timeout标志位由dealwithsignal修改。系统在设置了5s的alarm闹钟,5s后产生一个SIGALRM信号,修改timeout为true;
  • 进入if,调用utils.timer_handler();
1
2
3
4
5
// 定时处理任务,重新定时以不断触发SIGALRM信号
void Utils::timer_handler() {
m_timer_lst.tick(); // 定时器任务处理函数
alarm(m_TIMESLOT); // 重新定时5s
}

tick函数详见6.3.5节,它将寻找过期的定时器并调用回调函数cb_func,删除定时器和套接字。

各种事件处理的逻辑在下面列出。

⑥ 处理新到的连接

由函数dealclinetdata完成,注:这里clinet应该为client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 处理新到的连接
bool WebServer::dealclinetdata() {
// 对端地址
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
// 监听套接字为LT模式
if (0 == m_LISTENTrigmode) {
// accept可以立即返回
int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0) {
LOG_ERROR("%s:errno is:%d", "accept error", errno);
return false;
}
// 已连接数大于最大文件描述符,报错
if (http_conn::m_user_count >= MAX_FD) {
utils.show_error(connfd, "Internal server busy");
LOG_ERROR("%s", "Internal server busy");
return false;
}
// 设置定时器
timer(connfd, client_address);
}
// 监听套接字为ET模式,一次性处理所有的连接
else {
while (1) {
// accept可以立即返回
int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0) { // 当已完成连接队列为空后,accept立即返回-1,跳出循环,返回false
LOG_ERROR("%s:errno is:%d", "accept error", errno);
break;
}
if (http_conn::m_user_count >= MAX_FD) {
utils.show_error(connfd, "Internal server busy");
LOG_ERROR("%s", "Internal server busy");
break;
}
// 设置定时器
timer(connfd, client_address);
}
return false;
}
return true;
}

分为水平触发和边沿触发,对于前者,每次eventLoop的循环,在dealclinetdata里处理一个连接;对于后者,在eventLoop的一次循环中,就在dealclinetdata处理完所有连接(使用while(1)),当已完成连接队列为空后,accept立即返回-1(listenfd为非阻塞,在utils.addfd(m_epollfd, m_listenfd, false, m_LISTENTrigmode);中设置),跳出while循环。


此外,主线程对每个连接进行了初始化和设置了定时器:

1
timer(connfd, client_address);

WebServer::timer函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 为套接字connfd创建定时器
void WebServer::timer(int connfd, struct sockaddr_in client_address) {
// 初始化http连接
users[connfd].init(connfd, client_address, m_root, m_CONNTrigmode, m_close_log, m_user, m_passWord, m_databaseName);

// 初始化client_data数据
// 创建定时器,设置回调函数和超时时间,绑定用户数据,将定时器添加到链表中
users_timer[connfd].address = client_address;
users_timer[connfd].sockfd = connfd;

// 定时器类
util_timer *timer = new util_timer;
// 客户资源
timer->user_data = &users_timer[connfd];
// 回调函数
timer->cb_func = cb_func;
time_t cur = time(NULL);
// 过期时间:15s后
timer->expire = cur + 3 * TIMESLOT;
// 设置定时器
users_timer[connfd].timer = timer;
// 将该定时器添加到链表中
utils.m_timer_lst.add_timer(timer);
}

http_conn::init函数如下:为副套接字注册读事件;主从状态机初始化,缓冲区下标初始化等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 初始化连接,外部调用初始化套接字地址
void http_conn::init(int sockfd, const sockaddr_in &addr, char *root, int TRIGMode,
int close_log, string user, string passwd, string sqlname) {
m_sockfd = sockfd;
m_address = addr;
// 为sockfd注册读事件
addfd(m_epollfd, sockfd, true, m_TRIGMode);
m_user_count++; // http连接+1

// 当浏览器出现连接重置时,可能是网站根目录出错或http响应格式出错或者访问的文件中内容完全为空
doc_root = root;
m_TRIGMode = TRIGMode;
m_close_log = close_log;

strcpy(sql_user, user.c_str());
strcpy(sql_passwd, passwd.c_str());
strcpy(sql_name, sqlname.c_str());
// 其他参数的默认初始化
init();
}

// 初始化新接受的连接
// check_state默认为分析请求行状态
void http_conn::init() {
mysql = NULL;
bytes_to_send = 0;
bytes_have_send = 0;
// 主状态机初始状态:解析请求行
m_check_state = CHECK_STATE_REQUESTLINE;
m_linger = false;
m_method = GET;
m_url = 0;
m_version = 0;
m_content_length = 0;
m_host = 0;
m_start_line = 0;
m_checked_idx = 0;
m_read_idx = 0;
m_write_idx = 0;
cgi = 0;
m_state = 0;
timer_flag = 0; // 0正常连接 1断开连接
improv = 0; // 0尚未读取/写入数据 1已经读取/写入数据
// 申请读写空间,并初始化为空字符
memset(m_read_buf, '\0', READ_BUFFER_SIZE);
memset(m_write_buf, '\0', WRITE_BUFFER_SIZE);
memset(m_real_file, '\0', FILENAME_LEN);
}
⑦ 处理异常事件

由函数WebServer::deal_timer完成,它将关闭定时器。

1
2
3
4
5
6
7
8
9
10
void WebServer::deal_timer(util_timer *timer, int sockfd) {
// 执行回调函数
// 从内核事件表删除事件,关闭文件描述符,释放连接资源
timer->cb_func(&users_timer[sockfd]);
// 移除定时器
if (timer){
utils.m_timer_lst.del_timer(timer);
}
LOG_INFO("close fd %d", users_timer[sockfd].sockfd);
}

回调函数cb_func详见6.3.3节del_timer详见6.3.4节

⑧ 处理定时器信号

事件监听函数中,对SIGALRM信号进行了处理函数的注册,并设置了定时alarm

1
2
3
4
5
// 注册信号
utils.addsig(SIGALRM, utils.sig_handler, false);

// 每隔TIMESLOT时间触发SIGALRM信号
alarm(TIMESLOT); // 5s

信号处理函数sig_handler:向u_piped[1]写入信号值。


处理定时器信号由函数WebServer::dealwithsignal函数完成,详见6.3.2节。这里的接收到的信号只能是SIGALRMSIGTERM

⑨ 副套接字可读

由函数WebServer::dealwithread完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* sockfd:可读的副套接字
*/
void WebServer::dealwithread(int sockfd) {
// 获取该套接字的定时器
util_timer *timer = users_timer[sockfd].timer;

// reactor
if (1 == m_actormodel) {
// 该连接活动,需要调整定时器位置
if (timer) {
adjust_timer(timer);
}

// 将该事件放入请求队列, 0表示读
m_pool->append(users[sockfd], 0);

// 主线程会在此处循环检测子线程是否读完数据
// 等待子线程读完之后(improv被置为1),才能退出循环
while (true) {
if (1 == users[sockfd].improv) { // 已经读取完套接字上的数据
if (1 == users[sockfd].timer_flag) { // 断开了连接
// 删除定时器,关闭套接字
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
// 重置,供下一次判断
users[sockfd].improv = 0;
// 跳出死循环
break;
}
}
}
else {
// proactor
// 主线程读套接字
if (users[sockfd].read_once()) {
LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));

// 将该事件放入请求队列
m_pool->append_p(users[sockfd]);
// 调整定时器
if (timer) {
adjust_timer(timer);
}
}
// 没有数据,断开了连接,则删除定时器,关闭套接字
else {
deal_timer(timer, sockfd);
}
}
}

reactor模型:主线程不读取已就绪的套接字,而是将连接加入到队列中(调用append),由子线程读取并处理。然后主线程循环检测子线程是否读完数据(while(true)),等待子线程读完之后(improv被置为1),才能退出循环。

proactor模型:主线程读取数据后,将该连接放入请求队列中(调用append_p),然后调整定时器(adjust_timer),如果没有数据传输,则删除定时器,关闭套接字。主线程不必等待子线程。

append_pappend函数详见4.3.3节

WebServer::adjust_timer函数如下:

1
2
3
4
5
6
7
8
9
10
// 若有数据传输,则将定时器往后延迟3个单位
// 并对新的定时器在链表上的位置进行调整
void WebServer::adjust_timer(util_timer *timer) {
time_t cur = time(NULL);
// 过期时间延后15s
timer->expire = cur + 3 * TIMESLOT;
utils.m_timer_lst.adjust_timer(timer);

LOG_INFO("%s", "adjust timer once");
}

其中又调用了工具类的adjust_timer函数。

⑩ 副套接字可写

由函数WebServer::dealwithwrite完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* sockfd:可写的套接字
*/
void WebServer::dealwithwrite(int sockfd) {
// 取出该套接字的定时器
util_timer *timer = users_timer[sockfd].timer;
// reactor
if (1 == m_actormodel) {
// 调整定时器
if (timer) {
adjust_timer(timer);
}
// 1表示写
m_pool->append(users[sockfd], 1);
// 主线程等待子线程将数据写入套接字,不管成功与否
while (true) {
if (1 == users[sockfd].improv) {
if (1 == users[sockfd].timer_flag) {
deal_timer(timer, sockfd);
users[sockfd].timer_flag = 0;
}
users[sockfd].improv = 0;
break;
}
}
}
//proactor
else {
// 有数据可写
if (users[sockfd].write()) {
LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr));
// 调整定时器,同样延后15s
if (timer) {
adjust_timer(timer);
}
}
// 否则删除定时器
else {
deal_timer(timer, sockfd);
}
}
}
⑪ 总结

这里再次分析一下reactor和proactor模型,在4.14.2节中已经分析过。

  • reactor模式中,主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话立即通知工作线程(逻辑单元 ),读写数据、接受新连接及处理客户请求均在工作线程中完成。通常由同步I/O实现。
  • proactor模式中,主线程和内核负责处理读写数据、接受新连接等I/O操作,工作线程仅负责业务逻辑,如处理客户请求。通常由异步I/O实现。

区别在于处理副套接字可读和可写时的分支上,函数为dealwithreaddealwithwrite

7.2.2 线程池中的线程

根据模型的不同,子线程完成的任务不同。

  • reactor:需要先读取/写入数据,然后处理
  • proactor:只需要处理

线程执行的函数为threadpool<T>::worker,其中由调用了threadpool<T>::run,详见4.3.44.3.5节。