Redis源码学习笔记

学习来源:redis-2.8

1 源码结构

redis-2.8为例:

image-20230314100126607

来源:古明地盆

1.1 目录

1.1.1 deps

这个目录主要包含了 Redis 依赖的第三方代码库,包括 Redis 的 C 语言版本客户端代码 hiredis、jemalloc 内存分配器代码、readline 功能的替代代码 linenoise,以及 lua 脚本代码。这部分代码的一个显著特点就是它们可以独立于 Redis src 目录下的功能源码进行编译,也就是说它们可以独立于 Redis 存在和发展。

img

1.1.2 src

这个目录里面包含了 Redis 所有功能模块的代码文件。

因为 Redis 的功能模块实现是典型的 C 语言风格,不同功能模块之间不再设置目录分隔,而是通过头文件包含来相互调用。这样的代码风格在基于 C 语言开发的系统软件中也比较常见,比如 Memcached 的源码文件也是在同一级目录下。

img

1.1.3 tests

Redis 实现的测试代码可以分成四部分,分别是单元测试(对应 unit 子目录),Redis Cluster 功能测试(对应 cluster 子目录)、哨兵功能测试(对应 sentinel 子目录)、主从复制功能测试(对应 integration 子目录)。这些子目录中的测试代码使用了 Tcl 语言(通用的脚本语言)进行编写,主要目的就是方便进行测试。

img

1.1.4 utils

在 Redis 开发过程中,还有一些功能属于辅助性功能,包括用于创建 Redis Cluster 的脚本、用于测试 LRU 算法效果的程序,以及可视化 rehash 过程的程序。在 Redis 代码结构中,这些功能代码都被归类到了 utils 目录中统一管理。

img

除了 deps、src、tests、utils 四个子目录以外,Redis 源码总目录下其实还包含了两个重要的配置文件,一个是 Redis 实例的配置文件 redis.conf,另一个是哨兵的配置文件 sentinel.conf。当你需要查找或修改 Redis 实例或哨兵的配置时,就可以直接定位到源码主目录下。

1.2 功能模块

Redis 代码结构中的 src 目录,包含了实现功能模块的 123 个代码文件,Redis 服务端的所有功能实现都在这里面。在这 123 个代码文件中,对于某个功能来说,一般包括了实现该功能的源文件(.c 文件) 和对应的头文件(.h 文件),比如 dict.c 和 dict.h 就是用于实现哈希表的 C 文件和头文件。

1.2.1 服务器实例

img

1.2.2 数据类型和结构

img

1.2.3 高可靠和高可扩展

数据持久化

Redis 的数据持久化实现有两种方式:内存快照 RDB 和 AOF 日志,分别实现在了 rdb.h/rdb.c 和 aof.c 中。

注意,在使用 RDB 或 AOF 对数据库进行恢复时,RDB 和 AOF 文件可能会因为 Redis 实例所在服务器宕机,而未能完整保存,进而会影响到数据库恢复。因此针对这一问题,Redis 还实现了对这两类文件的检查功能,对应的代码文件分别是 redis-check-rdb.c 和 redis-check-aof.c。

主从复制实现

Redis 把主从复制功能实现在了 replication.c 文件中,另外你还需要知道的是,Redis 的主从集群在进行恢复时,主要是依赖于哨兵机制,而这部分功能则直接实现在了 sentinel.c 文件中。其次,与 Redis 实现高可靠性保证的功能类似,Redis 高可扩展性保证的功能,是通过 Redis Cluster 来实现的,这部分代码也非常集中,就是在 cluster.h/cluster.c 代码文件中。

2 数据结构

2.1 简单动态字符串

源码文件:sds.csds.h

这里只分析SDS的定义,创建和释放,其他操作都是对string.h的进一步封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// sds.h
#ifndef __SDS_H
#define __SDS_H

#include <sys/types.h>
#include <stdarg.h>

typedef char *sds; // 为char*取一个别名sds

struct sdshdr {
// ----结构体定长部分----
unsigned int len; // 记录buf数组中已经使用的有效字节数(不包含空字符),等于SDS所保存的字符串长度
unsigned int free; // 记录buf数组中未使用字节的数量
// ----结构体变长部分----
char buf[]; // 字节数组,保存字符串(有空字符),长度为buf=len+free+1,注意,这里的buf没有指定大小,是一个柔性数组,即变长结构体
};

// 内联函数
// 求字符串的有效长度
static inline size_t sdslen(const sds s) {
struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
return sh->len;
}

// 内联函数
// 求字符串的未分配长度
static inline size_t sdsavail(const sds s) {
struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
return sh->free;
}

// function protos...
#endif

内联函数sdslensdsavail都是对sds进行操作(注意不是普通的C字符串),换句话说,sds存在于已经创建好的结构体sdshdr中。

sdslen为例,注意到:

1
struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));

sds(本质是char *的别名)指向buf数组的首地址,因此,用sds减去sdshdr结构体的定长部分(len和free的空间大小之和)得到的地址就是sdshdr结构体的首地址,再强转为void *,因此可以赋值给struct sdshdr *的指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// sds.c

/**
* 创建一个空的sds
* len=0, free=0, buf=['\0']
*/
sds sdsempty(void) {
return sdsnewlen("",0);
}

/**
* 源码注释:从以空结尾的C字符串开始创建新的sds字符串
* 根据init创建一个sds,如果init=NULL,则调用效果同sdsnewlen("",0);
* 如果init不为空,以init="abc"为例:
* len=3,free=0,buf=['a','b','c','\0']
*/
sds sdsnew(const char *init) {
size_t initlen = (init == NULL) ? 0 : strlen(init);
return sdsnewlen(init, initlen);
}

// ------------------------------------------------------

/**
* 复制一个sds
*/
sds sdsdup(const sds s) {
return sdsnewlen(s, sdslen(s));
}

/**
* 释放sds
*/
void sdsfree(sds s) {
if (s == NULL) return;
zfree(s-sizeof(struct sdshdr));
}

注意到,上述函数均调用了sds.c/sdsnewlen函数,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 根据给定的一个起始地址init和长度initlen,创建一个sdshdr结构体,返回结构体中的buf成员
* sdshdr->len: initlen(有效字符)
* sdshdr->free: 0
* sdshdr->buf: init的前initlen个字节
* 返回sds(char *的别名),即sdshdr->buf数组的首地址
*/
sds sdsnewlen(const void *init, size_t initlen) {
struct sdshdr *sh;

if (init) { // 如果init不为NULL
sh = zmalloc(sizeof(struct sdshdr)+initlen+1); // 1是保存\0的1字节空间
} else { // init为NULL
sh = zcalloc(sizeof(struct sdshdr)+initlen+1);
}
if (sh == NULL) return NULL; // 创建失败
sh->len = initlen; // 有效长度
sh->free = 0; // 未使用长度
if (initlen && init)
// 复制init的前initlen个字节到sh->buf数组中
memcpy(sh->buf, init, initlen);
// 结尾设置为空字符
sh->buf[initlen] = '\0';
return (char*)sh->buf;
}

