Netty线程模型源码剖析(文章尾附带全流程图)

Netty服务端设置参数

首先创建bossGroup和workerGroup(Netty客户端和服务端启动源码在上篇文章中), bossGroup只是处理客户端连接请求 ,真正和客户端业务处理会交给workerGroup完成。new NioEventLoopGroup的源码如下(其中有很多调this和super的地方就不一一列出来了,流程图中都包含,只对重要代码做注释分析):

//bossGroup和workerGroup都属于MultithreadEventLoopGroup,相当于线程池
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    //如果线程数为0,则取默认值,有io.netty.eventLoopThreads取这个值,没有的话默认是cpu核数*2
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
//进入super到MultithreadEventExecutorGroup,然后会给每个children赋值 children[i] = newChild(executor, args);代码如下
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        //创建NioEventLoop,相当于线程池中的每个线程
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();//调openSelector()方法,打开Selector处理Channel,就是NIO代码
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

创建务器端的启动对象ServerBootstrap(就是一个空对象),然后再对其设置group,设置channel,设置option及在pipeline中加入自己写的Handler。

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);//点进去就是给group属性赋值
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;//给childGroup属性赋值,记住这两个属性,下面会用
    return this;
}
//给ServerBootstrap设置channel,最后会跟到下面这段代码
public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
           //通过传过来的类对象NioServerSocketChannel.class,得到构造参数,下面函数会用
           //会在下面一节中分析这个NioServerSocketChannel.class的构造方法
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
   }
//将上述方法得到的constructor,传到本方法内
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }
    this.channelFactory = channelFactory;//给channelFactory属性赋值,用来创建对象,后续会用来创建对象
    return self();
}
//设置options,就是一个map,可以初始化服务器连接队列大小,
//服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接
//多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
public  B option(ChannelOption option, T value) {
    if (option == null) {
        throw new NullPointerException("option");
    }
    if (value == null) {
        synchronized (options) {
            options.remove(option);
        }
    } else {
        synchronized (options) {
            options.put(option, value);
        }
    }
    return self();
}
//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
////对workerGroup的SocketChannel设置自己写的处理器,把自己写的处理器加到管道中
public ServerBootstrap childHandler(ChannelHandler childHandler) {
    if (childHandler == null) {
        throw new NullPointerException("childHandler");
    }
    this.childHandler = childHandler;
    return this;
}

Netty服务端绑定端口

Netty服务端绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况;启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕(参考上一篇文章中服务端绑定端口代码),绑定源码如下:

//调用ServerBootstrap的bind函数一直跟最后会到AbstractBootstrap的doBind函数,此函数主要两步
//第一初始化并注册  initAndRegister(),第二 把ServerChannel绑定到网络端口 doBind0,第二步源码就不再分析了
final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //就是用constructor.newInstance()即上一节中NioServerSocketChannel.class的构造方法来初始化
            //参考下面源码分析初始化NioServerSocketChannel
            channel = channelFactory.newChannel();
            //初始化channel,获取管道ChannelPipeline,是在初始化NioServerSocketChannel产生的管道对象,
           //向管道中添加ChannelInitializer对象,当channel注册时会调用andlerAdded从而调用initChannel
            init(channel);//具体源码参考下面
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
          return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
				//注册channel对象
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
//向管道中添加ChannelInitializer对象,当channel注册时会调用andlerAdded从而调用initChannel
void init(Channel channel) throws Exception {
        final Map, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
        final Map, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey key = (AttributeKey) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        ChannelPipeline p = channel.pipeline();//获取已经生成的pipeline对象
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry, Object>[] currentChildOptions;
        final Entry, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }
       //向管道中添加ChannelInitializer对象,在注册channel时(参考下面的源码)handler的handlerAdded方法再调用initChannel方法,
       //调用完initChannel会调用removeState删除临时产生的ChannelInitializer
        p.addLast(new ChannelInitializer() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                  			//执行异步方法,向pipeline中添加ServerBootstrapAcceptor,此时pipeline中有 头、ServerBootstrapAcceptor、尾
                  			//当满足条件时(客户端连接时),会执行ServerBootstrapAcceptor的channelRead方法
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
//初始化NioServerSocketChannel实例
public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));//跟代码最后会调用AbstractNioChannel的构造方法
    }
//创建ServerSocketChannel,其实就是NIO代码封装
private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
//AbstractNioChannel的构造方法
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);//调用AbstractChannel的构造方法
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);//// 设置ServerSocketChannel为非阻塞
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }
          throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
//AbstractChannel的构造方法
protected AbstractChannel(Channel parent) {
        this.parent = parent;//给parent赋值就是NioServerSocketChannel
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();//初始话管道Pipeline,给pipeline属性,后续一直用到这个属性
    }

注册channel对象,config().group().register(channel)----->MultithreadEventLoopGroup.register----->SingleThreadEventLoop.register----->AbstractUnsafe.register

