1. Netty源码研究笔记(4)——EventLoop系列

EventLoop,即事件驱动,它是Netty的I/O模型的抽象,负责处理I/O事件、任务。

不同的EventLoop代表着不同的I/O模型,最重要、最主要的是NioEventLoop,表示多路复用的I/O模型,对应jdk的NIO。

NioEventLoop是单线程的,它通过将I/O事件的等待时间用于执行其他任务,从而提高了线程的利用率,提高了单线程的吞吐量。

然而用户并不能直接使用EventLoop,应该使用EventLoopGroup,它代表了一组EventLoop。

1.1. 继承关系

EventLoopGroup、EventLoop继承自EventExecutorGroup、EventExecutor。

EventExecutor、EventExecutorGroup中定义了任务执行的相关方法,它们继承于jdk的Executor系列,对其进行了增强,因此Netty同样扩展了jdk的Future。

EventLoop、EventLoopGroup新增了注册Channel的功能。

1.1.1. EventExecutorGroup

注意EventExecutorGroup继承自ScheduledEventExecutorService,因此它有着提交延时任务的功能。

并且它继承了Iterable接口,因此他可以轮询自己管理的EventExecutor。

EventExecutorGroup管理一组EventExecutor,它在执行提交的task的时候,是从自己管理的EventExecutor中选出一个,转交给它来执行。

EventExecutorGroup提供的功能有:

  • next:选择一个EventExecutor将其返回。

  • iterator:返回遍历自己管理的EventExecutor集合的迭代器。

  • shutDownGracefully:将自己管理的所有EventExecutor给优雅的关闭,可以指定timeout,也可以不指定timeout。

  • isShuttingDown:自己管理的所有EventExecutor是否正在shutDownGracefully,或已经shutdown了。

  • terminationFuture:返回自己的terminationFuture,它指示EventExecutorGroup的关闭动作的执行情况。

  • 重写自ExecutorService、ScheduledExecutorService的方法:submit、schedule、scheduleAtFixedRate、scheduleWithFixedDelay。然而重写的时候并没有取改变方法的入参、返回参数,这些方法签名信息,为什么要重写呢?那是因为对于EventExeucotGroup来说,它的这些任务执行方法和ExecutorService、ScheduledExecutorService的语义有差别,它额外添加了这个任务的执行是由EventExecutorGroup去执行,最终是交给其管理的EventExecutor来执行这个语义,因此和jdk的语义有所区分。

1.1.2. EventExecutor

EventExecutor的实际实现类有:DefaultEventExecutor、GlobalEventExecutor、NoStickyOrderedEventExecutor、UnorderedThreadPoolEventExecutor、ImmediateEventExecutor。

其中后两者不是OrderedEventExecutor,前三者都是OrderedEventExecutor。

所谓的OrderedEventExecutor是指,同一线程提交的任务,保证它们的执行顺序为提交顺序。

EventExecutor在EventExecutorGroup的基础上还增加了:

  1. 创建Future、Promise的功能
  2. 判断给定线程是否在EventLoop中的功能
  3. 获取自己所在的EventExecutorGroup的功能
  4. 重写了EventExecutorGroup的next方法,对于前者,它返回其管理的一个EventExecutor,对于EventExecutor来说该方法返回自身。

1.1.3. EventLoopGroup

EventLoopGroup管理一组EventLoop,它继承自EventExecutorGroup,它在EventExecutorGroup的基础上提供了注册Netty Channel的功能。

1.1.4. EventLoop

除了EmbeddedEventLoop不是OrderedEventExecutor,其他的都是OrderedEventExecutor(都继承自SingleThreadEventLoop)

EventLoop接口内并没有新增新的功能,它只是重写parent方法,将返回类型改为EventLoopGroup。

1.2. AbstractEventExecutorGroup

由于EventExecutorGroup在执行提交的任务的时候是转交给自己管理的EventExecutor,所以AbstractEventExecutorGroup除此之外没有别的过程细节了,并且AbstractEventExecutorGroup的实现上也只做了这一件事,对于获取管理的EventExecutor、优雅关闭,这些非执行任务类型的方法,它没有实现。