其中,zmalloczcalloc是对malloccalloc的封装,现在来分析:

1
sh = zmalloc(sizeof(struct sdshdr)+initlen+1);

注意sdshdr为变长结构体,sizeof(struct sdshdr)为成员lenfree分配了空间,initlen+1则为buf[]分配了空间,其中initlen为有效长度,1为空字符的空间。

1
memcpy(sh->buf, init, initlen);

将地址init开始的initlen字节复制到sh->buf

memcpy函数原型如下:

1
void *memcpy(void *dest, const void *src, size_t num );

memcpy 会复制 src 所指的内存内容的前 num 个字节到 dest 所指的内存地址上。memcpy 并不关心被复制的数据类型(因此都是void *,只要是地址就行),只是逐字节地进行复制,这给函数的使用带来了很大的灵活性,可以面向任何数据类型进行复制。


long long创建sds

1
2
3
4
5
6
sds sdsfromlonglong(long long value) {
char buf[SDS_LLSTR_SIZE]; // #define SDS_LLSTR_SIZE 21
int len = sdsll2str(buf,value);

return sdsnewlen(buf,len);
}

其中又调用了sdsll2str,该函数才是真正将long long转换成字符串:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// s的长度至少要有SDS_LLSTR_SIZE个字节
#define SDS_LLSTR_SIZE 21
/**
* 返回以\0结尾的s字符串
*/
int sdsll2str(char *s, long long value) {
char *p, aux;
unsigned long long v;
size_t l;

// 将value转成整数
v = (value < 0) ? -value : value;
p = s; // s是首地址,p的地址用于后续的处理
do {
*p++ = '0'+(v%10); // 取得v的末尾数并添加到p所在的位置,然后p后移1个位置
v /= 10;
} while(v);
if (value < 0) *p++ = '-'; // 如果原值为负数,则在末尾添加一个减号

// 计算有效长度,并在末尾添加\0
l = p-s;
*p = '\0';

// 从有效长度的末尾开始反转字符串
p--;
while(s < p) {
aux = *s;
*s = *p;
*p = aux;
s++; // s向右移动
p--; // p向左移动
}
// 返回有效长度
return l;
}

注意语句:

1
*p++ = '0'+(v%10);

右侧‘0’+(v%10)的作用是将v的末尾数字转换成ASCII码表示的单字符数字。

左侧*p++*的优先级大于++,因此执行顺序为:

1
2
*p = '0'+(v%10);
p++;

2.2 链表

源码文件:adlist.cadlist.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// adlist.h 链表头文件

#ifndef __ADLIST_H__
#define __ADLIST_H__

// 链表节点的结构体
typedef struct listNode {
struct listNode *prev;
struct listNode *next;
void *value; // redis字符串对象
} listNode;

// 链表的迭代器
typedef struct listIter {
listNode *next;
int direction;
} listIter;

// 链表的结构体
typedef struct list {
listNode *head; // 头结点
listNode *tail; // 尾结点
void *(*dup)(void *ptr); // 节点值复制函数指针
void (*free)(void *ptr); // 节点值释放函数指针
int (*match)(void *ptr, void *key); // 节点值对比函数指针
unsigned long len; // 链表长度
} list;

/* 宏函数 */
// 一些常见的链表操作
#define listLength(l) ((l)->len)
#define listFirst(l) ((l)->head)
#define listLast(l) ((l)->tail)
#define listPrevNode(n) ((n)->prev)
#define listNextNode(n) ((n)->next)
#define listNodeValue(n) ((n)->value)

// 设置函数指针
#define listSetDupMethod(l,m) ((l)->dup = (m))
#define listSetFreeMethod(l,m) ((l)->free = (m))
#define listSetMatchMethod(l,m) ((l)->match = (m))

// 获取函数指针
#define listGetDupMethod(l) ((l)->dup)
#define listGetFree(l) ((l)->free)
#define listGetMatchMethod(l) ((l)->match)

/* Prototypes */
// ...

/* 迭代器的方向 */
#define AL_START_HEAD 0
#define AL_START_TAIL 1

#endif /* __ADLIST_H__ */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#include <stdlib.h>
#include "adlist.h"
#include "zmalloc.h"

/**
* 创建链表
*/
list *listCreate(void) {
struct list *list;

if ((list = zmalloc(sizeof(*list))) == NULL)
return NULL;
list->head = list->tail = NULL;
list->len = 0;
list->dup = NULL;
list->free = NULL;
list->match = NULL;
return list;
}

/**
* 销毁整个链表,逐个结点进行释放
*/
void listRelease(list *list) {
unsigned long len;
listNode *current, *next;

current = list->head;
len = list->len;
while(len--) {
next = current->next;
if (list->free) list->free(current->value);
zfree(current);
current = next;
}
zfree(list);
}

/**
* 头插
* 头结点的值由value指定,类型不限(void *)
*/
list *listAddNodeHead(list *list, void *value) {
listNode *node;
// 为结点分配空间
if ((node = zmalloc(sizeof(*node))) == NULL)
return NULL;
node->value = value;
if (list->len == 0) { // 链表为空
list->head = list->tail = node;
node->prev = node->next = NULL;
} else { // 链表不空
node->prev = NULL;
node->next = list->head;
list->head->prev = node;
list->head = node;
}
list->len++;
return list;
}

/**
* 尾插
*/
list *listAddNodeTail(list *list, void *value) {
listNode *node;

if ((node = zmalloc(sizeof(*node))) == NULL)
return NULL;
node->value = value;
if (list->len == 0) {
list->head = list->tail = node;
node->prev = node->next = NULL;
} else {
node->prev = list->tail;
node->next = NULL;
list->tail->next = node;
list->tail = node;
}
list->len++;
return list;
}

/**
* 在结点old_node之前或之后插入一个结点
* after==0: 在其之前
* after!=0: 在其之后
*/
list *listInsertNode(list *list, listNode *old_node, void *value, int after) {
listNode *node;

if ((node = zmalloc(sizeof(*node))) == NULL)
return NULL;
node->value = value;
if (after) {
node->prev = old_node;
node->next = old_node->next;
if (list->tail == old_node) {
list->tail = node;
}
} else {
node->next = old_node;
node->prev = old_node->prev;
if (list->head == old_node) {
list->head = node;
}
}
if (node->prev != NULL) {
node->prev->next = node;
}
if (node->next != NULL) {
node->next->prev = node;
}
list->len++;
return list;
}

/**
* 删除节点node
*/
void listDelNode(list *list, listNode *node) {
if (node->prev)
node->prev->next = node->next;
else
list->head = node->next;
if (node->next)
node->next->prev = node->prev;
else
list->tail = node->prev;
if (list->free) list->free(node->value);
zfree(node);
list->len--;
}

/**
* 获取链表的迭代器,direction为方向
*/
listIter *listGetIterator(list *list, int direction) {
listIter *iter;

if ((iter = zmalloc(sizeof(*iter))) == NULL) return NULL;
if (direction == AL_START_HEAD) // 正向
iter->next = list->head;
else
iter->next = list->tail; // 反向
iter->direction = direction;
return iter;
}