//注册channel
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
      //会走这里,执行异步操作,添加到队列中
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);//执行异步方法,下面会分析
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        boolean inEventLoop = inEventLoop();//判断启动线程与当前线程是否相同   不同则有两种可能:未启动或者线程不同
        addTask(task);//把异步任务(上一步的异步方法)添加到taskQueue队列中,后续会取
        if (!inEventLoop) {
           //第一次会启动,会调用SingleThreadEventExecutor的startThread()然后再调doStartThread
           //最后再调用SingleThreadEventExecutor.this.run(),即NioEventLoop的run
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                }
                if (reject) {
                    reject();
                }
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
//即NioEventLoop的run 无限循环执行selector监听事件
protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));//会调用selector.select(timeoutMillis)执行selector的监听事件,跟nio一样
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                    }
                } catch (IOException e) {
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                      //这个方法即监听有无链接事件和读写事件等,会循环selector中所有的事件
                      //进入后会调用NioEventLoop的processSelectedKey方法,
                      //最后当有客户端SelectionKey.OP_READ | SelectionKey.OP_ACCEPT时调用unsafe.read()
                      //OP_ACCEPT事件即NioMessageUnsafe的read方法,OP_READ事件即NioByteUnsafe的read方法
                        processSelectedKeys();
                    } finally {
                      //在执行这个方法,会调用SingleThreadEventExecutor的fetchFromScheduledTaskQueue方法
                      //从taskQueue队列中不断的取异步任务,在上面源码分析过程中,向  taskQueue中存放了一个异步任务
                      //即会执行register0(promise)方法,源码解析参考下面
                      runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
//注册时队列中的异步任务
private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //会调用AbstractNioChannel的doRegister()方法,
               //在javaChannel().register(eventLoop().unwrappedSelector(), 0, this)这一步向selector中注册channel,跟nio一样
                doRegister();
                neverRegistered = false;
                registered = true;
               //跟下去最终调用AbstractChannelHandlerContext的callHandlerAdded方法,
              //即调用pipeline中每个handler的handlerAdded方法,挨个handler执行,目前pipeline中有三个,头、ChannelInitializer、尾
              //在上面代码init(channel);  向  pipeline 中添加过ChannelInitializer,最后会执行他的initChannel方法
              pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
                //调用pipeline中每个handler的channelRegistered方法
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    if (firstRegistration) {
                      //调用pipeline中每个handler的channelActive方法
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

Netty客户端连接事件

Netty的客户端启动时会连接服务端的端口,当发生客户端连接事件时调用NioMessageUnsafe的read方法,源码如下:

public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();//获取channel的pipeline,此时这个是serverSocketChannelPipeline
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    //readBuf缓冲区存放所有通过客户端连接事件过来的SocketChannel,
                   //封装成NioSocketChannel放到readbuf,封装NioSocketChannel即调用其构造方法,跟NioServerSocketChannel一样,
                  //包含设置管道,设置非阻塞,此时NioSocketChannel的pipeline只有头、尾
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                //readBuf.get(i)即上面产生的NioSocketChannel对象
                 //执行pipeline中所有handler的channelRead方法,此时pipeline中包含ServerBootstrapAcceptor
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete(); 
            pipeline.fireChannelReadComplete();//执行pipeline中所有handler的readComplete方法
            if (exception != null) {
                closed = closeOnReadError(exception);
                pipeline.fireExceptionCaught(exception);
            }
            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

执行ServerBootstrapAcceptor的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);//向NioSocketChannel的pipeline中添加childHandler,childHandler就是server服务端自己写的那个ChannelInitializer
    setChannelOptions(child, childOptions, logger);
    for (Entry, Object> e: childAttrs) {
        child.attr((AttributeKey) e.getKey()).set(e.getValue());
    }
	//向childGroup即workerGroup中注册 NioSocketChannel,注册流程跟NioServerSocketChannel一样
    try {
      //最终会执行最终会执行自己写的ChannelInitializer的handlerAdded方法,
      //再执行initChannel方法,将自己写的handler添加到pipeline中,然后调用removeState移除自己写的ChannelInitializer
      //此时NioSocketChannel的pipeline有 头、自己写的handler、尾
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

Netty客户端写数据到服务端事件

Netty的客户端启动时会连接服务端的端口,当发生客户端写数据事件时(对服务端来说是读事件)调用NioByteUnsafe的read方法,源码如下:

public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        readPending = false;
                    }
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);//调用pipeline中所有handler的channelRead方法,即可以调用自己写的handler的方法了
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();//调用pipeline中所有handler的channelReadComplete方法

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

Netty线程模型源码解析流程图

Netty线程模型全流程图(包含服务端参数设置、服务端端口注册、客户端请求连接服务端、客户端写数据到服务端)如下链接:

https://kdocs.cn/l/chTjGr8JajaI

页面更新:2024-05-26

标签:线程   源码   流程图   队列   初始化   端口   服务端   客户端   模型   对象   事件   方法   文章

1 2 3 4 5

上滑加载更多 ↓
Top