private SelectorTuple openSelector() { final Selector unwrappedSelector; try { unwrappedSelector = provider.openSelector(); } catch (IOException e) { thrownewChannelException("failed to open a new selector", e); }
if (DISABLE_KEYSET_OPTIMIZATION) { returnnewSelectorTuple(unwrappedSelector); }
if (!(maybeSelectorImplClass instanceof Class) || //ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwablet= (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } returnnewSelectorTuple(unwrappedSelector); }
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
//Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); }
try { //Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); }
protectedvoidrun() { for (;;) { try { //计算策略 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false));
//'wakenUp.compareAndSet(false, true)' is always evaluated //before calling 'selector.wakeup()' to reduce the wake-up //overhead. (Selector.wakeup() is an expensive operation.) // //However, there is a race condition in this approach. //The race condition is triggered when 'wakenUp' is set to //true too early. // //'wakenUp' is set to true too early if: //1) Selector is waken up between 'wakenUp.set(false)' and //'selector.select(...)'. (BAD) //2) Selector is waken up between 'selector.select(...)' and //'if (wakenUp.get()) { ... }'. (OK) // //In the first case, 'wakenUp' is set to true and the //following 'selector.select(...)' will wake up immediately. //Until 'wakenUp' is set to false again in the next round, //'wakenUp.compareAndSet(false, true)' will fail, and therefore //any attempt to wake up the Selector will fail, too, causing //the following 'selector.select(...)' call to block //unnecessarily. // //To fix this problem, we wake up the selector again if wakenUp //is true immediately after selector.select(...). //It is inefficient in that it wakes up the selector for both //the first case (BAD - wake-up required) and the second case //(OK - no wake-up required).
if (wakenUp.get()) { selector.wakeup(); } //fall through default: }
cancelledKeys = 0; needsToSelectAgain = false; finalintioRatio=this.ioRatio; //反正就是执行processSelectedKeys以及runAllTasks if (ioRatio == 100) { try { processSelectedKeys(); } finally { //Ensure we always run tasks. runAllTasks(); } } else { finallongioStartTime= System.nanoTime(); try { processSelectedKeys(); } finally { //Ensure we always run tasks. finallongioTime= System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
privatevoidprocessSelectedKeysOptimized() { for (inti=0; i < selectedKeys.size; ++i) { finalSelectionKeyk= selectedKeys.keys[i]; //null out entry in the array to allow to have it GC'ed once the Channel close //See https://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null;
//如果还需要重新Select一次 if (needsToSelectAgain) { //null out entries in the array to allow to have it GC'ed once the Channel close //See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1);
final AbstractNioChannel.NioUnsafeunsafe= ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { //If the channel implementation throws an exception because there is no event loop, we ignore this //because we are only trying to determine if ch is registered to this event loop and thus has authority //to close ch. return; } //Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop //and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is //still healthy and should not be closed. //See https://github.com/netty/netty/issues/5125 if (eventLoop != this || eventLoop == null) { return; } //close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; }
try { intreadyOps= k.readyOps(); //We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise //the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { //remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking //See https://github.com/netty/netty/issues/924 intops= k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
unsafe.finishConnect(); }
//Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { //Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
//Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead //to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //当一切正常时,调用read方法读取数据 unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
privatevoidprocessSelectedKeysPlain(Set<SelectionKey> selectedKeys) { //check if the set is empty and if so just return to not create garbage by //creating a new Iterator every time even if there is nothing to process. //See https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) { return; }
Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { finalSelectionKeyk= i.next(); finalObjecta= k.attachment(); i.remove();
if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); }
if (!i.hasNext()) { break; }
if (needsToSelectAgain) { selectAgain(); selectedKeys = selector.selectedKeys();
//Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } }
do { fetchedAll = fetchFromScheduledTaskQueue(); if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); //keep on processing until we fetched all scheduled tasks.