/* 销毁迭代器 */
void listReleaseIterator(listIter *iter) {
zfree(iter);
}

/**
* 利用迭代器获取下一个结点
*/
listNode *listNext(listIter *iter) {
listNode *current = iter->next;

if (current != NULL) {
if (iter->direction == AL_START_HEAD)
iter->next = current->next;
else
iter->next = current->prev;
}
return current;
}

2.3 字典

重要的结构体:

dictEntry:键值对,或称为哈希节点

dictht:哈希表,包含键值对dictEntry

dict:字典,包含两个哈希表dictht

2.3.1 头文件

源码文件在dict.hdict.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <stdint.h>

#ifndef __DICT_H
#define __DICT_H

#define DICT_OK 0
#define DICT_ERR 1

/* Unused arguments generate annoying warnings... */
#define DICT_NOTUSED(V) ((void) V)

// 哈希结点,即键值对
typedef struct dictEntry {
void *key; // 键,redis字符串对象,
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v; // 值,redis对象
struct dictEntry *next; // 下一个哈希结点
} dictEntry;

// 对字典的操作
typedef struct dictType {
// 计算哈希值的函数
unsigned int (*hashFunction)(const void *key);
// 复制键的函数
void *(*keyDup)(void *privdata, const void *key);
// 复制值的函数
void *(*valDup)(void *privdata, const void *obj);
// 对比键的函数
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
// 销毁键的函数
void (*keyDestructor)(void *privdata, void *key);
// 销毁值的函数
void (*valDestructor)(void *privdata, void *obj);
} dictType;

// 哈希表的结构体
typedef struct dictht {
dictEntry **table; // 哈希表数组
unsigned long size; // 哈希表大小
unsigned long sizemask; // 哈希表大小掩码,用于计算索引值,是等于size-1,作用:让索引始终落在0~size-1之间
unsigned long used; // 该哈希表已有节点的数量(已有的键值对数量)
} dictht;

// 字典的结构体
typedef struct dict {
dictType *type; // 对字典的操作,不同的特定函数
void *privdata; // 保存了需要传给那些类型特定函数的可选参数
dictht ht[2]; // 两个哈希表
long rehashidx; // rehash参数
int iterators; // 迭代器
} dict;

/* 哈希表数组的初始化大小 */
#define DICT_HT_INITIAL_SIZE 4

// 宏函数
// ...

// 函数原型
// ...

/* Hash table types */
extern dictType dictTypeHeapStringCopyKey;
extern dictType dictTypeHeapStrings;
extern dictType dictTypeHeapStringCopyKeyValue;

#endif /* __DICT_H */

2.3.2 实现

  • 创建字典
1
2
3
4
5
6
7
8
9
10
11
12
/* Create a new hash table */
/**
* type: 对字典的操作,一系列特定函数指针的结构体
* privDataPtr: 特定函数的参数
*/
dict *dictCreate(dictType *type, void *privDataPtr) {
// 创建一个空的dict字典d
dict *d = zmalloc(sizeof(*d));

_dictInit(d,type,privDataPtr);
return d;
}

又调用了_dictInit初始化字典:

1
2
3
4
5
6
7
8
9
10
11
12
13
/* Initialize the hash table */
/**
* 初始化字典
*/
int _dictInit(dict *d, dictType *type, void *privDataPtr) {
_dictReset(&d->ht[0]); // 重置哈希表,但不释放空间
_dictReset(&d->ht[1]);
d->type = type;
d->privdata = privDataPtr;
d->rehashidx = -1;
d->iterators = 0;
return DICT_OK;
}

其中又调用了_dictReset对哈希表进行重置:

1
2
3
4
5
6
7
8
/* Reset a hash table already initialized with ht_init().
* NOTE: This function should only be called by ht_destroy(). */
static void _dictReset(dictht *ht) {
ht->table = NULL; // 哈希表数组
ht->size = 0; // 哈希表大小
ht->sizemask = 0; // 掩码
ht->used = 0; // 已使用空间
}
  • 渐进rehash函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 渐进rehash
* n:rehash执行次数
*/
int dictRehash(dict *d, int n) {
// 没有在rehash,返回0,表示rehash完毕
if (!dictIsRehashing(d)) return 0;
// 进行n次rehash
while(n--) {
dictEntry *de, *nextde;

// 如果表0空了,表示已经rehash完毕
// 将表0释放,表1赋给表0,并将表1重置
if (d->ht[0].used == 0) {
zfree(d->ht[0].table); // 释放表0的空间
d->ht[0] = d->ht[1]; // 表1赋给表0
_dictReset(&d->ht[1]); // 重置表1
d->rehashidx = -1; // 没有在rehash
return 0; // 返回0,表示rehash完毕
}

/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
// 保证0表的大小大于rehashidx
assert(d->ht[0].size > (unsigned long)d->rehashidx);
// 从rehashidx开始,找到不为空的哈希起始节点
while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
// 起始哈希节点("链表"的头结点)
de = d->ht[0].table[d->rehashidx];
// 将此位置(rehashidx)的所有哈希节点rehash到表1
while(de) {
uint64_t h;
// 记录下一个哈希节点
nextde = de->next;
// 计算key在表1的索引值
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
// 将节点de头插入表1的h位置
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
// 更改计数
d->ht[0].used--;
d->ht[1].used++;
// 移动指针
de = nextde;
}
// 置空
d->ht[0].table[d->rehashidx] = NULL;
// rehash索引++,意味着下一次rehash从表0的rehashidx位置开始
d->rehashidx++;
}
// 返回1表示表0还有数据需要进行rehash
return 1;
}
  • 计算key对应的索引值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static long _dictKeyIndex(dict *d, const void *key) {
unsigned long h, idx, table;
dictEntry *he;

// 如果需要,扩容
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
// 计算key的哈希值
h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) {
// 索引 = 哈希值 & 掩码
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx]; // he是指向dictEntry的指针
// 采用链地址法来避免哈希冲突
while(he) {
// 比较两个键是否相同
// 键相同,直接返回,表示键已经存在
if (dictCompareKeys(d, key, he->key))
return -1;
// 键不同,值相同,则指向下一个哈希节点
he = he->next;
}
// 在rehash时,dictIsRehashing返回true,不进行break,对两个哈希表都寻找索引,因为可能在1表中查找到可用索引
// 没有rehash时,只在表0中寻找可用索引
if (!dictIsRehashing(d)) break;
}
// 返回可用的索引
return idx;
}

其中用到了几个宏函数:

1
2
3
4
5
6
// 比较两个key是否相同
// 调用了dictType(对字典的操作的结构体)中的keyCompare函数
#define dictCompareKeys(d, key1, key2) \
(((d)->type->keyCompare) ? \
(d)->type->keyCompare((d)->privdata, key1, key2) : \
(key1) == (key2))
1
2
3
// 判断是否在rehash
// 没有rehash时(rehashidx == -1),返回false
#define dictIsRehashing(d) ((d)->rehashidx != -1)

