Understanding the SET Command's Execution from the Perspective of Redis Internals

Notes:

  1. This walkthrough uses Redis in cluster mode.
  2. The network model used is epoll.

This article explains, from the perspective of Redis internals, exactly what a SET command goes through from the moment the Redis client sends it until the Redis server receives and handles the request.

Along the way, we will also look at the following questions:

  1. How Redis executes a command
  2. How Redis Cluster mode works
  3. Why Redis is so fast
  4. The epoll network model

To understand the Redis request flow, we first need to understand the Redis network model. Redis supports four I/O multiplexing mechanisms: select, poll, epoll, and kqueue. Of these, epoll is in my view the most widely deployed, so this article uses epoll as the running example.

The epoll Network Model

The select and poll models have two drawbacks (both visible in the sketch after this list):

  1. Every call to select must add the calling process to the wait queue of every monitored socket, and every wakeup must remove it from each of those queues, which means two full traversals. On top of that, the entire FD set has to be copied into the kernel on each call, a user-space-to-kernel transition with measurable overhead.
  2. select/poll return only an int (the count of ready descriptors), so we do not know which socket is ready; we still have to scan the whole FD set to find the ready ones.
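
Here is a minimal sketch of the classic select loop that exhibits both problems. This is illustrative code under my own naming, not Redis source:

#include <sys/select.h>
#include <unistd.h>

/* Sketch: a classic select-based read loop over `nfds` descriptors. */
void select_loop(int *fds, int nfds) {
    char buf[4096];
    for (;;) {
        fd_set rset;
        FD_ZERO(&rset);
        int maxfd = -1;
        /* Drawback 1: the whole FD list is rebuilt and copied into the
         * kernel on every single call. */
        for (int i = 0; i < nfds; i++) {
            FD_SET(fds[i], &rset);
            if (fds[i] > maxfd) maxfd = fds[i];
        }
        if (select(maxfd + 1, &rset, NULL, NULL, NULL) <= 0) continue;
        /* Drawback 2: select only reports a count, so every FD must be
         * scanned to discover which ones are actually ready. */
        for (int i = 0; i < nfds; i++) {
            if (FD_ISSET(fds[i], &rset))
                (void)read(fds[i], buf, sizeof(buf));
        }
    }
}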

To eliminate these repeated scans and the need to pass the full descriptor set on every call, epoll reorganizes the overall structure. The architecture looks like this:

(figure: epoll architecture)

Setting up epoll involves three steps (see the sketch after this list):

  1. epoll_create() allocates a new structure in the kernel (a red-black tree) that stores the monitored file descriptors.
  2. epoll_ctl() adds, modifies, and removes file descriptors in that structure.
  3. epoll_wait() blocks the calling thread and places it on the epoll instance's wait queue.
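
A minimal sketch of the three calls working together, assuming a listening socket listen_fd already exists (illustrative only, error handling omitted):

#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Sketch: the three epoll steps around a pre-existing listening socket. */
void epoll_loop(int listen_fd) {
    char buf[4096];
    struct epoll_event ev, events[64];

    int epfd = epoll_create1(0);            /* step 1: kernel-side structure */

    ev.events = EPOLLIN;
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);  /* step 2: register fd */

    for (;;) {
        /* step 3: block until some registered fd is ready; unlike select,
         * only the ready events come back, so no full scan is needed. */
        int n = epoll_wait(epfd, events, 64, -1);
        for (int i = 0; i < n; i++) {
            int fd = events[i].data.fd;
            if (fd == listen_fd) {                /* new connection: watch it too */
                int cfd = accept(listen_fd, NULL, NULL);
                ev.events = EPOLLIN;
                ev.data.fd = cfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
            } else {
                (void)read(fd, buf, sizeof(buf)); /* client data is readable */
            }
        }
    }
}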

How epoll processes an event once it arrives:

Unlike select/poll, epoll does not wake up and rescan everything; instead, the kernel scheduler moves the thread waiting for events from the suspended state to the runnable state only when an event actually occurs.

While the thread waits in epoll_wait, the kernel monitors all registered file descriptors. As soon as a registered event fires on one of them, the kernel delivers that event to the epoll instance. The flow is:

  1. The thread that called epoll_wait waits on the epoll instance for events to occur. It is suspended and goes to sleep.
  2. When a registered event occurs on a file descriptor, the kernel records the event in the epoll instance.
  3. The kernel then wakes the waiting thread through the scheduler, moving it back to the runnable state.
  4. After waking, epoll_wait returns and fills the caller-supplied array with the event details, so the user program can process the events.

In pseudocode:

/* User-space side */
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) {
    /* Wait in the kernel for events to occur */
    wait_for_events(epfd, events, maxevents, timeout);

    /* Return the number of events */
    return num_events;
}

/* Kernel-space side */
void wait_for_events(int epfd, struct epoll_event *events, int maxevents, int timeout) {
    /* No events yet: put the current thread on the wait queue and suspend it */
    add_thread_to_wait_queue(current_thread, epfd->wait_queue);

    /* Enter the scheduler and switch to another thread */
    schedule();

    /* When we get back here, an event has occurred; process it */
    process_events(epfd, events, maxevents);
}

/* Called when an event occurs on a monitored file descriptor */
void handle_events(struct epoll_fd *epfd) {
    /* Walk the wait queue and wake the waiting threads */
    wake_up_threads(epfd->wait_queue);
}


Redis Server Startup

Having covered the epoll model, the next question is how Redis actually uses epoll for its communication. Here is the core of Redis's startup code:

int main(int argc, char **argv) {
    //...
    initServer();
    //...
    aeMain(server.el);
}

Redis startup revolves around two functions, initServer and aeMain. initServer performs the following epoll-related steps:

  1. aeCreateEventLoop creates the event loop, including the epoll structure that tracks monitored file descriptors
  2. listenToPort listens on the configured port
  3. createSocketAcceptHandler registers the handler for accept events
  4. aeSetBeforeSleepProc registers the before-sleep callback

aeMain then loops, calling aeApiPoll (effectively epoll_wait) to wait for FDs to become ready. The overall flow:

(figure: Redis server startup and event-loop flow)

Sending and Executing a Command

Redis Cluster Mode

Redis Cluster is a widely used deployment architecture. Its structure looks like this:

(figure: Redis Cluster topology)

Within the cluster, master nodes talk to each other over the Gossip protocol, which keeps cluster state propagated to every node.

Synchronization between a master and its replicas is done by the master continuously propagating its write stream to the replicas (with periodic pings to keep the link alive).
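
One more piece of cluster background that matters later: client requests are routed by hash slot, computed as CRC16(key) mod 16384 (honoring {hash tags}), as described in the Redis Cluster specification. Below is a standalone sketch of that calculation; the helper names are mine, not Redis's:

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* CRC16-CCITT (XMODEM variant: poly 0x1021, init 0), the checksum the
 * Redis Cluster specification prescribes for slot hashing. */
static uint16_t crc16(const char *buf, int len) {
    uint16_t crc = 0;
    for (int i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i]) << 8;
        for (int j = 0; j < 8; j++)
            crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021)
                                 : (uint16_t)(crc << 1);
    }
    return crc;
}

/* Map a key to one of the 16384 slots. If the key contains a non-empty
 * {...} section (a hash tag), only that part is hashed, so related keys
 * can be forced onto the same slot. */
static int key_hash_slot(const char *key, int keylen) {
    int s, e;
    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    if (s == keylen) return crc16(key, keylen) & 16383;
    for (e = s + 1; e < keylen; e++)
        if (key[e] == '}') break;
    if (e == keylen || e == s + 1) return crc16(key, keylen) & 16383;
    return crc16(key + s + 1, e - s - 1) & 16383;
}

int main(void) {
    const char *key = "xxx";
    printf("key '%s' -> slot %d\n", key, key_hash_slot(key, (int)strlen(key)));
    return 0;
}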

With this background in place, let's follow the command through the system.

Client Connection

By the time Redis has finished starting up, it has already registered the connection accept handler (acceptTcpHandler). Its job is to bind each newly accepted fd to the read handler (readQueryFromClient), so that when data arrives on that fd the handler can be invoked:

void initServer(void) {
    //...
    createSocketAcceptHandler(&server.ipfd, acceptTcpHandler);
    //...
}

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    //...
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(cfd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    //...
    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }
    //...
}

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        /* Bind readQueryFromClient as this connection's read handler */
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
    //...
}

Once the handler is registered, aeMain drives the event loop, which in turn calls epoll_wait(). The code path:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

int aeProcessEvents(aeEventLoop *eventLoop, int flags){
    //...
    if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
        eventLoop->beforesleep(eventLoop);

    /* Call the multiplexing API, will return only on timeout or when
     * some event fires. */
    numevents = aeApiPoll(eventLoop, tvp);

    /* After sleep callback. */
    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
        eventLoop->aftersleep(eventLoop);
    //...
}


// ae_epoll.c
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

Command Execution

When you type set xxx aaa into a redis client, the command goes through the following steps:

  1. The client parses the typed command and encodes it into the RESP protocol as an array of bulk strings: *3\r\n$3\r\nSET\r\n$3\r\nxxx\r\n$3\r\naaa\r\n (a sketch of this framing follows step 4).
  2. The client sends the encoded request over a previously established TCP connection to one of the cluster nodes.
  3. The receiving node checks whether the key's hash slot is one it serves; if not, it replies with a MOVED redirection, and the client retries against the correct node.
  4. When the request bytes arrive and the socket becomes readable, epoll_wait returns the number of ready fds:
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
        tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
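
As promised in step 1, here is a sketch of the RESP framing. This is a hypothetical client-side helper of my own, not Redis source:

#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not Redis client code): frame argv as a RESP
 * array of bulk strings. Returns the number of bytes written to out. */
static int resp_encode(char *out, size_t outlen, int argc, const char **argv) {
    int n = snprintf(out, outlen, "*%d\r\n", argc);
    for (int i = 0; i < argc; i++)
        n += snprintf(out + n, outlen - (size_t)n, "$%zu\r\n%s\r\n",
                      strlen(argv[i]), argv[i]);
    return n;
}

int main(void) {
    const char *argv[] = { "SET", "xxx", "aaa" };
    char buf[128];
    int n = resp_encode(buf, sizeof(buf), 3, argv);
    /* Emits: *3\r\n$3\r\nSET\r\n$3\r\nxxx\r\n$3\r\naaa\r\n */
    fwrite(buf, 1, (size_t)n, stdout);
    return 0;
}
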
  5. The event loop iterates over the ready fds and checks each event type. Here the event is EPOLLIN, meaning the socket buffer is readable, so the registered handler (readQueryFromClient) is invoked:
void readQueryFromClient(connection *conn) {
    //...
    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
    processInputBuffer(c);
}

void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        //...
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }
    //...
}

int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    client *old_client = server.current_client;
    server.current_client = c;
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
    }
    //...
}

int processCommand(client *c) {
    //...
    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    //...

    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand &&
        c->cmd->proc != resetCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    }
}

struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
}
  6. The server reads the fd's contents, parses out the command name set, and looks up the corresponding implementation in the command table:
void *dictFetchValue(dict *d, const void *key) {
    dictEntry *he;

    he = dictFind(d,key);
    return he ? dictGetVal(he) : NULL;
}

dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    uint64_t h, idx, table;

    if (dictSize(d) == 0) return NULL; /* dict is empty */
    if (dictIsRehashing(d)) _dictRehashStep(d);
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        struct redisCommand *c = redisCommandTable+j;
        int retval1, retval2;

        /* Translate the command string flags description into an actual
         * set of flags. */
        if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
            serverPanic("Unsupported command flag");

        c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        /* Populate an additional dictionary that will be unaffected
         * by rename-command statements in redis.conf. */
        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
    }
}

struct redisCommand redisCommandTable[] = {
    ...
    /* Note that we can't flag set as fast, since it may perform an
     * implicit DEL of a large key. */
    {"set",setCommand,-3,
     "write use-memory @string",
     0,NULL,1,1,1,0,0,0},

    {"setnx",setnxCommand,3,
     "write use-memory fast @string",
     0,NULL,1,1,1,0,0,0},

    {"setex",setexCommand,4,
     "write use-memory @string",
     0,NULL,1,1,1,0,0,0},

    ...
};
  7. The matching entry's handler, setCommand, is selected and executed:
void setCommand(client *c) {
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_NO_FLAGS;

    if (parseExtendedStringArgumentsOrReply(c,&flags,&unit,&expire,COMMAND_SET) != C_OK) {
        return;
    }

    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

Generating the Response

After the command executes, its implementation function builds a reply object and appends it to the client's output buffer. This is done by the addReply family of functions. For SET, the implementation appends an OK reply to the output buffer.

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyProtoToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

Sending the Response

When the event loop detects that a client's output buffer has data pending, it calls writeToClient to send the response back to the client.

Through these steps, Redis finds the implementation function for the command a client sent, executes it, and returns the result. The path crosses several source files and functions; the dispatch logic lives mainly in server.c, with the reply and network I/O handling in networking.c.

void beforeSleep(struct aeEventLoop *eventLoop) {
    //...

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    //...
}

int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
            connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        /* If a client is protected, don't do anything,
         * that may trigger write error or recreate handler. */
        if (c->flags & CLIENT_PROTECTED) continue;

        /* Don't write to clients that are going to be closed anyway. */
        if (c->flags & CLIENT_CLOSE_ASAP) continue;

        /* Try to write buffers to the client socket. */
        if (writeToClient(c,0) == C_ERR) continue;

        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) {
            int ae_barrier = 0;
            /* For the fsync=always policy, we want that a given FD is never
             * served for reading and writing in the same event loop iteration,
             * so that in the middle of receiving the query, and serving it
             * to the client, we'll call beforeSleep() that will do the
             * actual fsync of AOF to disk. the write barrier ensures that. */
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_barrier = 1;
            }
            if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}
