首先创建bossGroup和workerGroup(Netty客户端和服务端启动源码在上篇文章中), bossGroup只是处理客户端连接请求 ,真正和客户端业务处理会交给workerGroup完成。new NioEventLoopGroup的源码如下(其中有很多调this和super的地方就不一一列出来了,流程图中都包含,只对重要代码做注释分析):
//bossGroup和workerGroup都属于MultithreadEventLoopGroup,相当于线程池
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//如果线程数为0,则取默认值,有io.netty.eventLoopThreads取这个值,没有的话默认是cpu核数*2
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
//进入super到MultithreadEventExecutorGroup,然后会给每个children赋值 children[i] = newChild(executor, args);代码如下
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//创建NioEventLoop,相当于线程池中的每个线程
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();//调openSelector()方法,打开Selector处理Channel,就是NIO代码
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
创建务器端的启动对象ServerBootstrap(就是一个空对象),然后再对其设置group,设置channel,设置option及在pipeline中加入自己写的Handler。
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);//点进去就是给group属性赋值
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;//给childGroup属性赋值,记住这两个属性,下面会用
return this;
}
//给ServerBootstrap设置channel,最后会跟到下面这段代码
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
//通过传过来的类对象NioServerSocketChannel.class,得到构造参数,下面函数会用
//会在下面一节中分析这个NioServerSocketChannel.class的构造方法
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
//将上述方法得到的constructor,传到本方法内
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;//给channelFactory属性赋值,用来创建对象,后续会用来创建对象
return self();
}
//设置options,就是一个map,可以初始化服务器连接队列大小,
//服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接
//多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
public B option(ChannelOption option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return self();
}
//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
////对workerGroup的SocketChannel设置自己写的处理器,把自己写的处理器加到管道中
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
Netty服务端绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况;启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕(参考上一篇文章中服务端绑定端口代码),绑定源码如下:
//调用ServerBootstrap的bind函数一直跟最后会到AbstractBootstrap的doBind函数,此函数主要两步
//第一初始化并注册 initAndRegister(),第二 把ServerChannel绑定到网络端口 doBind0,第二步源码就不再分析了
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//就是用constructor.newInstance()即上一节中NioServerSocketChannel.class的构造方法来初始化
//参考下面源码分析初始化NioServerSocketChannel
channel = channelFactory.newChannel();
//初始化channel,获取管道ChannelPipeline,是在初始化NioServerSocketChannel产生的管道对象,
//向管道中添加ChannelInitializer对象,当channel注册时会调用andlerAdded从而调用initChannel
init(channel);//具体源码参考下面
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//注册channel对象
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
//向管道中添加ChannelInitializer对象,当channel注册时会调用andlerAdded从而调用initChannel
void init(Channel channel) throws Exception {
final Map, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey
//初始化NioServerSocketChannel实例
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));//跟代码最后会调用AbstractNioChannel的构造方法
}
//创建ServerSocketChannel,其实就是NIO代码封装
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
//AbstractNioChannel的构造方法
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);//调用AbstractChannel的构造方法
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);//// 设置ServerSocketChannel为非阻塞
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
//AbstractChannel的构造方法
protected AbstractChannel(Channel parent) {
this.parent = parent;//给parent赋值就是NioServerSocketChannel
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();//初始话管道Pipeline,给pipeline属性,后续一直用到这个属性
}
注册channel对象,config().group().register(channel)----->MultithreadEventLoopGroup.register----->SingleThreadEventLoop.register----->AbstractUnsafe.register
//注册channel
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
//会走这里,执行异步操作,添加到队列中
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);//执行异步方法,下面会分析
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();//判断启动线程与当前线程是否相同 不同则有两种可能:未启动或者线程不同
addTask(task);//把异步任务(上一步的异步方法)添加到taskQueue队列中,后续会取
if (!inEventLoop) {
//第一次会启动,会调用SingleThreadEventExecutor的startThread()然后再调doStartThread
//最后再调用SingleThreadEventExecutor.this.run(),即NioEventLoop的run
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
//即NioEventLoop的run 无限循环执行selector监听事件
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));//会调用selector.select(timeoutMillis)执行selector的监听事件,跟nio一样
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//这个方法即监听有无链接事件和读写事件等,会循环selector中所有的事件
//进入后会调用NioEventLoop的processSelectedKey方法,
//最后当有客户端SelectionKey.OP_READ | SelectionKey.OP_ACCEPT时调用unsafe.read()
//OP_ACCEPT事件即NioMessageUnsafe的read方法,OP_READ事件即NioByteUnsafe的read方法
processSelectedKeys();
} finally {
//在执行这个方法,会调用SingleThreadEventExecutor的fetchFromScheduledTaskQueue方法
//从taskQueue队列中不断的取异步任务,在上面源码分析过程中,向 taskQueue中存放了一个异步任务
//即会执行register0(promise)方法,源码解析参考下面
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
//注册时队列中的异步任务
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//会调用AbstractNioChannel的doRegister()方法,
//在javaChannel().register(eventLoop().unwrappedSelector(), 0, this)这一步向selector中注册channel,跟nio一样
doRegister();
neverRegistered = false;
registered = true;
//跟下去最终调用AbstractChannelHandlerContext的callHandlerAdded方法,
//即调用pipeline中每个handler的handlerAdded方法,挨个handler执行,目前pipeline中有三个,头、ChannelInitializer、尾
//在上面代码init(channel); 向 pipeline 中添加过ChannelInitializer,最后会执行他的initChannel方法
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//调用pipeline中每个handler的channelRegistered方法
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
//调用pipeline中每个handler的channelActive方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
Netty的客户端启动时会连接服务端的端口,当发生客户端连接事件时调用NioMessageUnsafe的read方法,源码如下:
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();//获取channel的pipeline,此时这个是serverSocketChannelPipeline
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//readBuf缓冲区存放所有通过客户端连接事件过来的SocketChannel,
//封装成NioSocketChannel放到readbuf,封装NioSocketChannel即调用其构造方法,跟NioServerSocketChannel一样,
//包含设置管道,设置非阻塞,此时NioSocketChannel的pipeline只有头、尾
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//readBuf.get(i)即上面产生的NioSocketChannel对象
//执行pipeline中所有handler的channelRead方法,此时pipeline中包含ServerBootstrapAcceptor
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();//执行pipeline中所有handler的readComplete方法
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
执行ServerBootstrapAcceptor的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);//向NioSocketChannel的pipeline中添加childHandler,childHandler就是server服务端自己写的那个ChannelInitializer
setChannelOptions(child, childOptions, logger);
for (Entry, Object> e: childAttrs) {
child.attr((AttributeKey) e.getKey()).set(e.getValue());
}
//向childGroup即workerGroup中注册 NioSocketChannel,注册流程跟NioServerSocketChannel一样
try {
//最终会执行最终会执行自己写的ChannelInitializer的handlerAdded方法,
//再执行initChannel方法,将自己写的handler添加到pipeline中,然后调用removeState移除自己写的ChannelInitializer
//此时NioSocketChannel的pipeline有 头、自己写的handler、尾
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
Netty的客户端启动时会连接服务端的端口,当发生客户端写数据事件时(对服务端来说是读事件)调用NioByteUnsafe的read方法,源码如下:
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);//调用pipeline中所有handler的channelRead方法,即可以调用自己写的handler的方法了
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();//调用pipeline中所有handler的channelReadComplete方法
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
Netty线程模型全流程图(包含服务端参数设置、服务端端口注册、客户端请求连接服务端、客户端写数据到服务端)如下链接:
https://kdocs.cn/l/chTjGr8JajaI
页面更新:2024-05-26
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号