扩容函数_dictExpandIfNeeded见后。

  • 在字典中的一个哈希表中添加键值对
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* Add an element to the target hash table */
/**
* 在字典中的一个哈希表中添加键值对
* d: 字典
* key: 键
* val: 值
*/
int dictAdd(dict *d, void *key, void *val) {
dictEntry *entry = dictAddRaw(d,key);

if (!entry) return DICT_ERR;
dictSetVal(d, entry, val);
return DICT_OK;
}

其中调用了dictAddRaw函数,是真正添加键值对的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 给字典的一个哈希表添加键值对
*/
dictEntry *dictAddRaw(dict *d, void *key) {
long index; // 可用索引
dictEntry *entry;
dictht *ht;
// 如果在rehash,进行一次rehash
if (dictIsRehashing(d)) _dictRehashStep(d);

// 获取可用索引,-1表示键已经存在,无法在哈希表中加入当前键值对
if ((index = _dictKeyIndex(d, key)) == -1)
return NULL;

// 没有rehash,则用第0个哈希表,否则用第1个哈希表
// 这一措施保证了ht[0]包含的键值对数量会只减不增,并随着rehash操作的执行而最终变成空表
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
// 为一个哈希节点dictEntry分配内存
entry = zmalloc(sizeof(*entry));
// 头插入哈希节点
entry->next = ht->table[index];
ht->table[index] = entry;
// 节点数+1
ht->used++;

dictSetKey(d, entry, key);
return entry;
}

dictAddRaw函数中判断了字典是否在rehash,如果在rehash,则进行一次rehash,然后试着在表1中添加键值对,如果没有在rehash,则跳过此步骤,在表0中添加键值对。这一步的作用是,如果字典在rehash中,那么每次进行常用的查找、更新操作时,能够让哈希表自动从表0迁移到表1,加速rehash过程。

_dictRehashStep函数如下:

1
2
3
4
5
6
7
/**
* 进行一次rehash
*/
static void _dictRehashStep(dict *d) {
// n = 1,进行一次rehash
if (d->iterators == 0) dictRehash(d,1);
}
  • 通过键来查找键值对
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 通过键查找键值对
* 返回dictEntry指针
*/
dictEntry *dictFind(dict *d, const void *key) {
dictEntry *he;
uint64_t h, idx, table;

if (d->ht[0].size == 0) return NULL; /* We don't have a table at all */
// 如果在rehash,就rehash一次
if (dictIsRehashing(d)) _dictRehashStep(d);
// 计算key的哈希值
h = dictHashKey(d, key);
// 在两个表中找
for (table = 0; table <= 1; table++) {
// 计算索引
idx = h & d->ht[table].sizemask;
// 头结点
he = d->ht[table].table[idx];
while(he) {
// 如果键相同,则找到,返回该哈希节点
if (dictCompareKeys(d, key, he->key))
return he;
he = he->next;
}
// 如果没有在rehash,则只有表0有数据,没有必要在表1中找,返回空
if (!dictIsRehashing(d)) return NULL;
}
// 都没有找到,返回空
return NULL;
}
  • 通过键来获取值
1
2
3
4
5
6
void *dictFetchValue(dict *d, const void *key) {
dictEntry *he;

he = dictFind(d,key);
return he ? dictGetVal(he) : NULL;
}

dictGetVal为宏函数:

1
#define dictGetVal(he) ((he)->v.val)
  • 删除键值对

注意,需要释放哈希节点(dictEntry)的内存,同时,键和值如果是引用类型的指针的话(例如字符串),还需要释放这一片内存,此时需要调用在dictType注册好的键和值销毁函数。

1
2
3
4
5
6
7
8
9
// 需要释放kv
int dictDelete(dict *ht, const void *key) {
return dictGenericDelete(ht,key,0);
}

// 不需要释放kv
int dictDeleteNoFree(dict *ht, const void *key) {
return dictGenericDelete(ht,key,1);
}

两者都调用了dictGenericDelete函数,是真正执行释放内存的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 删除一个键值对
* nofree: 0释放k,v的空间 1不释放
*/
static int dictGenericDelete(dict *d, const void *key, int nofree) {
uint64_t h, idx;
dictEntry *he, *prevHe;
int table;
if (d->ht[0].size == 0) return DICT_ERR; /* d->ht[0].table is NULL */
// 在rehash就rehash一次
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
// 在两个表中找
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
prevHe = NULL;
while(he) {
// 找到key相同的键值对
if (dictCompareKeys(d, key, he->key)) {
/* Unlink the element from the list */
if (prevHe)
prevHe->next = he->next;
else
d->ht[table].table[idx] = he->next;
if (!nofree) {
dictFreeKey(d, he);
dictFreeVal(d, he);
}
// 释放哈希节点的空间
zfree(he);
// 更新计数
d->ht[table].used--;
return DICT_OK;
}
// 继续找
prevHe = he;
he = he->next;
}
// 没有在rehash,就只在表0中找
if (!dictIsRehashing(d)) break;
}
// 没有找到
return DICT_ERR; /* not found */
}

dictFreeKeydictFreeVal为宏函数:

1
2
3
4
5
6
7
#define dictFreeKey(d, entry) \
if ((d)->type->keyDestructor) \
(d)->type->keyDestructor((d)->privdata, (entry)->key)

#define dictFreeVal(d, entry) \
if ((d)->type->valDestructor) \
(d)->type->valDestructor((d)->privdata, (entry)->v.val)
  • 收缩和扩展

为了让哈希表的负载因子(load factor)维持在一个合理的范围之内,当哈希表保存的键值对数量太多或者太少时,程序需要对哈希表的大小进行相应的扩展或者收缩。收缩和扩展可以通过rehash来完成。

负载因子:过高的负载因子会导致hash冲突的增多,而如果把负载因子设置的过低,则会导致哈希表的频繁扩容,损耗性能。

1
load_factor = ht[0].used / ht[0].size;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 哈希表扩容
* size: 指定的扩容大小
*/
int dictExpand(dict *d, unsigned long size) {
dictht n; /* the new hash table */
// 计算扩容后的大小: 是大于等于size的最小2次幂
unsigned long realsize = _dictNextPower(size);

// 正在rehash 或者 已有键值对数量大于扩容后大小 则报错返回
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;

// 初始化
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;

// 表0为空,则是第一次初始化,直接将表0的指针指向新表n
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}

// 将表1的指针指向新的表n
d->ht[1] = n;
d->rehashidx = 0; // 设置rehash索引
return DICT_OK;
}

其中调用了_dictNextPower函数来确定扩容的大小,限制为$2^n$:

1
2
3
4
5
6
7
8
9
10
static unsigned long _dictNextPower(unsigned long size) {
unsigned long i = DICT_HT_INITIAL_SIZE; // 表的最小大小为4
// 表的最大值为LONG_MAX
if (size >= LONG_MAX) return LONG_MAX;
while(1) {
if (i >= size)
return i;
i *= 2;
}
}