1.3. MultiThreadEventExecutorGroup

MultiThreadEventExecutorGroup继承于AbstractEventExecutorGroup。

AbstractEventExecutorGroup负责的是任务执行的相关功能。

MultiThreadEventExecutorGroup负责的剩下的其他的事项:

  • event executor的管理
  • shutdown、terminate相关

注意,MultiThreadEventExecutorGroup不负责创建它所管理的EventExecutor,这个child的创建工作(newChild方法)留给了自己的子类取实现。

然后它还把从自己管理的EventExecutor挑出来一个交给请求方这个功能又剥离开来,交给EventExecutorChooser来实现。

MultiThreadEentExecutorGroup,它在创建自己管理的EventExecutor的时候,为这每个待创建的EventExecuto都提供了一个单线程的Executor(ThreadPerTaskExecutor),因此被称为是multi thread event executor group。

MultiThreadEventExecutorGroup是一个抽象类,在构造它的时候需要提供线程的数量,线程的数量即为它管理的event executor数量。

然后其构造函数的多余的入参,其实是转交给newChild了,用来创建event executor。

1.4. DefaultEventExecutorGroup

这个没什么好说,它实现了MultiThreadEventExecutorGroup的newChild为创建DefaultEventExecutor,并且对于缺省构造参数的情况下,设置默认的任务拒绝策略为拒绝(这些拒绝策略在RejectedExecutionHandlers中,目前一共就两种,还有一种是backoff),设置默认的pending task数量为最大的Integer。

1.5. AbstractEventExecutor

AbstractEventExecutor几乎是所有EventExecutor的共同祖先,除了UnOrderedThreadPoolEventExecutor。

UnOrderedThreadPoolEventExecutor,是基于jdk的ScheduledThreadPoolExecutor来实现的EventExecutor的功能,其他的EventExecutor都基于是netty自己的实现。

AbstractEventExecutor实现了EventExecutor接口中新加的功能,见上面EventExecutor:

  1. 创建Future、Promise的功能

  2. 判断给定线程是否在EventLoop中的功能

  3. 获取自己所在的EventExecutorGroup的功能

AbstractEventExeutor继承于jdk的AbstractExecutorService,因此它还有些配合AbstractExecutorService的方法。

比如:将newTaskFor方法需要返回的RunnableFuture(这个接口继承于Runnable、Future,future的结果为run结束后得到的结果)返回为netty自己实现的PromiseTask。

然后schedule相关的方法,AbstractEventExecutor暂不支持,直接抛异常。

1.6. AbstractScheduledEventExecutor

AbstractScheduledEventExecutor继承于EventExecutor,它实现了跟延时任务的相关方法,实现上使用ScheduledFutureTask来实现,注意,在时间方面上使用的是nano time,这个跟current time millis不一样,前者是跟当前JVM启动的时间起开始算,而后者是从GMT1970年1月1日0时到现在当前的毫秒数。

AbstractScheduledEventExecutor内部使用了netty自己的特殊的优先级队列来装这些ScheduledFutureTask。

不管延时任务、还是周期任务,它们只是构造ScheduledFutureTask时的入参不同而已。再实现延时/周期执行的功能时,AbstractScheduledEventExecutor只是提供了一个骨架代码,关键的功能实现还是要看AbstractScheduledEventExecutor的子类对其定义的
hook函数的实现,ScheduledFutureTask在执行时候的内部实现,以及两者间的配合。

1.6.1. schedule方法

其他所有public的schedule方法都是调用了这个私有的schedule方法,它们负责创建ScheduledFutureTask。

其中execute、lazyExecute、beforeScheduledTaskSubmitted、afterScheduledTaskSubmitted这几个方法都待子类来实现。

方法解释:

如果schedule方法在EventLoop内部被调用,那么简单的将任务放到周期、延时任务队列中,因为被调用时EventLoop肯定不是处于IO的等待阻塞状态,所以这样做安全。(因为EventLoop在IO阻塞前会看deadline最近的延时任务的deadline,从而计算出阻塞的超时时间。)

