Typical Netty server-side startup code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Minimal Netty server: binds a listening socket and blocks until the server
 * channel is closed, then shuts both event loop groups down.
 */
public class MyServer {

    /** TCP port the server listens on. */
    private static final int PORT = 7000;

    public static void main(String[] args) throws Exception {

        // The first group is created with a single thread and is passed to the
        // bootstrap as the parent group; the second uses the default thread
        // count and is passed as the child group.
        EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptorGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // Inbound handler: decodes incoming bytes.
                            .addLast(new MyByteToLongDecoder())
                            // Outbound handler: encodes outgoing messages.
                            .addLast(new MyLongToByteEncoder())
                            // Custom handler containing the business logic.
                            .addLast(new MyServerHandler());
                }
            });

            // Wait for the bind to complete, then block until the server channel closes.
            bootstrap.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}

Start-up flow analysis

Creation of NioEventLoopGroup

Two NioEventLoopGroups are created before the Netty server is started, so let's first analyze how they are created.

The instantiation of NioEventLoopGroup ends up calling the constructor of its parent class.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
 * Creates the group and eagerly builds its {@code nThreads} child executors.
 *
 * @param nThreads       number of child executors to create; must be positive
 * @param executor       executor handed to every child; when {@code null}, a
 *                       {@code ThreadPerTaskExecutor} built on the default thread factory is used
 * @param chooserFactory factory that builds the chooser over the finished child array
 * @param args           extra arguments forwarded unchanged to {@code newChild}
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 * @throws IllegalStateException    if creating a child fails; the children created before
 *                                  the failure are shut down before the exception propagates
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                           EventExecutorChooserFactory chooserFactory, Object... args) {
       if (nThreads <= 0) {
           // Argument validity check.
           throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
       }

       if (executor == null) {
           // No executor supplied, so create one.
           // NOTE(review): judging by its name, ThreadPerTaskExecutor starts a new thread
           // for every task submitted to it -- confirm against its source.
           executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
       }

        // For a NioEventLoopGroup this array actually holds NioEventLoop instances.
       children = new EventExecutor[nThreads];

       for (int i = 0; i < nThreads; i ++) {
           boolean success = false; // tracks whether child i was created successfully
           try {
               // newChild is actually implemented by the subclass (NioEventLoopGroup here).
               children[i] = newChild(executor, args);
               success = true;
           } catch (Exception e) {
               // TODO: Think about if this is a good exception type
               throw new IllegalStateException("failed to create a child event loop", e);
           } finally {
               if (!success) {
                   // Creation failed part-way through the array:
                   // shut down every child that was already created (indices 0..i-1).
                   for (int j = 0; j < i; j ++) {
                       children[j].shutdownGracefully();
                   }

                   // Then wait for each of those children to finish terminating.
                   for (int j = 0; j < i; j ++) {
                       EventExecutor e = children[j];
                       try {
                           while (!e.isTerminated()) {
                               e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                           }
                       } catch (InterruptedException interrupted) {
                           // Let the caller handle the interruption.
                           Thread.currentThread().interrupt();
                           break;
                       }
                   }
               }
           }
       }

        // The chooser picks which child handles each operation; per the original
        // author, the default strategy is round-robin.
       chooser = chooserFactory.newChooser(children);

       // Shared listener: completes terminationFuture once the number of terminated
       // children reaches children.length.
       final FutureListener<Object> terminationListener = new FutureListener<Object>() {
           @Override
           public void operationComplete(Future<Object> future) throws Exception {
               if (terminatedChildren.incrementAndGet() == children.length) {
                   terminationFuture.setSuccess(null);
               }
           }
       };

       for (EventExecutor e: children) {
           e.terminationFuture().addListener(terminationListener);
       }

       // Keep an unmodifiable, insertion-ordered view of the children.
       Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
       Collections.addAll(childrenSet, children);
       readonlyChildren = Collections.unmodifiableSet(childrenSet);
   }

The implementation of newChild is as follows.

1
2
3
4
5
/**
 * Builds one {@link NioEventLoop} for this group from the positional {@code args}:
 * {@code args[0]} is the selector provider, {@code args[1]} the select-strategy factory,
 * {@code args[2]} the rejected-execution handler and, only when exactly four arguments
 * are given, {@code args[3]} the task-queue factory (otherwise {@code null} is passed).
 */
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory taskQueueFactory = null;
    if (args.length == 4) {
        taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
    }

    // Unpack in positional order so a bad argument fails in the same order as before.
    SelectorProvider provider = (SelectorProvider) args[0];
    SelectStrategyFactory strategyFactory = (SelectStrategyFactory) args[1];

    return new NioEventLoop(
            this,
            executor,
            provider,
            strategyFactory.newSelectStrategy(),
            (RejectedExecutionHandler) args[2],
            taskQueueFactory);
}