dictResize函数将表的大小调整为包含所有元素的最小大小:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 /**
* 将表的大小调整为包含所有元素的最小大小
*/
int dictResize(dict *d) {
int minimal;
// 收缩必须满足两个前提:
// 1.打开了dict_can_resize开关
// 2.没有在rehash
if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
minimal = d->ht[0].used;
// 限制最小大小为DICT_HT_INITIAL_SIZE
if (minimal < DICT_HT_INITIAL_SIZE)
minimal = DICT_HT_INITIAL_SIZE;
// 将哈希表大小改变为minimal
return dictExpand(d, minimal);
}

dict_can_resize定义:

1
static int dict_can_resize = 1; // 默认可以resize

可以通过下列函数打开和关闭:

1
2
3
4
5
6
7
void dictEnableResize(void) { // 打开
dict_can_resize = 1;
}

void dictDisableResize(void) { // 关闭
dict_can_resize = 0;
}

在函数_dictKeyIndex(计算键的索引)中,判断了是否需要进行扩容:

1
2
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;

该函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 判断是否需要进行扩容,如果需要,就进行扩容
* 扩容至4或者used*2
*/
static int _dictExpandIfNeeded(dict *d) {
// 正在rehash(正在扩容),则返回
if (dictIsRehashing(d)) return DICT_OK;

// 表0为空,则按最小大小4进行扩容
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
// 进行扩容的条件,下列两个条件均需要满足
// 1.已使用空间 == 有效空间
// 2.扩容开关打开 || 负载因子大于阈值
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize || d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
// 扩容至原来已使用大小的两倍
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}

dict_force_resize_ratio为负载因子的阈值:

1
static unsigned int dict_force_resize_ratio = 5;
  • 迭代器相关的函数,略
  • 销毁哈希表和字典,略
  • 哈希函数

redis提供了两种哈希函数来计算键的哈希值,分别是Thomas Wang's 32 bit Mix FunctionMurmurHash2

2.4 压缩列表

2.4.1 定义

源码所在文件:ziplist.h

3 对象

源码所在文件:redis.h(定义),object.c(操作)

3.1 定义

redisObject类型结构体:

1
2
3
4
5
6
7
8
#define REDIS_LRU_BITS 24
typedef struct redisObject {
unsigned type:4; // 类型,对应五大数据类型
unsigned encoding:4; // 编码,底层实现采用哪一种数据结构
unsigned lru:REDIS_LRU_BITS; // 记录对象最后一次被程序访问的时间
int refcount; // 引用计数
void *ptr; // 指向底层实现数据结构的指针
} robj;
  • unsigned若省略后一个关键字,大多数编译器都会认为是unsigned int
  • :4表示强制让unsigned int(一般占用8位)只占用4位。
  • type取值:
1
2
3
4
5
6
/* Object types */
#define REDIS_STRING 0 // 字符串
#define REDIS_LIST 1 // 列表
#define REDIS_SET 2 // 集合
#define REDIS_ZSET 3 // zset
#define REDIS_HASH 4 // 哈希表
  • encoding取值:
1
2
3
4
5
6
7
8
#define REDIS_ENCODING_RAW 0     /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */
#define REDIS_ENCODING_HT 2 /* Encoded as hash table */
#define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define REDIS_ENCODING_INTSET 6 /* Encoded as intset */
#define REDIS_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
  • refcount:引用计数,用于内存回收

3.2 操作

创建对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 创建对象
* type: 类型
* ptr: 指向底层数据结构的指针,例如传入sds(char *的别名)
* return: redis对象
*/
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
// 默认为RAW编码
o->encoding = REDIS_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;

/* Set the LRU to the current lruclock (minutes resolution). */
o->lru = server.lruclock;
return o;
}

3.2.1 字符串对象

  • 从一个C风格字符串来创建string对象:
1
2
3
4
5
6
7
8
9
/**
* 从一个C风格字符串来创建string对象
* ptr: 指向字符串的指针
* len: 有效长度
*/
robj *createStringObject(char *ptr, size_t len) {
// 类型为REDIS_STRING
return createObject(REDIS_STRING,sdsnewlen(ptr,len));
}
  • long long类型(一般占用8个字节)来创建string对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 从long long来创建string对象
*/
robj *createStringObjectFromLongLong(long long value) {
robj *o;
if (value >= 0 && value < REDIS_SHARED_INTEGERS) { // 0 <= value <= 10000
// 共享对象的引用计数+1
incrRefCount(shared.integers[value]);
// 将共享对象赋值给o即可
o = shared.integers[value];
} else { // 大于这个范围则不共享
// 在long的范围内
if (value >= LONG_MIN && value <= LONG_MAX) {
o = createObject(REDIS_STRING, NULL);
o->encoding = REDIS_ENCODING_INT; // 设置编码为INT
o->ptr = (void*)((long)value); // 设置底层数据结构为long的指针
} else { // 在long与long long的范围之间
// 设置编码为raw
o = createObject(REDIS_STRING,sdsfromlonglong(value));
}
}
return o;
}

其中变量redis.c/share是全局变量,用于记录共享对象,定义为:

1
struct sharedObjectsStruct shared;

结构体redis.h/sharedObjectsStruct定义为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct sharedObjectsStruct {
robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *cnegone, *pong, *space,
*colon, *nullbulk, *nullmultibulk, *queued,
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
*outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr,
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
*oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
*lpush, *emptyscan, *minstring, *maxstring,
*select[REDIS_SHARED_SELECT_CMDS], // robj对象数组
*integers[REDIS_SHARED_INTEGERS],
*mbulkhdr[REDIS_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
*bulkhdr[REDIS_SHARED_BULKHDR_LEN]; /* "$<value>\r\n" */
};

以成员robj *integers[REDIS_SHARED_INTEGERS]为例,integers是一个大小为REDIS_SHARED_INTEGERS的数组,数组元素为robj *

object.c/incrRefCount函数用于增加对象的引用计数:

1
2
3
void incrRefCount(robj *o) {
o->refcount++;
}

相应的还有减少对象引用计数的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void decrRefCount(robj *o) {
// 报错
if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
// 当引用计数为1时,需要释放对象的内存
if (o->refcount == 1) {
// 根据编码方式释放redisObject中为成员分配的堆动态内存
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;
case REDIS_SET: freeSetObject(o); break;
case REDIS_ZSET: freeZsetObject(o); break;
case REDIS_HASH: freeHashObject(o); break;
default: redisPanic("Unknown object type"); break;
}
// 释放redisObject本身的内存
zfree(o);
} else { // 引用计数大于1
o->refcount--;
}
}

// 此外还提供了不限类型的参数的函数,作用可见3.2.2节
void decrRefCountVoid(void *o) {
decrRefCount(o);
}

// This function set the ref count to zero without freeing the object.
robj *resetRefCount(robj *obj) {
obj->refcount = 0;
return obj;
}
  • 从一个long double类型的数转换成字符串对象

  • 复制字符串对象

1
2
3
4
5
6
robj *dupStringObject(robj *o) {
// 用于调试
redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
// 调用createStringObject创建
return createStringObject(o->ptr,sdslen(o->ptr));
}
  • 释放字符串对象:
1
2
3
4
5
6
void freeStringObject(robj *o) {
// 编码方式为raw,则直接释放
if (o->encoding == REDIS_ENCODING_RAW) {
sdsfree(o->ptr);
}
}

3.2.2 列表对象

  • 创建列表对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 创建一个列表对象
*/
robj *createListObject(void) {
// 创建一个空链表
list *l = listCreate();
// 创建REDIS_LIST列表对象
robj *o = createObject(REDIS_LIST,l);
// 设置列表对象的节点释放函数
listSetFreeMethod(l,decrRefCountVoid);
// 设置编码为REDIS_ENCODING_LINKEDLIST 即双端链表
o->encoding = REDIS_ENCODING_LINKEDLIST;
return o;
}

其中adlist.h/listSetFreeMethod是一个宏函数:

1
#define listSetFreeMethod(l,m) ((l)->free = (m))

作用是将l的结构体成员free函数指针指向为m函数。

free成员是一个函数指针,返回值为void,参数为void*的指针:

1
void (*free)(void *ptr);

于是,这里是将lfree函数指针指向void decrRefCountVoid(void *o)函数。decrRefCountVoid用于减少引用计数,实际上它又调用了decrRefCount,详见3.2.1节。

  • 释放列表对象内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 释放列表对象内存
*/
void freeListObject(robj *o) {
switch (o->encoding) {
case REDIS_ENCODING_LINKEDLIST: // 双端链表编码
listRelease((list*) o->ptr); // 销毁双端链表
break;
case REDIS_ENCODING_ZIPLIST: // 压缩列表编码
zfree(o->ptr);
break;
default:
redisPanic("Unknown list encoding type");
}
}

adlist.c/listRelease函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 销毁整个链表,逐个结点进行释放
*/
void listRelease(list *list) {
unsigned long len;
listNode *current, *next;

current = list->head;
len = list->len;
while(len--) {
next = current->next;
// 如果free有被注册,则执行
// 对于列表对象,调用在创建列表对象时注册的free函数,即decrRefCountVoid函数
if (list->free) list->free(current->value);
// 释放ListNode节点内存
zfree(current);
current = next;
}
zfree(list);
}

这里除了需要释放节点本身(ListNode)占用的内存,还需要调用list->free原因在于ListNode的值void *value可能会指向redis对象(如果是redis对象,则只能是字符串对象),这一片内存也需要释放,如图所示。

image-20230327171429237

当调用:

1
2
3
if (list->free) list->free(current->value);
// 实质上调用
decrRefCount(current->value); // current->value = robj; value指向的是一个redis的字符串对象,也只能是字符串对象嵌套在其他四种类型的对象中

进入decrRefCount函数内,如果引用计数为1,则释放:

1
case REDIS_STRING: freeStringObject(o); break;

如果字符串对象是raw编码,则进入freeStringObject后会执行:

1
sdsfree(o->ptr);

例如图中hello的那片区域。最后decrRefCount会释放该redis对象本身的内存,例如第一个编码为REDIS_STRING的对象。

如果引用大于1,说明还有其他对象引用它,则只减去一个引用计数。

3.2.3 哈希对象

  • 创建哈希对象
1
2
3
4
5
6
7
8
9
robj *createHashObject(void) {
// 空的压缩列表
unsigned char *zl = ziplistNew();
// type: REDIS_HASH
robj *o = createObject(REDIS_HASH, zl);
// encoding: REDIS_ENCODING_ZIPLIST(压缩列表)
o->encoding = REDIS_ENCODING_ZIPLIST;
return o;
}
  • 释放哈希对象
1
2
3
4
5
6
7
8
9
10
11
12
13
void freeHashObject(robj *o) {
switch (o->encoding) {
case REDIS_ENCODING_HT:
dictRelease((dict*) o->ptr);
break;
case REDIS_ENCODING_ZIPLIST:
zfree(o->ptr);
break;
default:
redisPanic("Unknown hash encoding type");
break;
}
}

4 数据库

源码所在文件:redis.h(数据库定义)

4.1 定义

数据库结构体定义:

1
2
3
4
5
6
7
8
9
typedef struct redisDb {
dict *dict; // 键空间
dict *expires; // 键的过期时间
dict *blocking_keys; // 处于阻塞状态的键和相应的client(主要用于List类型的阻塞操作)
dict *ready_keys; // 准备好数据可以解除阻塞状态的键和相应的client
dict *watched_keys; // 被watch命令监控的key和相应client
int id; // 数据库ID标识
long long avg_ttl; // 数据库内所有键的平均TTL(生存时间)
} redisDb;

它作为redisServer结构体中的成员存在:

1
2
3
4
5
6
7
struct redisServer {
// ...
redisDb *db; // redisDb数组的首地址
// ...
int dnum; // 数据库的数量
// ...
};

4.2 操作

源码所在文件:db.c(相关操作)

4.2.1

5 RDB持久化

6 AOF持久化

7 事件/客户端/服务器

7.0 程序入口和服务器初始化

  • 连接请求时序图

img

  • 命令请求时序图

img

  • 命令回复时序图

img


redis服务器程序入口函数位于redis.c/main

1
2
3
4
5
6
7
8
9
10
11
12
13
int main(int argc, char **argv) {
// ...

// 初始化服务器
initServer();
// ...

// 开启监听和事件循环
aeMain(server.el);

// ...
return 0;
}

7.1 事件

源码位置:ae.hae.c

7.1.1 ae.h

核心结构体:aeEventLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#ifndef __AE_H__
#define __AE_H__

#define AE_OK 0
#define AE_ERR -1

#define AE_NONE 0 // 套接字没有任何事件被监听
#define AE_READABLE 1 // 套接字的读事件被监听
#define AE_WRITABLE 2 // 套接字的写事件被监听

#define AE_FILE_EVENTS 1 // 文件事件
#define AE_TIME_EVENTS 2 // 事件事件
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) // 所有事件
#define AE_DONT_WAIT 4

#define AE_NOMORE -1 // 定时事件

/* Macros */
// 为避免编译器警告,可以使用该宏,表示该变量已经被使用过了
// 一些声明或定义的变量没有使用时,编译器会报警告
#define AE_NOTUSED(V) ((void) V)

struct aeEventLoop;

/* Types and data structures */
// 事件处理函数
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

/* File event structure */
/* 文件事件的结构体 */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE)事件类型的掩码 */
aeFileProc *rfileProc; // AE_READABLE事件的处理函数指针
aeFileProc *wfileProc; // AE_WRITABLE事件的处理函数指针
void *clientData; // 客户端数据
} aeFileEvent;

/* Time event structure */
/* 时间事件的结构体 */
typedef struct aeTimeEvent {
long long id; // 服务器为时间事件创建的全局唯一ID
long when_sec;
long when_ms; // when_sec和when_ms记录时间事件到达的时间
aeTimeProc *timeProc; // 处理时间事件的函数指针
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next; // 下一个时间事件处理的函数指针
// 时间事件的结构体构成了一个链表结构
} aeTimeEvent;