如果是在EventLoop之外被调用,这里就NioEventLoop进行说明:

如果EventLoop正在select阻塞着,并且下一次被唤醒在这个task的deadline之后,那么就将这个任务添加到SingleThreadEventExecutor的普通任务队列中,并唤醒EventLoop。

反之如果EventLoop当前没有阻塞,或阻塞但下一次被唤醒时间在这个task的deadline之前,那么就将这个任务添加到SingleThreadEventExecutor的普通任务队列中,添加完后再次判断一下是否到deadline了,如果到了就尝试唤醒EventLoop(而不管EventLoop当前是否阻塞)。

1.7. SingleThreadEventExecutor

SingleThreadEventExecutor是EventLoop体系的一个核心类(另外一个核心类是NioEventLoop),它负责实现整个的EventExecutor生命周期的管理:

比如:execute、shutdownGracefully等方法。

SingleThreadEventExecutor需要通过一个Executor来创建,然后启动后它会独占这个Executor中的一个线程,它只负责管理生命周期,至于EventLoop具体怎么loop,它不负责,这个它抽象出来个run方法,留给自己子类去实现,比如对于DefaultEventExecutor,它的实现是个死循环,然后不断从队列中获取任务执行,并更新上一次的执行时间,对于NioEventLoop,它的loop实现中,不仅要从队列中取出任务执行,还需要处理IO事件。

SingleThreadEventExecutor的state依次为:not startedstartedshuttingdownshutdownterminated

1.7.1. execute方法

SingleThreadEventExecutor在执行任务时是通过将task添加到自己内部的普通任务队列中(区别于AbstractScheduledEventExecutor中的周期任务队列:PriorityQueue)。

SingleThreadEventExecutor被创建时还没有开启事件循环线程,当提交第一个任务后才开启事件循环线程。

如果提交不是LazyRunnable,那么将任务入队之后还要唤醒EventLoop线程。

如果是从外部提交任务,提交完成后发现状态已经是ST_SHUTDOWN及以后了,那么就尝试从任务队列中移除当前task,如果移除成功就reject,如果移除失败,那么说明这个任务得到了执行。

addTasksWakesUp,这个字段是子类在继承SingleThreadEventExecutor时,要告知给它的一个指示性字段,表示在子类的实现中,addTask方法会不会唤醒SingleThreadEventExecutor所在的线程。如果会唤醒的话,那么SingleThreadEventExecutor就没必要多余再手动的调用wakeup函数了。

SingleThreadEventExecutor的wakeup函数的实现是向任务队列添加一个WAKE_UP任务,对于需要处理IO的子类NioEventLoop来说,它还会调用selector的wakeup函数。

startThread:

startThead方法在execute时候被调用,用于启动一次,它采用对SingleThreadEventExecutor的state字段的CAS操作,来保证只会被真正startThread一次。

doStartThread:

真正表示SingleThreadEventExecutor运行时声明周期的其实在doStartThread方法中。

doStartThread就是向SingleThreadEventExecutor持有的executor(默认为ThreadPerTaskExecutor)添加一个任务,这个任务本身是一个事件循环(死循环),它只有在shutdown时或者执行任务时遇到无法处理的异常时被打断。

这个方法看起来很长,实际上大部分篇幅都在999行起的finally语句块中跟shutdown相关的代码,代码执行到它时分两种情况:

  • 被shutdown了(这种情况success被设置为true)

  • 子类实现的run方法在运行时出现了不能处理的异常。(这种情况success为false)

事件循环(run方法)结束后,首先设置状态为shutting down(循环CAS直到成功为止),因为可能是因为事件循环抛异常进这儿来的。

然后检查子类的事件循环的实现中,结束时,是否调用了confirmShutdown方法,如果没有调用,那么日志:子类的实现有bug。

