I feel that this part of the code is still quite interesting, and I will try to explain it in a more general way.

Overview

I remember my article on the Go HTTP Standard Library, which explains how to create a Server-side application for Go.

  • First, register the processor.
  • Start a loop to listen for ports, and create a Goroutine for each connection.
  • Then the Goroutine waits in a loop to receive the request data, then matches the corresponding processor in the processor routing table based on the request address, and hands the request to the processor for processing.

In code, it looks like this.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (srv *Server) Serve(l net.Listener) error { 
    ...
    baseCtx := context.Background()  
    ctx := context.WithValue(baseCtx, ServerContextKey, srv)
    for {
        // 接收 listener 过来的网络连接
        rw, err := l.Accept()
        ... 
        tempDelay = 0
        c := srv.newConn(rw)
        c.setState(c.rwc, StateNew) 
        // 创建协程处理连接
        go c.serve(connCtx)
    }
}

Redis is a bit different because it is single-threaded and cannot use multiple threads to handle connections, so Redis chooses to use an event driver based on the Reactor pattern to implement concurrent processing of events.

Reactor

The so-called Reactor mode in Redis is to listen to multiple fd’s through epoll, and whenever these fd’s respond, they will notify epoll in the form of events for callbacks, each of which has a corresponding event handler.

Each event has a corresponding event handler, such as accept for acceptTCPHandler, read & write for readQueryFromClient, and so on, and then the events are assigned to event handlers for processing in the form of circular dispatching of events.

So the above Reactor pattern is implemented through epoll, and there are three main methods for epoll.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_create(int size)

/*
 * 可以理解为,增删改 fd 需要监听的事件
 * epfd 是 epoll_create() 创建的句柄。
 * op 表示 增删改
 * epoll_event 表示需要监听的事件,Redis 只用到了可读,可写,错误,挂断 四个状态
 */
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

/*
 * 可以理解为查询符合条件的事件
 * epfd 是 epoll_create() 创建的句柄。
 * epoll_event 用来存放从内核得到事件的集合
 * maxevents 获取的最大事件数
 * timeout 等待超时时间
 */
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

So we can implement a simple server based on these three methods.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 创建监听
int listenfd = ::socket();

// 绑定ip和端口
int r = ::bind();  
// 创建 epoll 实例
int epollfd = epoll_create(xxx); 
// 添加epoll要监听的事件类型
int r = epoll_ctl(..., listenfd, ...);

struct epoll_event* alive_events =  static_cast<epoll_event*>(calloc(kMaxEvents, sizeof(epoll_event)));

while (true) {
    // 等待事件
    int num = epoll_wait(epollfd, alive_events, kMaxEvents, kEpollWaitTime);
    // 遍历事件,并进行事件处理
    for (int i = 0; i < num; ++i) {
        int fd = alive_events[i].data.fd;
        // 获取事件
        int events = alive_events[i].events;
        // 进行事件的分发
        if ( (events & EPOLLERR) || (events & EPOLLHUP) ) {
            ...
        } else  if (events & EPOLLRDHUP) {
            ...
        } 
        ...
    }   
}

Call Flow

So, based on the above, you can see that an event loop for Redis is just a few steps.

  1. registering event listeners and callback functions.
  2. wait in the loop to get the event and process it.
  3. call the callback function to process the data logic.
  4. write back the data to the Client.

Call Flow

  1. register fd to epoll and set the callback function acceptTcpHandler, which will be called if there is a new connection.
  2. start a dead loop calling epoll_wait to wait and keep processing events, later we will return to the aeMain function to loop through the aeProcessEvents function.
  3. when a network event comes, it will follow the callback function acceptTcpHandler all the way to readQueryFromClient for data processing. readQueryFromClient will parse the client’s data and find the corresponding cmd function to execute;
  4. the Redis instance, after receiving the client request, will write the data to be returned to the client output buffer instead of returning it immediately after processing the client command.
  5. the beforeSleep function is then called at each loop of the aeMain function to write the data in the buffer back to the client; the beforeSleep function is then called at each loop of the aeMain function to write the data in the buffer back to the client.

The entire process of the event loop above has actually been written in very clear code steps, and there are many articles on the Internet about it, so I won’t say much.

Command execution process & write back to client

Command Execution

Let’s talk about something that many articles on the web don’t mention, and see how Redis executes commands, stores them in the cache, and writes the data back to the Client from the cache.

Command execution process

As we mentioned in the previous section, the readQueryFromClient function is called if a network event comes through, and it is where the command is actually executed. We’ll also follow this method all the way down to.

  1. readQueryFromClient will call the processInputBufferAndReplicate function to process the requested command.
  2. the processInputBufferAndReplicate function calls processInputBuffer and determines whether the command needs to be replicated to other nodes if it is in cluster mode.
  3. the processInputBuffer function will loop through the requested commands, call the processInlineBuffer function according to the requested protocol, and call the processCommand to execute the commands after the redisObject object.
  4. processCommand will lookupCommand to the server.commands table to find the corresponding execution function according to the command, then after a series of checks, call the corresponding function to execute the command, and call addReply to write the data to be returned to the client output buffer.

server.commands registers all Redis commands in the populateCommandTable function as a table of command functions based on command names.

For example, to execute the get command, the getCommand function would be called.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
void getCommand(client *c) {
    getGenericCommand(c);
}

int getGenericCommand(client *c) {
    robj *o;
    // 查找数据
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;
    ...
}

robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    //到db中查找数据
    robj *o = lookupKeyRead(c->db, key);
    // 写入到缓存中
    if (!o) addReply(c,reply);
    return o;
}

Find the data in the getCommand function and call addReply to write the data to be returned to the client’s output buffer.

Writing data back to Client

After the above command is written to the buffer, the data needs to be retrieved from the buffer and returned to the Client, which is actually done in the event loop of the server.

Writing data back to Client

  1. first Redis will call the aeSetBeforeSleepProc function in the main function to register the beforeSleep function of the writeback package to the eventLoop; 2. then Redis will call the aeMain function for the event loop to see if beforesleep has been set.
  2. then Redis will determine if beforesleep has been set when calling the aeMain function for the event loop, and if it has, then it will call it.
  3. the beforesleep function will call the handleClientsWithPendingWrites function, which will call writeToClient to write the data back to the client from the buffer.

Summary

This article describes how the entire Redis request processing model really works. From registering to listen for fd events, to executing commands, to finally writing data back to the client, I’ve done a general analysis. Of course, this article is also a bit different from my previous articles, I didn’t post the code at length, mainly because I don’t think it’s necessary, but you can follow the flowchart to see the code.