/* A fired event */
/* 已就绪事件 */
typedef struct aeFiredEvent {
int fd;
int mask; // 事件类型
} aeFiredEvent;

/* State of an event based program */
/* 事件驱动架构的结构体 */
typedef struct aeEventLoop {
int maxfd; // 当前最大的监听文件的副套接字
int setsize; // 监听文件的副套接字数量上限
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events,结构体数组的指针 */
aeFiredEvent *fired; /* Fired events,结构体数组的指针 */
aeTimeEvent *timeEventHead; // 时间事件链表的头结点
int stop;
void *apidata; /* This is used for polling API specific data,包含了select实例对象(要监控的fd)*/
aeBeforeSleepProc *beforesleep; // 进入事件循环流程前执行的函数
} aeEventLoop;

/* Prototypes */
// 事件循环结构体的操作
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);

// 文件事件
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);

// 时间事件
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);

// 事件驱动
int aeProcessEvents(aeEventLoop *eventLoop, int flags);

// 阻塞等待milliseconds后,直到fd可操作
int aeWait(int fd, int mask, long long milliseconds);

// 入口函数
void aeMain(aeEventLoop *eventLoop);

// 返回IO复用的类型
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);

// 设置可监听套接字的数量
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);

#endif

7.1.2 ae.c

监听套接字(副套接字)上的文件读写事件是否发生,本质上是监听套接字上的读写是否准备就绪。前者的实现依赖于后者,后者的实现依赖于IO多路复用技术,例如selectepoll等,以select为例:

  • ae.c/aeCreateFileEvent依赖于ae_select.c/aeApiAddEvent,后者实际上就是FD_SET
  • ae.c/aeDeleteFileEvent依赖于ae_select.c/aeApiDelEvent,后者使用了FD_CLR来清除fd上的读写监听。
  • ae.c/aeProcessEvents依赖于ae_select.c/aeApiPoll,后者使用了select调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif

/**
* 创建事件循环结构体
* setzise: 需要监控的文件套接字fd的个数
*/
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
// 分配内存,其中zmalloc基于malloc
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
// 下面这一步确定了底层采用的IO复用的形式,例如select
// 在select的aeApiCreate中,初始化了要监控的fd,并赋值给eventLoop->apidata
if (aeApiCreate(eventLoop) == -1) goto err;
for (i = 0; i < setsize; i++)
// 把所有网络IO事件对应文件描述符的监听事件掩码,初始化为 AE_NONE,暂时不对任何事件进行监听
// 即对setsize个的fd上发生的读写事件均不进行监听
eventLoop->events[i].mask = AE_NONE;
return eventLoop;

err: // 错误处理
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}

/* Return the current set size. */
int aeGetSetSize(aeEventLoop *eventLoop) {
return eventLoop->setsize;
}

void aeDeleteEventLoop(aeEventLoop *eventLoop) {
aeApiFree(eventLoop);
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}

/**
* 创建文件事件:将给定的fd以及需要监听的mask事件加入到eventLoop中
* fd: 副套接字
* mask: 监听事件类型掩码
* proc: 事件的处理函数
* clientData: 客户端对象,一般为redisClient
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData) {
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 取得下标为fd的aeFileEvent结构体
aeFileEvent *fe = &eventLoop->events[fd];
// 为fd添加mask类型的监听事件(select中的底层实现)
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
// fe->mask是原来的监听事件 mask是传入的想要监听事件
// 这里做或操作相当于合并监听事件类型
fe->mask |= mask;
// 如果mask中有AE_READABLE,则设置读事件的处理函数,下同
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd) // 更新最大fd
eventLoop->maxfd = fd;
return AE_OK;
}

/**
* 删除fd上mask类型的监听事件
*/
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
if (fd >= eventLoop->setsize) return; // 无效fd
aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return; // 没有监听事件,返回
// aeApiDelEvent中执行了FD_SLR
aeApiDelEvent(eventLoop, fd, mask);
fe->mask = fe->mask & (~mask);
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
/* Update the max fd */
int j;

for (j = eventLoop->maxfd-1; j >= 0; j--)
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j;
}
}

/**
* 获取fd的监听事件类型
*/
int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
if (fd >= eventLoop->setsize) return 0;
aeFileEvent *fe = &eventLoop->events[fd];

return fe->mask;
}

/**
* 事件的调度与执行
* 首先处理时间事件,然后处理文件事件
* 如果标志为0,则函数不执行任何操作并返回。
* 如果标志设置了AE_ALL_EVENTS,则处理所有类型的事件。
* 如果标志已设置AE_FILE_EVENTS,则处理文件事件。
* 如果标志设置了AE_TIME_EVENTS,则处理时间事件。
* 如果标志已设置AE_DONT_WAIT,则函数返回ASAP,直到所有不需要等待就可以处理的事件被处理
* 函数返回处理的事件数
*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;

/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
// ...

// aeApiPoll返回出现的事件数
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask; // 发生的事件
int fd = eventLoop->fired[j].fd;
int rfired = 0;

// 先处理读事件,再处理写事件
// 监听的事件 & 发生的事件 & 读事件
if (fe->mask & mask & AE_READABLE) { // 处理读事件
rfired = 1;
// 调用处理读事件的函数
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) { // 处理写事件
if (!rfired || fe->wfileProc != fe->rfileProc)
// 调用处理写事件的函数
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */
}

/**
* 处理事件入口
*/
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) { // 循环
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop); // 处理事件之前的工作
aeProcessEvents(eventLoop, AE_ALL_EVENTS); // 处理事件
}
}

7.1.3 ae_select.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
typedef struct aeApiState {
fd_set rfds, wfds; // 分别对应读和写的文件描述符
/* We need to have a copy of the fd sets as it's not safe to reuse
* FD sets after select(). */
fd_set _rfds, _wfds; // 上面的副本
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
// 申请空间,zmalloc基于malloc
aeApiState *state = zmalloc(sizeof(aeApiState));

if (!state) return -1;
// 清空输入文件描述符集合
FD_ZERO(&state->rfds);
// 清空输出文件描述符集合
FD_ZERO(&state->wfds);
// state保存了需要监控的读和写的文件描述符,赋值给apidata
eventLoop->apidata = state;
return 0;
}

static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
/* Just ensure we have enough room in the fd_set type. */
if (setsize >= FD_SETSIZE) return -1;
return 0;
}

static void aeApiFree(aeEventLoop *eventLoop) {
zfree(eventLoop->apidata);
}

/**
* 创建文件事件:将给定的fd以及需要监听的mask事件加入到eventLoop中
*/
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
// 如果要监控该fd是否可读了,则添加fd进rfds集合中
if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
// 如果要监控该fd是否可写了,则添加fd进wfds集合中
if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
return 0;
}

/**
* 删除fd上mask类型的监听事件
*/
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;

if (mask & AE_READABLE) FD_CLR(fd,&state->rfds);
if (mask & AE_WRITABLE) FD_CLR(fd,&state->wfds);
}