然后执行ST_SHUTTING_DOWN到ST_SHUTDOWN状态切换前的过渡工作(在循环供不断的执行cofirmShutdown方法,判断是否结束这个过渡期),这个过渡工作跟shutdownGracefully方法的quietPeriod、timeout入参有关,这个过渡工作就是说:在这个状态切换的前,event executor会等待quietPeriod这么长的时间,如果在这段时间外界又提交了新的任务,那么执行这些任务,并且又重新等待quietPeriod这么长的时间,直到连续的静默期超过了quietPeriod,或者在ST_SHUTTING_DOWN状态的总停留时间达到了timeout,那么这个过渡期才结束。

过渡期结束后,就将状态设置为ST_SHUTDOWN。

然后再confirmShutdown一次,因为设置状态为ST_SHUTDOWN的时候是循环CAS操作,在状态设置成功前,仍可能有任务添加进来了,因此我们需要执行这部分任务。

然后调用cleanup方法,它是一个protect方法,给子类实现,对于NioEventLoop来说,它关闭selector。

然后移除本线程所有的FastThreadLocal、丢弃掉立即任务队列中剩余的所有任务、terminationFuture设置为success、状态设置为terninated。

confirmShutdown方法:

confirmShutdown在自定义的EventLoop结束的前(shutdown)需要被调用,返回true才能退出eventloop。

confirmShutdwon返回的结果表示,是否可以进入ST_SHUTDOWN了,confirmShutdown在状态为ST_SHUTTING_DOWN的时候起作用,是这两个状态切换前的check。

gracefulShutdownQuietPeriod表示ST_SHUTTING_DOWN的静默时间阈值,EveentExecutor的ST_SHUTTING_DOWN状态的连续静默时间(这段时间内外部没有提交任务)超过了gracefulShutdownQuietPeriod,或者EventExecutor的ST_SHUTTING_DOWN状态的总时间超过了gracefullyTimeout,EventExecutor才能确保自己能从shuttingdown切换为shutdown。

当EventExecutor状态为shuttingdown时,在confirmShutdown时如果发现没有任务或shutdown hook可以执行时,那么就说明自己目前是quiet的,这时每隔100ms检查一次788行跟1022行结合起来看。

779行对于gracefullyShutdownTimeout的判断其实算是一种被动的判断,就是说如果shuttingdown了,当外部线程提交task的速度超过了EventExecutor 在循环confirmShutown时执行任务的速度,那么gracefullyTimeout不会被检查到,没起到作用,但只要提交task的速度有一点跟不上,也就是说出现了一丝Quiet的机会,shutdown超时就会通过779行对gracefullShutdownTimeout的检查时被捕捉到。

注:不知道为什么confirmShutdown方法里面需要给queue添加WAKE_UP任务,感觉没必要啊,因为这个方法都是在事件循环所在线程被调用,并且这个方法被调用的时候,该线程自身肯定没有被block在这个queue上啊(这个queue在SingleThreadEventExecutor中默认是LinkedBlockingQueue,在NioEventLoop中是JCT的非阻塞的Mpsc queue)。

1.7.2. shutdownGracefully方法

由于shutdown方法已经被废弃,所以我们不看,只看shutdownGracefully。

无入参的shutdownGracefully在AbstractEventExecutor中实现了,它使用quietPeriod为2s,timeout为15s,来调用有入参的shutdownGracefully方法。

shutdownGracefully内部使用循环+CAS操作来保证只有一个线程会将SingleThreadEventExecutor的状态设置为shuttingdown。

出634行的for循环只有两种情况:

  • 成功的改变了SingleThreadEventExecutor的状态为shuttingdown,这时,如果之前event executor还没有启动,那么还会将其启动。需要wakeup。

  • 在635行状态还不是shuttingdown,但是在640行状态就成shuttingdown了,这时CAS操作的oldstate和newstate相同,CAS了个寂寞,不过在这种情况接下来就不需要wakeup了,因为说明已经有另外的线程在做shutdown的了。这种情况下后面的代码ensureThreadStarted时也不会再doStartThread,只是修改了gracefulShutdownQuietPeriod、gracefulShutdownTime这两个字段。这两个字段只在confirmShutdown时使用到。