The above is the implementation of NioEventLoopGroup. When starting the Netty server, two NioEventLoopGroups are created, bossGroup and workerGroup, which are essentially the same but play different roles.

ServerBootstrap and NioEventLoopGroup binding

1
2
3
4
5
6
7
8
9
/**
 * Sets the parent group (passed on to {@code super.group}) and the child group
 * stored on this bootstrap.
 *
 * <p>The child group is validated before {@code super.group(parentGroup)} is invoked,
 * so a call rejected because of {@code childGroup} does not run the superclass call
 * at all and leaves any state it would record untouched.
 *
 * @param parentGroup group handed to {@code super.group}
 * @param childGroup  group stored as this bootstrap's child group; must not be {@code null}
 * @return this bootstrap, for call chaining
 * @throws NullPointerException  if {@code childGroup} is {@code null}
 * @throws IllegalStateException if a child group has already been set
 */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    // Validate first, mutate afterwards: nothing is recorded unless every check passes.
    ObjectUtil.checkNotNull(childGroup, "childGroup");
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }

    // Any exception thrown by super.group propagates before childGroup is assigned below.
    super.group(parentGroup);
    this.childGroup = childGroup;
    return this;
}

From here the roles of the two NioEventLoopGroups start to differ.

ServerBootstrap’s bind handling

The doBind method is finally called through a cascade of calls.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Initializes and registers the channel, then binds it to {@code localAddress}.
 *
 * @param localAddress address to bind to
 * @return the failed registration future if registration already failed; otherwise a
 *         promise that is handed to {@code doBind0} (or failed if registration fails later)
 */
private ChannelFuture doBind(final SocketAddress localAddress) {
    // Initialize the channel and register it.
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (!regFuture.isDone()) {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise pendingPromise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                final Throwable failure = future.cause();
                if (failure == null) {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    pendingPromise.registered();

                    doBind0(regFuture, channel, localAddress, pendingPromise);
                } else {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not
                    // cause an IllegalStateException once we try to access the EventLoop of the Channel.
                    pendingPromise.setFailure(failure);
                }
            }
        });
        return pendingPromise;
    }

    // At this point we know that the registration was complete and successful.
    ChannelPromise bindPromise = channel.newPromise();
    doBind0(regFuture, channel, localAddress, bindPromise);
    return bindPromise;
}

Initialization and registration of the Channel are done at the beginning of doBind, in initAndRegister.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Creates a channel through the channel factory, initializes it and registers it
 * with the configured group.
 *
 * @return the registration future, or an already-failed promise when creating or
 *         initializing the channel threw
 */
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // Create the channel through the channel factory (per the original author,
        // it is instantiated via reflection).
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel == null) {
            // Creation itself failed: there is no channel to close, so report the
            // failure on a placeholder channel.
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // The channel exists but init failed: close it, then report the failure.
        channel.unsafe().closeForcibly();
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture registration = config().group().register(channel);
    if (registration.cause() == null) {
        return registration;
    }

    // Registration has already failed: close the channel by whichever route applies.
    if (channel.isRegistered()) {
        channel.close();
    } else {
        channel.unsafe().closeForcibly();
    }
    return registration;
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Prepares a newly created channel: applies the bootstrap's options and attributes,
 * then adds an initializer to its pipeline. That initializer installs the optional
 * configured handler and schedules a {@code ServerBootstrapAcceptor} to be appended
 * on the channel's event loop.
 *
 * @param channel the channel to initialize
 */
void init(Channel channel) {
    // Options and attributes configured for this channel itself.
    setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
    setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));

    ChannelPipeline serverPipeline = channel.pipeline();

    // Snapshot the child-side settings now; the acceptor below is built from these copies.
    final EventLoopGroup acceptedGroup = childGroup;
    final ChannelHandler acceptedHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] acceptedOptions =
            childOptions.entrySet().toArray(newOptionArray(0));
    final Entry<AttributeKey<?>, Object>[] acceptedAttrs =
            childAttrs.entrySet().toArray(newAttrArray(0));

    serverPipeline.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();

            // The handler configured on the bootstrap is optional.
            ChannelHandler configuredHandler = config.handler();
            if (configuredHandler != null) {
                pipeline.addLast(configuredHandler);
            }

            // The acceptor is appended from a task submitted to the channel's event
            // loop rather than inline.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, acceptedGroup, acceptedHandler, acceptedOptions, acceptedAttrs));
                }
            });
        }
    });
}

This method initializes the Channel; any options and attributes that were configured on the bootstrap before starting are applied here as well.