/**
* 监视fd是否准备就绪,调用了select
*/
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
// 保存原始的待监控集合到_rfds, _wfds中
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
// 在此处阻塞,直到fd准备就绪,tvp是等待超时时间,超时就返回
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) { // 成功返回,说明某些fd上的读或写就绪
// 遍历,查看是哪一个fd就绪
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0; // 记录发生了哪些事件
// 取得fd对应的文件事件结构体
aeFileEvent *fe = &eventLoop->events[j];

if (fe->mask == AE_NONE) continue; // 没有监听,返回
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) // 监听的是读事件,并且fd在读就绪中
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) // 监听的是写事件,并且fd在写就绪中
mask |= AE_WRITABLE;
eventLoop->fired[numevents].fd = j; // 记录就绪的fd到fired数组中
eventLoop->fired[numevents].mask = mask; // 记录就绪fd的监控类型
numevents++;
}
}
// 返回就绪事件个数(可能为读,写,读写)
return numevents;
}

/**
* 返回IO多路转接类型
*/
static char *aeApiName(void) {
return "select";
}

7.1.4 networking.c

Redis 将客户端的创建、消息回复等功能,实现在了 networking.c 文件中。Redis 对 TCP 网络通信的 Socket 连接、设置等操作进行了封装,这些封装后的函数实现在 anet.h/anet.c 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/* networking.c */

/**
* 根据accept得到的副套接字fd来创建客户端
*/
redisClient *createClient(int fd) {
// 分配空间
redisClient *c = zmalloc(sizeof(redisClient));
if(fd != -1) {
// 对连接的特性进行设置
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if(server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 创建文件事件:给fd绑定读事件,且处理函数为readQueryFromClient,即如果如果该客户端向服务器端发送数据,则aeEvent模块监听到这个事件就会调用readQueryFromClient函数。
if(aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient,c) == AE_ERR) {
close(fd);
zfree(c);
return NULL;
}
}
// 设置客户端对象的信息...大部分略
c->id = server.next_client_id++;
c->fd = fd;
// ...
return c;
}
#define MAX_ACCEPTS_PER_CALL 1000 // 最大连接数
// 根据副套接字创建一个客户端对象
static void acceptCommonHandler(int fd, int flags) {
redisClient *c; // 客户端对象
// 根据副套接字创建一个客户端对象
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";

/* That's a best effort error message, don't check write errors */
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
// 服务器端的连接数加一
server.stat_numconnections++;
c->flags |= flags;
}

/**
* 接收客户端连接,对accept的封装
*/
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
// cport客户端端口,cfd客户端的套接字,max为这次调用可产生的最大连接数
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
// 客户端ip
char cip[REDIS_IP_STR_LEN];
// 避免编译器警告
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);

while(max--) {
// anetTcpAccept又是accept的封装
// server是一个全局变量,定义在redis.c中,定义为struct redisServer server;,结构体包含服务器端的众多信息
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) { // 错误处理
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
// 根据副套接字创建一个客户端
acceptCommonHandler(cfd,0);
}
}

上面是连接应答处理器的代码,即服务器端的accept的封装,这里调用了anet.c/anetTcpAccept,而anet.c/anetTcpAccept又调用了anet.c/anetGenericAcceptaccept调用就在最后一个函数中,相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 原生的accept封装
* err: 错误信息
* s: 监听套接字
* sa: 客户端地址,被强转为通用类型struct sockaddr
* len: sa的长度
* 返回副套接字
*/
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
int fd;
while(1) {
fd = accept(s,sa,len);
if (fd == -1) { // 错误处理
if (errno == EINTR)
continue;
else {
anetSetError(err, "accept: %s", strerror(errno));
return ANET_ERR;
}
}
break;
}
return fd;
}

/**
* 接收连接,本质是对accept的封装
* err: 错误信息
* s: 监听套接字
* ip: 待回填的客户端ip地址
* ip_len: ip长度
* port: 待回填的客户端端口
* 返回副套接字
*/
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
int fd;
struct sockaddr_storage sa;
socklen_t salen = sizeof(sa);
// anetGenericAccept是accept的封装
// 对端(客户端)的地址回填在sa中
if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
return ANET_ERR;

// 对客户端的协议族的处理
if (sa.ss_family == AF_INET) { // 对端使用ipv4协议
struct sockaddr_in *s = (struct sockaddr_in *)&sa; // 将sockaddr_storage强转为sockaddr_in
if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len); // 将对端ip地址保存在ip中
if (port) *port = ntohs(s->sin_port); // 将对端端口保存在port中
} else { // 对端使用ipv6协议
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
if (port) *port = ntohs(s->sin6_port);
}
// 返回副套接字
return fd;
}

其中,地址的表示使用了sockaddr_storage,它在<netinet/in.h>头文件中定义:是升级版的sockaddr

1
2
3
4
struct sockaddr_storage {
uint8_t ss_len;
sa_family_t ss_family;
};

这是一般结构,实际使用时要根据协议族强转为对应协议的地址类型。

7.2 客户端

redis客户端是不定的,可以使用redis源码自带的客户端,也可以使用外部的客户端。但底层调用了connect函数来与redis服务器连接。redis服务器使用一些结构体来存储客户端的属性和客户端发送来的数据,需要一些函数来操作客户端相关的资源。

7.2.1 客户端属性

结构体定义在redis.h/redisClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
typedef struct redisClient {
uint64_t id; // 客户端自增ID
int fd; // 客户端套接字描述符,这里的是服务器accept之后产生的副套接字
redisDb *db; // 客户端正在使用的数据库的指针
int dictid; // 数据库号码
robj *name; // 客户端名称
sds querybuf; // 输入缓冲区(接收客户端传来的数据)
size_t querybuf_peak; // 最近100ms or more的输入缓冲区的最大值
int argc; // 将querybuf中的内容分析后,存储命令参数的个数,即argv的长度
robj **argv; // 类似于main函数的参数,argv[0]是命令,argv[1]之后为命令参数
struct redisCommand *cmd, *lastcmd; // 客户端当前要和上次执行的命令
int reqtype;
int multibulklen; /* number of multi bulk arguments left to read */
long bulklen; /* length of bulk argument in multi bulk request */
list *reply; // 输出缓冲区,链表存储
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
int sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; // 客户端创建的时间
time_t lastinteraction; // 客户端上次活动时间
time_t obuf_soft_limit_reached_time;
int flags; // 标志位
int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */
int repl_put_online_on_ack; /* Install slave write handler on ACK. */
int repldbfd; /* replication DB file descriptor */
off_t repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
sds replpreamble; /* replication DB preamble. */
long long reploff; /* replication offset if this is our master */
long long repl_ack_off; /* replication ack offset, if this is a slave */
long long repl_ack_time;/* replication ack time, if this is a slave */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
multiState mstate; /* MULTI/EXEC state */
blockingState bpop; /* blocking state */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */

/* Response buffer */
int bufpos; // 记录了buf数组目前已使用的字节数量
char buf[REDIS_REPLY_CHUNK_BYTES]; // 输出缓冲区,缓存向客户端发送的数据
} redisClient;

7.2.2 创建客户端

7.3 服务器

7.3.1 发送数据