1.8. MultiThreadEventLoopGroup

MultiThreadEventLoopGroup中定义了EventLoop的默认线程数量,它是CPU核心数的两倍。

注意,这个线程的数量的设置是有讲究的:

  • 计算密集性:CPU核心数 + 1。
  • IO密集性:CPU 核心数 *(1 + 平均等待时间/平均工作时间),一般设置为CPU核心数两倍。
  • 混合性: CPU核心数 / 计算密集任务占总任务的比例

如果我们构造EventLoopGroup的时候没有提供线程数量,那么实际使用这个默认数量。

它在注册Channel的时候,是从自己管理的EventLoop中选一个出来,用它来注册Channel。

1.9. NioEventLoopGroup

NioEventLoopGroup做了剩下的工作,它内部是一大堆各种参数的构造函数,这些参数配合newChild方法,从而能创建出自己管理的NioEventLoop。

1.10. SingleThreadEventLoop

SingleThreadEventLoop完成了注册Channel的功能。它内部还定义了一个tailTasks这样一个queue,它的作用是,我们可能想再每一次eventloop结束时,这个时间点来执行一些任务,比如说统计eventloop次数之类的,然后这些任务会放在这个queue中,添加这类任务的方法是:executeAfterEventLoopIteration。

除此之外,SingleThreadEventLoop还定义了SingleThreadEventExecutor的任务队列的最大容量的默认值:io.netty.eventLoop.maxPendingTasks这个启动参数设置它,默认是Integer.MAX_VALUE,最小为16。

1.11. NioEventLoop

NioEventLoop是EventLoop体系的另外一个核心类,上一个是SingleThreadEventExecutor。

它实现了事件循环内部的细节。通过将普通的任务的处理和IO的等待时间重叠,从而提高了单线程的利用率,提高了吞吐量。

由于SingleThreadEventExecutor负责的是任务的执行,所以NioEventLoop补充的是对IO处理。

NioEventLoop是基于JDK的NIO,因此它持有的字段都是围绕着IO的处理展开,都是围绕jdk的nio展开。

下面讲NioEventLoop的一些特性:

  • NioEventLoop使用的任务队列是JCT的MPSC(多生产者单消费者非阻塞队列)。

  • NioEventLoop还对jdk的selector进行了优化:

    这个优化默认是开启的,可以将启动参数io.netty.noKeySetOptimization设置为false来禁用。

    jdk的SelectorImpl内部在存放selection key的时候,用的是HashSet——即keys、selectedKeys、publicKeys、publicSelectedKeys这四个字段,前两个声明为Set<SelectionKeys>类型,在构造器中实际创建为HashSet,后两个是前两个的unmodifiableSet、ungrowableSet,前两者不暴露给用户,暴露的是后两者,这样防止用户对其错误修改了——keys()、selectedKeys()返回的就是后两者。然后jdk的selector实现中,在register、select操作的时候,修改的是前两者,这样连带着后两者也改变了(原集合和unmodifiable集合之间共享数据,unmodifiable只是禁止了修改的操作,并不是copy数据后独立出来),因此用户可以及时的拿到更新。如果不用netty的话,用户需用迭代器来遍历selected key,并且在处理完之后要使用迭代器的remove方法来移除处理完的key。

    netty的优化是,将selectedKeys、publicSelectedKeys这两个字段通过反射,替换成了数组的实现(SelectedSelectionKeySet),并且它们指向同一个实例,因为add操作很频繁,而HashSet在面对碰撞的时候效率会降低。然后把这个SelectedSelectionKeySet和这个jdk的selector打包起来创建为SelectedSelectionKeySetSelector,netty用的就是这个打包后的selector,之前的jdk的selector就成了unwrapped selector了。

    这个SelectedSelectionKeySetSelector对于selector的实现都是委托给unwrapped selector,只是对于select、selectNow方法,它会将SelectedSelectionKeySet先reset一遍,为啥要reset一遍呢,因为jdk的selector的实现中,在进行select、selectNow操作的时候,因为用的是Set来存key所以只是简单的add(Set会自动去重),然而,SelectedSelectionKeySet在add的时候只是将数组的下标往后移动而已,不自带去重功能,所以如果不reset的话,那么同一个key就会存多次。并且这些select操作的时候,并不会自动的将上一次select到的,而这次没有select到的key从selectedKeys字段中移除,因此对于直接使用NIO的用户来说,遍历处理selected key时,处理完后要调用迭代器的remove方法,所以reset也有这样的作用,相当于一次将所有都remove了,然后再次select时,留下来的都是这次select到的key。

