io_uring in a nutshell

io_uring has two fundamental operations for asynchronous requests: submitting a request, and reaping the completion of a submitted request.

For IO submission, the application is the producer and the kernel is the consumer, while for completion events the roles are reversed: the kernel produces them and the application consumes them. Therefore, a pair of rings is needed to provide a high-performance communication channel between the kernel and the application. These rings are the core of the new interface: the submission queue (SQ) and the completion queue (CQ). These two data structures form the basis of io_uring.

The data structures of io_uring

First, let’s look at the definition of the completion queue event (CQE) data structure.

struct io_uring_cqe {
    __u64 user_data;
    __s32 res;
    __u32 flags;
};

First of all, io_uring_cqe has a user_data field that is carried over from the original submission request and can hold any value identifying that request; the most basic use is to store a pointer to the original request. The kernel never modifies this field, it is simply copied from the submission event to the completion event. res holds the result of the submitted operation, just like the return value of the corresponding system call. The flags field may carry metadata that depends on the operation, but it is not used yet.

For the submission queue entry (SQE), the structure definition is more complex.

struct io_uring_sqe {
    __u8 opcode;
    __u8 flags;
    __u16 ioprio;
    __s32 fd;
    __u64 off;
    __u64 addr;
    __u32 len;
    union {
        __kernel_rwf_t rw_flags;
        __u32 fsync_flags;
        __u16 poll_events;
        __u32 sync_range_flags;
        __u32 msg_flags;
    };
    __u64 user_data;
    union {
        __u16 buf_index;
        __u64 __pad2[3];
    };
};

The opcode field describes the operation code of a submitted request, for example IORING_OP_READV for a vectored read. flags contains modifier flags that are common across command types. ioprio indicates the priority of the request; for a normal read or write it follows the definition of the ioprio_set system call. fd is the file descriptor associated with the request, off is the offset at which the operation should start, and addr contains the address at which the kernel should start the IO operation; for non-vectored IO transfers, addr must contain the buffer address directly. len is a byte count for non-vectored IO, or the number of vectors (described by addr) for vectored IO.

Next is a union describing flags specific to the opcode. For example, for a vectored read (IORING_OP_READV), these flags follow those of the preadv2(2) system call. user_data is passed in by the application and is never touched or modified by the kernel. buf_index is described in the advanced use cases, and the final __pad2 field pads the structure out to its full 64-byte size.

(Note: vectored IO, also known as scatter/gather IO, reads data sequentially from multiple buffers and writes it to a single stream, or reads data from a single stream and scatters it into multiple buffers; it allows multiple non-contiguous buffers to be read or written in a single call.)
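
As a rough sketch (not from the original text), the snippet below shows how the fields above might be filled in for a single vectored read. sqe is assumed to point at a free entry in the mapped SQE array, and buf, file_fd and my_request are hypothetical application-side names used only for illustration.

struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };

memset(sqe, 0, sizeof(*sqe));
sqe->opcode    = IORING_OP_READV;             /* vectored read, preadv2(2) semantics */
sqe->fd        = file_fd;                     /* file descriptor to read from */
sqe->addr      = (unsigned long) &iov;        /* address of the iovec array */
sqe->len       = 1;                           /* number of iovecs, not bytes */
sqe->off       = 0;                           /* file offset to start reading at */
sqe->user_data = (unsigned long) my_request;  /* echoed back untouched in the CQE */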

io_uring communication

With the data structures of io_uring understood, let’s look at the details of how io_uring works.

The CQEs are organized in an array whose memory is visible and modifiable by both the kernel and the application. However, since CQEs are produced by the kernel, only the kernel actually modifies them. Communication is managed with a ring buffer: whenever the kernel posts a new event to the CQ ring, it updates the associated tail, and when the application consumes an entry, it updates the head. So if the tail differs from the head, the application knows that it has one or more events available for consumption. The ring counters themselves are free-running 32-bit integers that rely on natural wrap-around when the number of completed events exceeds the ring’s capacity. One advantage of this approach is that the full size of the ring can be used without managing a separate “ring full” flag on the side, which would complicate ring management. A consequence of this approach is that the size of the ring must be a power of 2.
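
To make the counter arithmetic concrete, here is a small illustration (with hypothetical values, not taken from the kernel sources) of how the free-running counters and the power-of-2 mask interact:

unsigned ring_entries = 8;                 /* must be a power of 2 */
unsigned ring_mask    = ring_entries - 1;  /* 0b111 */

unsigned pending = tail - head;            /* entries ready for consumption, valid across wrap-around */
unsigned index   = head & ring_mask;       /* slot of the next entry to consume */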

In order to find the index of an event, the application must mask the current head index with the ring’s size mask, as follows.

unsigned head;
head = cqring->head;
read_barrier();
if (head != cqring->tail) {
    struct io_uring_cqe *cqe;
    unsigned index;
    index = head & (cqring->mask);
    cqe = &cqring->cqes[index];
    /* process completed cqe here */
    ...
    /* we've now consumed this entry */
    head++;
}
cqring->head = head;
write_barrier();
ring->cqes[] is a shared array of io_uring_cqe structures. Later, we will describe how this shared memory is set up and managed.

The same rules apply on the submission side: the application updates the tail while the kernel consumes entries from the head. An important difference is that while the CQ ring directly indexes the shared array of CQEs, the submission side has an indirection array between the ring and the SQEs: the SQ ring holds indices into this array, which in turn refers to the SQEs.

An example is shown below.

struct io_uring_sqe *sqe;
unsigned tail, index;
tail = sqring->tail;
index = tail & (*sqring->ring_mask);
sqe = &sqring->sqes[index];
/* this call fills in the sqe entries for this IO */
init_io(sqe);
/* fill the sqe index into the SQ ring array */
sqring->array[index] = index;
tail++;
write_barrier();
sqring->tail = tail;
write_barrier();

Completion events may arrive in any order; the order of submissions and the order of completions are not linked in any way, and the SQ ring and CQ ring operate independently. However, a completion event will always be associated with a specific submission request.
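
Because completions arrive out of order, the application typically relies on user_data to find the request a given CQE belongs to. A minimal sketch, where my_request is a hypothetical application structure rather than part of the io_uring ABI:

/* at submission time: tag the SQE with a pointer to the request object;
 * the kernel copies it into the CQE untouched */
sqe->user_data = (unsigned long) req;

/* at completion time: recover the request, in whatever order it completed,
 * and check the result like a system call return value */
struct my_request *done = (struct my_request *) cqe->user_data;
if (cqe->res < 0)
    fprintf(stderr, "request %d failed: %s\n", done->id, strerror(-cqe->res));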

io_uring interface

Like aio, io_uring has a number of system calls, the first of which is used to set up an io_uring instance.

int io_uring_setup(unsigned entries, struct io_uring_params *params);

The application must provide the desired number of entries for the io_uring instance: entries denotes the number of SQEs and must be a power of 2 between 1 and 4096. The params structure is defined as follows.

struct io_uring_params {
    __u32 sq_entries;
    __u32 cq_entries;
    __u32 flags;
    __u32 sq_thread_cpu;
    __u32 sq_thread_idle;
    __u32 resv[5];
    struct io_sqring_offsets sq_off;
    struct io_cqring_offsets cq_off; 
};

sq_entries will be filled in by the kernel to let the application know how many SQE entries the current ring supports; likewise, cq_entries tells the application how big the CQ ring is.
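
As a sketch of what the setup call might look like in practice (glibc does not wrap io_uring_setup, so it is invoked through syscall(2) here, with error handling kept minimal):

struct io_uring_params p;
int ring_fd;

memset(&p, 0, sizeof(p));
/* ask for a ring with 128 SQEs; the kernel fills in the rest of p */
ring_fd = syscall(__NR_io_uring_setup, 128, &p);
if (ring_fd < 0) {
    perror("io_uring_setup");
    exit(1);
}
printf("sq_entries = %u, cq_entries = %u\n", p.sq_entries, p.cq_entries);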

When the call succeeds, the kernel returns a file descriptor referring to this io_uring instance. This is where sq_off and cq_off come in handy: since the SQEs and CQEs need to be accessed by both the kernel and the application, the application must know how to reach this memory, which is mapped into the application’s address space using mmap(). The kernel reports the offsets of the different SQ ring members through sq_off; the io_sqring_offsets structure is shown below.

struct io_sqring_offsets {
    __u32 head;         /* offset of ring head */
    __u32 tail;         /* offset of ring tail */
    __u32 ring_mask;    /* ring mask value */
    __u32 ring_entries; /* entries in ring */
    __u32 flags;        /* ring flags */
    __u32 dropped;      /* number of sqes not submitted */
    __u32 array;        /* sqe index array */
    __u32 resv1;
    __u64 resv2;
};

To access this memory, the application calls mmap() with the io_uring file descriptor and the memory offset associated with the SQ ring. The io_uring API defines the following mmap offsets for the application to use.

#define IORING_OFF_SQ_RING 0ULL 
#define IORING_OFF_CQ_RING 0x8000000ULL 
#define IORING_OFF_SQES 0x10000000ULL

IORING_OFF_SQ_RING is used to map the SQ ring into user memory, IORING_OFF_CQ_RING is used for the CQ ring, and IORING_OFF_SQES is used to map the SQE array. The CQE array is part of the CQ ring mapping itself, whereas the SQ ring only contains indices into the SQE array, so the application must map the SQE array separately.

The application will need to define its own data structure to hold the pointers obtained from these offsets; a possible example is shown below.

struct app_sq_ring {
    unsigned *head;
    unsigned *tail;
    unsigned *ring_mask;
    unsigned *ring_entries;
    unsigned *flags;
    unsigned *dropped;
    unsigned *array;
};

An example of starting io_uring is shown below.

struct app_sq_ring app_setup_sq_ring(int ring_fd, struct io_uring_params *p) {
    struct app_sq_ring sring;
    void *ptr;

    ptr = mmap(NULL, p->sq_off.array + p->sq_entries * sizeof(__u32),
               PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
               ring_fd, IORING_OFF_SQ_RING);
    sring.head = ptr + p->sq_off.head;
    sring.tail = ptr + p->sq_off.tail;
    sring.ring_mask = ptr + p->sq_off.ring_mask;
    sring.ring_entries = ptr + p->sq_off.ring_entries;
    sring.flags = ptr + p->sq_off.flags;
    sring.dropped = ptr + p->sq_off.dropped;
    sring.array = ptr + p->sq_off.array;
    return sring;
}
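
The function above only maps the SQ ring itself. Mapping the SQE array and the CQ ring follows the same pattern with the other two offsets; a possible sketch, again with p pointing at the io_uring_params filled in by the kernel:

struct io_uring_sqe *sqes;
void *cq_ptr;

/* the SQE array is a separate mapping, indexed through the SQ ring's array[] */
sqes = mmap(NULL, p->sq_entries * sizeof(struct io_uring_sqe),
            PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
            ring_fd, IORING_OFF_SQES);

/* the CQ ring mapping already contains the CQE array, starting at cq_off.cqes */
cq_ptr = mmap(NULL, p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe),
              PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
              ring_fd, IORING_OFF_CQ_RING);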

The application also needs a way to tell the kernel which requests are now ready to be consumed, and this is done with the following system call.

int io_uring_enter(unsigned int fd, unsigned int to_submit, unsigned int min_complete, unsigned int flags, sigset_t *sig);

fd is the io_uring file descriptor, to_submit tells the kernel how many SQEs are ready to be consumed and submitted, and min_complete asks the kernel to wait until that many requests have completed before returning.
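
A sketch of what the call might look like after the submission example earlier, again via syscall(2); the IORING_ENTER_GETEVENTS flag is what makes the kernel actually wait for min_complete completions:

/* one new SQE has been published in the SQ ring: submit it and wait for
 * its completion before returning */
int ret = syscall(__NR_io_uring_enter, ring_fd,
                  1,                       /* to_submit */
                  1,                       /* min_complete */
                  IORING_ENTER_GETEVENTS,  /* wait for completions */
                  NULL, 0);                /* no signal mask */
if (ret < 0)
    perror("io_uring_enter");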

memory ordering

Pending updates…

the liburing library

Using the system calls directly is not very user-friendly, so the kernel developers also provide liburing, a user-space library for io_uring.

struct io_uring ring;
io_uring_queue_init(ENTRIES, &ring, 0);

By using io_uring_queue_init we can set up an io_uring instance without having to call io_uring_setup and mmap() ourselves.

When we are done with the instance, we can call the following function to tear it down.

io_uring_queue_exit(&ring);

An example of using liburing is shown below.

struct io_uring_sqe *sqe;
struct io_uring_cqe *cqe;

/* get an sqe and fill in a READV operation */
sqe = io_uring_get_sqe(&ring);
io_uring_prep_readv(sqe, fd, &iovec, 1, offset);

/* tell the kernel we have an sqe ready for consumption */
io_uring_submit(&ring);

/* wait for the sqe to complete */
io_uring_wait_cqe(&ring, &cqe);

/* read and process cqe event */
app_handle_cqe(cqe);
io_uring_cqe_seen(&ring, cqe);

Advanced Usage and Features

To be updated…

More examples

A chat server written in Rust with the io_uring crate looks like this.

use io_uring::{IoUring, SubmissionQueue, opcode, squeue, types};
use slab::Slab;

use std::collections::VecDeque;
use std::net::TcpListener;
use std::os::unix::io::{ AsRawFd, RawFd };
use std::{ io, ptr };

#[derive(Clone, Debug)]
enum Token {
    Accept, 
    Poll {
        fd: RawFd
    },
    Read {
        fd: RawFd,
        buf_index: usize
    },
    Write {
        fd: RawFd, 
        buf_index: usize, 
        offset: usize,
        len: usize
    }
}

pub struct AcceptCount {
    entry: squeue::Entry,
    count: usize
}

impl AcceptCount {
    /// Create a new AcceptCount. fd is the listening file descriptor, token is the
    /// user data carried by the SQE, and count is the maximum number of connections
    /// this descriptor should accept.
    fn new(fd: RawFd, token: usize, count: usize) -> Self {
        Self {
            entry: opcode::Accept::new(types::Fd(fd), ptr::null_mut(), ptr::null_mut())
                    .build()
                    .user_data(token as _),
            count
        }
    }

    /// Push accept entries onto the submission queue
    pub fn push_to(&mut self, sq: &mut SubmissionQueue<'_>) {
        while self.count > 0 {
            unsafe{
                match sq.push(&self.entry) {
                    Ok(_) => self.count -= 1,
                    Err(_) => break,
                }
            }
        }
        sq.sync();
    }
}
fn main() {
    let mut ring = IoUring::new(256).unwrap();
    let listener = TcpListener::bind(("127.0.0.1", 8080)).unwrap();

    // holds entries that failed to be pushed onto the submission queue
    let mut backlog = VecDeque::new();
    // holds the buf_index values of free buffers, usually reclaimed when a socket is closed
    let mut bufpool = Vec::with_capacity(64);
    // stores the in-memory buffers themselves, accessed by buf_index
    let mut buf_alloc = Slab::with_capacity(64);
    // stores the tokens of the different events; token_index yields the event type and its data
    let mut token_alloc = Slab::with_capacity(64);

    // holds all currently connected sockets
    let mut sockets = Vec::new();

    println!("Server listen on {}", listener.local_addr().unwrap());

    // split the io_uring instance into the submitter, submission queue and completion queue
    let (submitter, mut sq, mut cq) = ring.split();

    // create the AcceptCount that tracks the listening fd and submits accept requests
    let mut accept = AcceptCount::new(listener.as_raw_fd(), token_alloc.insert(Token::Accept), 10);
    accept.push_to(&mut sq); 

    loop {
        // submit all queued SQEs and wait for at least one event to complete
        match submitter.submit_and_wait(1) {
            Ok(_) => (),
            Err(ref err) if err.raw_os_error() == Some(libc::EBUSY) => break,
            Err(err) => panic!("{}", err),
        }
        // sync the completion queue, picking up the CQEs posted by the kernel
        cq.sync();

        loop {
            if sq.is_full() {
                // the submission queue is full, so flush everything to the kernel
                match submitter.submit() {
                    Ok(_) => (),
                    Err(ref err) if err.raw_os_error() == Some(libc::EBUSY) => break,
                    Err(err) => panic!("{}", err),
                }
            }
            // sync the contents of the submission queue
            sq.sync();

            match backlog.pop_front() {
                Some(sqe) => unsafe {
                    // push the backlogged entry onto the SQ (not yet submitted to the kernel)
                    let _ = sq.push(&sqe);
                },

                None => break,
            }
        }

        accept.push_to(&mut sq);

        for cqe in &mut cq {
            // walk the contents of the completion queue
            // result of this CQE
            let ret = cqe.result();
            // user data of this CQE, used to identify the event
            let token_index = cqe.user_data() as usize;

            if ret < 0 {
                // the operation failed
                eprintln!(
                    "token {:?} error: {:?}",
                    token_alloc.get(token_index),
                    io::Error::from_raw_os_error(-ret)
                );
                continue;
            }

            // look up the token via the user data to find out which event this is
            let token = &mut token_alloc[token_index];
            match token.clone() {
                Token::Accept => {
                    // a client connection was accepted, so replenish the accept count
                    accept.count += 1;
                    // the result is a file descriptor for the newly accepted socket
                    let fd = ret;
                    // remember the connected socket
                    sockets.push(fd);
                    // insert a Poll token into token_alloc; its key is used as the user_data
                    let poll_token = token_alloc.insert(Token::Poll { fd });
                    // create a poll request so we are notified when data arrives on this socket
                    let poll_e = opcode::PollAdd::new(types::Fd(fd), libc::POLLIN as _)
                                        .build()
                                        .user_data(poll_token as _);
                    unsafe {
                        if sq.push(&poll_e).is_err() {
                            // the push failed (the SQ is likely full), so stash the entry in the backlog
                            backlog.push_back(poll_e);
                        }
                    }
                }

                Token::Poll { fd } => {
                    let (buf_index, buf) = match bufpool.pop() {
                        Some(buf_index) => (buf_index, &mut buf_alloc[buf_index]),
                        None => {
                            // allocate a new buffer
                            let buf = vec![0u8; 2048].into_boxed_slice();
                            // get a handle to a vacant slab entry for further operations
                            let buf_entry = buf_alloc.vacant_entry();
                            // the key (index) of that entry
                            let buf_index = buf_entry.key();
                            // insert the buffer into the entry and return it together with its index
                            (buf_index, buf_entry.insert(buf))
                        }
                    };

                    *token = Token::Read { fd, buf_index };

                    // the poll completed, meaning the socket is readable; register a
                    // read (recv) request and push it onto the submission queue
                    let read_e = opcode::Recv::new(types::Fd(fd), buf.as_mut_ptr(), buf.len() as _)
                                        .build()
                                        .user_data(token_index as _);

                    unsafe {
                        if sq.push(&read_e).is_err() {
                            backlog.push_back(read_e);
                        }
                    }
                }

                Token::Read { fd, buf_index } => {
                    // the read completed, meaning data arrived from the connected socket
                    if ret == 0 {
                        // a result of 0 means the peer closed the connection;
                        // the buffer is no longer needed, so push it back
                        // to bufpool for reuse by a later read/write
                        bufpool.push(buf_index);
                        // remove token_index from token_alloc
                        token_alloc.remove(token_index);

                        println!("shutdown");

                        // drop the closed socket from the broadcast list
                        sockets.retain(|&sock| sock != fd);

                        unsafe {
                            libc::close(fd);
                        }
                    } else {
                        // the read succeeded; the result is the number of bytes read
                        let len = ret as usize;
                        // the buffer that the data was read into
                        let buf = &buf_alloc[buf_index];

                        let socket_len = sockets.len();
                        token_alloc.remove(token_index);
                        for i in 0..socket_len {
                            // create a write token so the data is broadcast to each connected socket
                            let write_token = Token::Write {
                                fd: sockets[i], 
                                buf_index,
                                len,
                                offset: 0
                            };

                            let write_token_index = token_alloc.insert(write_token);

                            // register the write event (actually a send operation)
                            let write_e = opcode::Send::new(types::Fd(sockets[i]), buf.as_ptr(), len as _)
                                                .build()
                                                .user_data(write_token_index as _);
                            unsafe {
                                if sq.push(&write_e).is_err() {
                                    backlog.push_back(write_e);
                                }
                            }
                        }

                    }
                }

                Token::Write {
                    fd,
                    buf_index,
                    offset,
                    len
                } => {
                    // the write (send) completed; the result is the number of bytes written
                    let write_len = ret as usize;

                    // if the offset plus the bytes just written reaches the requested length,
                    // the write is finished, so go back to polling the socket for incoming data
                    let entry = if offset + write_len >= len {
                        bufpool.push(buf_index);

                        *token = Token::Poll { fd };

                        opcode::PollAdd::new(types::Fd(fd), libc::POLLIN as _)
                                .build()
                                .user_data(token_index as _)
                    } else {
                        // the write was short, so update the parameters and write the rest:
                        // advance the offset by the number of bytes written
                        let offset = offset + write_len;
                        // and shrink the remaining length
                        let len = len - offset;
                        // take the buffer starting at the new offset
                        let buf = &buf_alloc[buf_index][offset..];

                        *token = Token::Write {
                            fd, 
                            buf_index,
                            offset, 
                            len
                        };

                        opcode::Write::new(types::Fd(fd), buf.as_ptr(), len as _)
                                    .build()
                                    .user_data(token_index as _)
                    };

                    unsafe {
                        if sq.push(&entry).is_err() {
                            // the push onto the submission queue failed, so store the entry in the backlog
                            backlog.push_back(entry);
                        }
                    }
                }
            }
        }
    }
}