Once the initialization of the Channel is done, the Channel needs to be registered.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
 * Binds this channel to {@code eventLoop} and runs {@code register0}, either directly
 * when already on that loop's thread or as a task submitted to it.
 *
 * @param eventLoop the event loop to register with; must not be {@code null}
 * @param promise   completed with the outcome of the registration
 */
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");

    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }

    if (!isCompatible(eventLoop)) {
        String loopTypeName = eventLoop.getClass().getName();
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + loopTypeName));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    // Already on the loop's own thread: register inline.
    if (eventLoop.inEventLoop()) {
        register0(promise);
        return;
    }

    // Otherwise hand the registration over to the loop as a task.
    try {
        final Runnable deferredRegistration = new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        };
        eventLoop.execute(deferredRegistration);
    } catch (Throwable t) {
        logger.warn(
                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, t);
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
/**
 * Performs the registration and fires the resulting pipeline events.
 * Any throwable raised along the way force-closes the channel and fails the promise.
 *
 * @param promise completed with success once registration is done, or with the failure cause
 */
private void register0(ChannelPromise promise) {
    try {
        // Give up if the promise cannot be marked uncancellable or ensureOpen rejects it.
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        // Capture the flag before it is cleared below, so we still know afterwards
        // whether this was the very first registration.
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        // Runs before the promise is completed, i.e. before its listeners are notified.
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                // channelActive is fired only on the first registration.
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // Re-registration of an active channel: start reading again when auto-read is on.
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

And the doRegister method does the final registration work.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
/**
 * Registers the underlying Java channel with the event loop's selector, using interest
 * set {@code 0} and this channel as the attachment, and stores the resulting key.
 *
 * <p>If registration throws {@link CancelledKeyException}, {@code selectNow()} is called
 * once on the event loop and the registration is retried; a second failure is rethrown.
 *
 * @throws Exception if registration fails
 */
protected void doRegister() throws Exception {
    boolean alreadyRetried = false;
    while (true) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (alreadyRetried) {
                // The retry after selectNow() failed as well: give up.
                throw e;
            }
            eventLoop().selectNow();
            alreadyRetried = true;
        }
    }
}

This registers the Channel with the selector (multiplexer) of the boss thread, completing the initialization and registration of the channel. In fact, the code above shows that each Channel (whether server or client) is bound to a single thread, its NioEventLoop, for as long as it operates, and all of Netty's I/O operations on that channel are performed by that NioEventLoop. The register method calls eventLoop.inEventLoop() to determine whether the current thread is the one that belongs to this channel; if it is not, it calls eventLoop.execute(new Runnable() {...}). During that call the event loop checks whether its I/O thread has been started and, if not, starts it.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Queues {@code task}; when called from outside the event loop thread it also calls
 * {@code startThread()} and, if the executor is already shut down, tries to take the
 * task back and reject it.
 *
 * @param task      the task to queue
 * @param immediate whether a wakeup should be issued when adding a task does not
 *                  itself wake the loop
 */
private void execute(Runnable task, boolean immediate) {
    final boolean calledFromLoop = inEventLoop();
    addTask(task);

    if (!calledFromLoop) {
        startThread();
        if (isShutdown()) {
            boolean removed;
            try {
                removed = removeTask(task);
            } catch (UnsupportedOperationException ignored) {
                // Deliberately ignored: presumably the queue does not support removal,
                // so the task is left where it is and not rejected.
                removed = false;
            }
            if (removed) {
                reject();
            }
        }
    }

    if (immediate && !addTaskWakesUp) {
        wakeup(calledFromLoop);
    }
}

Eventually the run method of NioEventLoop will be called.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
 * Main loop of the event loop: each iteration picks a select strategy, optionally
 * blocks in {@code select}, processes selected keys, runs queued tasks, and finally
 * checks whether the loop is shutting down. Returns only once {@code confirmShutdown()}
 * reports {@code true}.
 */
protected void run() {
    // Incremented on every iteration that reaches the processing phase; reset to 0 when
    // tasks ran or strategy > 0, after a selector rebuild, or when
    // unexpectedSelectorWakeup(...) returns true.
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    // BUSY_WAIT is handled exactly like SELECT here.
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        // Only block in select when no task is waiting to run.
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // Always mark the loop as awake again, even if select threw.
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // I/O failure while selecting: rebuild the selector, reset the counter,
                // report the exception and start the next iteration.
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                // No time budget for tasks: process keys (if strategy > 0), then run all tasks.
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                // Time the key processing and give tasks a budget proportional to it:
                // ioTime * (100 - ioRatio) / ioRatio.
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }
            if (ranTasks || strategy > 0) {
                // Useful work was done this iteration, so the wakeup was not premature.
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Logged at debug level only; the loop simply carries on.
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Shutdown check runs on every iteration that reaches this point, in its own try
        // block so that a failure here is reported without ending the loop.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

At this point the entire Netty server is up and running.