我们不能直接使用NioEventLoop,我们应该使用NioEventLoopGroup,NioEventLoop构造参数挺多的,它被NioEventLoopGroup管理并构造。

1.11.1. run方法(事件循环)

run方法在SingleThreadEventExecutor的doStartThread中被调用到,它是一个循环体,是eventloop的本体,跳出它的循环之后就是shutdown的逻辑了(doStartThread中)。

思路是不存在立即任务的时候使用selector阻塞select,存在立即任务的时候时候则使用selector的非阻塞方法selectNow获取selectionKey,然后将selectionKey的处理和task的执行放在一块,利用ioRatio(io比例,也就是sectionKey处理时间比例)来划分两者的执行时间。将task的执行时间和selectionKey的等待时间重叠在一起。

每次循环首先计算strategy:当SingleThreadEventExecutor的taskQueue或SingleThreadEventLoop的tailTasks队列不为空时(这两个队列中的存的是立即任务),就使用selector的selectNow(非阻塞),它返回的是当前SelectionKey的数量;当没有立即任务的时候,返回SelectStrategy.SELECT。

也就是说,当有立即任务时,本次循环就不阻塞等待IO了,直接进入后面的SelectionKey和任务的处理逻辑。而当没有立即任务时,本次循环就可以阻塞:获取最近将要执行的延时任务的deadline记录到nextWakeupNanos字段中(该字段可以帮助我们判断EventLoop是否阻塞着,以及下次什么时候会唤醒,它在AbstractScheduledEventExecutor schedule一个延时任务时用到),然后用这个deadline和当前时间的间隔作为本次阻塞时间(如果阻塞时间小于5ms,就不阻塞了),阻塞之前再次检查一下是否有立即任务。阻塞结束后(被wakeup或到期),会将nextWakeupNanos设为-1,表示AWARE。

上面跟selector有关的操作是在一个try catch块中进行的,如果出现了异常,那么会rebuildSelector,并且将selectCnt(表示当前连续空select的次数,由于JDK 在linux 平台上有臭名昭著的epoll bug)置为0。

当上面决定不阻塞,或者阻塞结束,就进入下面的处理逻辑:SelectionKey和周期任务、立即任务。

SelectionKey的处理和任务的执行的时间比例划分由ioRatio来决定:

  • 如果ioRatio为100时,表示所有的执行时间优先分配给io处理,调用processSelectedKeys,selectionKey处理完后再执行当前所能执行的所有task。

  • ioRatio不为100时,判断当前有没有准备好的sectionKey:如果有的话就先处理所有准备好的selectionKey,然后根据它们的处理时间以及ioRatio,计算出task的执行时间(timeout)然后执行task;如果当前没准备好的selectionKey,那么就最多执行64个task(尽可能少地执行任务)。

task和SelectionKey process后,检查是否需要rebuilde selector:如果处理阶段没有执行任务且没有处理SelectionKey,那么就有发生空轮询bug的可能性:这时如果是因为线程中断而提前唤醒,那么就清空selectCnt(当前连续空轮询次数);如果不是线程中断,且当前selectCnt达到了SELECTOR_AUTO_REBUILD_THRESHOLD(通过io.netty.selectorAutoRebuildThreshold启动参数配置,默认512,如果配的小于3,那么就设置为0),那么就判定此时发生了空轮询,此时要rebuild selector,然后重置selectCnt。

rebuild selector的时候,不仅是创建新的selector,并且还要将注册在旧selector上的channel重新注册到新的selector上,并且将得到的selection key替换到对应的Netty Channel中(Netty channel是作为attachment,从而关联上selection key的)。

每轮循环分上面几个阶段,它们在一个大的try语句块中,这几个阶段中出现error后,run方法就结束并向外抛这个error,出现异常了就调用异常处理逻辑(睡眠1s),当每轮循环的这两个阶段结束后,会检查EventLoop是否已经调用了shutdown:如果是就获取到selector管理的所有Channel(netty channel,它作为SelectionKey的attachment),使用它们的unsafe对象来close它们,然后confirmShutdown之后退出run方法;如果当前没有shutdown,那么就继续下一轮循环。

1.11.1.1. processSelectedKeys

selectedKeys即为上面的对selector优化的SelectedSelectionKeySet。如果它为null,表示禁用了selector优化,反之没有。

因此进入了两种不同的process逻辑:

  • 一种为optimized
  • 一种为plain

processSelectedKeysOptimized:

之前select操作拿到的key都放在selectedKeys这个字段中,内部为数组存储,因此只需要依次按下标遍历即可。

每遍历一个key,就将其在数组中的槽位置空,这样对哪些已经close掉的channel能够帮助它们GC。

每处理一个key的时候,都需要检查是否要重新select(selectNow):当netty channel 向其注册的eventloop进行deregister的时候,那么会将selection key给取消,并且增加eventloop的cancel key计数——cancelledKeys,注意这个计数只是在processSelectedKeys期间有效,eventloop每迭代一次都会将置为0。当processSelectedKeys期间的cancelledKeys计数超过了阈值(硬编码为256),那么就需要重新selectNow。
当需要重新select的时候,会将selectedKeys给清空(所有槽位置为null,并且size变0,因为之前处理的已经给置null了,所以只需要从i+1下标位置开始置null)。

processSelectedKeysPlain:

processSelectedKeysPlain跟processSelectedKeysOptimized要做的事情差不多,只是在没有优化selector的场景下,需要跟平时自己手动使用NIO一样,采用迭代器来遍历selected keys。

processSelectedKey:

processSelectedKey方法有两个重载方法,我们忽略第二个入参为NioTask类型的processSelectedKey方法,只看第二入参类型为AbstractNioChannel类型的方法。

首先对选的key的有效性进行判断,如果key无效了,要判断对应的channel是否还注册在本eventloop上,如果是,那么才有权限对其进行close,否则不能close。

AbstractChannel在deregister的时候,将deregister动作交给pipeline,pipeline交给tail ctx,最终在head ctx调用AbstractChannell的unsafe的deregister。

AbstractUnsafe的deregister过程并没有将Channel的eventloop字段置为空,只是把unsafe的registered字段置为false。那么在692行会不会出现竞态?答案是不会,因为unsafe在执行deregister的时候把这个动作作为一个task交给eventloop执行(Unsafe的操作基本都放在IO线程中做),而eventloop是单线程,自然不会出现竞态:因为deregister后,在下轮的select中就会把过期的SelectionKey给清除了,也就是说本轮处理SelectionKey,取消动作还没执行(IO先于任务),而SelectionKey的取消动作执行后,下一轮开头Select动作又把它清扫了,所以对于deregister而言不会走到678的if语句块中。但是如果对于disconnect、close这些操作,如果说同一个eventloop中的channel在处理IO的时候去disconnect、close其他channel,那么轮到该channel的IO处理时,就可能进入678的if块中。

当SelectionKey的OP_CONNECT就绪后,就不再关注OP_CONNECT,并调用unsafe的finishConnect方法。

当SelectionKey的OP_WRITE就绪后,就调用unsafe的forceFlush方法

当SelectionKey没有就绪的OP或SelectionKey的OP_ACCEPT或OP_READ就绪后,就调用unsafe的read方法

注:上面三句话特别重要。

内容来源于网络如有侵权请私信删除

文章来源: 博客园

原文链接: https://www.cnblogs.com/stepfortune/p/16300845.html

你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!