??从上图可以看出,
NioEventLoopGroup
实现了Executor
、ExecutorService
、ScheduledExecutorService
、Iterable
、EventExecutorGroup
、EventLoopGroup
接口,另外三个抽象类AbstractEventExecutorGroup
、MultithreadEventExecutorGroup
、MultithreadEventLoopGroup
提供了接口的部分实现。
-
AbstractEventExecutorGroup
实现了Executor
、ExecutorService
、ScheduledExecutorService
中的部分方法,都是委托给next()
方法返回的EventExecutor
-
MultithreadEventExecutorGroup
主要是创建了EventExecutor[] children
数组,并将多个方法委托给children
-
MultithreadEventLoopGroup
将EventExecutor
类型的chlild
转化成EventLoop
(EventExecutor
的子接口),并将register()
方法委托给EventLoop
-
NioEventLoopGroup
实现newChild()
方法,返回NioEventLoop
(EventLoop的实现类),并将几个方法委托给NioEventLoop
Executor
??Executor
接口比较简单,只有一个execute()
方法,该方法在未来某个时间点执行参数Runnable
实例,可以在当前线程、新建线程或线程池里面执行,取决于具体实现。
public interface Executor {
void execute(Runnable command);
}
ExecutorService
??ExecutorService
继承了Executor
,是建立在Executor
基础上的一个服务,它是Executor
基础上增加了任务管理服务。
??除了执行任务的基础功能外,ExecutorService
还提供submit
类方法,用于提交任务,并返回futrue
实例(可以用于取消执行任务,或者等待执行完成)用于和任务进行通信。
??同时,还支持对Executor
进行shutdown()
操作,这一步可以用于资源的释放。
public interface ExecutorService extends Executor {
//拒绝新任务,但在terminate之前可以执行先前提交的任务
void shutdown();
//拒绝新任务,并删除之前提交的正在等待执行的任务,同时会尝试终止正在执行的任务
List<Runnable> shutdownNow();
boolean isShutdown();
/**如果shutdown后所有任务都已完成,则返回true。
请注意,除非先调用shutdown或shutdownNow,否则isTerminated永远不会为真。**/
boolean isTerminated();
//阻塞,直到shutdown后剩下的所有任务被完成,或者等待超时,或者被中断(interrupted)
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
//提交任务,返回的Future实例可以用于取消执行任务,或者等待执行完成
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
??Executors
类提供ExecutorService
类的多个工厂方法。
ScheduledExecutorService
??ScheduledExecutorService
可以在给定延时后执行任务,或者定期执行任务。
??scheduleAtFixedRate
和scheduleWithFixedDelay
方法创建并执行定期运行的任务,直到被取消。
??scheduleAtFixedRate
表示每隔多少时间,执行一次任务。
??scheduleWithFixedDelay
等到前一个任务结束的时刻,才开始结算间隔时间,如0秒开始执行第一次任务,任务耗时5秒,任务间隔时间3秒,那么第二次任务执行的时间是在第8秒开始。
??通过 Executor.execute(Runnable)
和ExecutorService的submit()
方法提交的任务延迟时间为0。
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
EventExecutorGroup
??EventExecutorGroup
负责通过next()
方法提供EventExecutor
。 除此之外,它还负责处理它们的生命周期并允许以全局的方式关闭它们。
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
//当被此EventExecutorGroup管理的所有EventExecutor被shutdown或者被shutdownGracefully时,返回true
boolean isShuttingDown();
//向executor发出关闭信号,此方法一旦被调用,isShuttingDown()就返回true,然后executor准备关闭自己
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
//返回一个future,当所有被当前这个EventExecutorGroup管理的EventExecutor被终止时,会通知这个future
Future<?> terminationFuture();
//返回一个EventExecutor
EventExecutor next();
//被shutdownGracefully方法替代
@Override
@Deprecated
void shutdown();
//被shutdownGracefully方法替代
@Override
@Deprecated
List<Runnable> shutdownNow();
@Override
Iterator<EventExecutor> iterator();
@Override
Future<?> submit(Runnable task);
@Override
<T> Future<T> submit(Runnable task, T result);
@Override
<T> Future<T> submit(Callable<T> task);
@Override
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
@Override
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
EventLoopGroup
??EventLoopGroup
是特殊的EventExecutorGroup
,它可以注册channel
,被注册的channel会在事件循环(event loop
)中被select和处理。
public interface EventLoopGroup extends EventExecutorGroup {
//返回下一个EventLoop
@Override
EventLoop next();
//注册Channel,返回的ChannelFuture会在注册完成时得到通知
ChannelFuture register(Channel channel);
ChannelFuture register(ChannelPromise promise);
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);
}
AbstractEventExecutorGroup
??AbstractEventExecutorGroup
只是将部分方法的实现的委托给next()
方法返回的EventExecutor
,但是没具体实现next()
方法。
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return next().submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return next().submit(task);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return next().schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return next().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return next().scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
/**
* @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
*/
@Override
@Deprecated
public abstract void shutdown();
/**
* @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
*/
@Override
@Deprecated
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return next().invokeAll(tasks);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return next().invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return next().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return next().invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
next().execute(command);
}
}
MultithreadEventExecutorGroup
??MultithreadEventExecutorGroup
主要是创建了EventExecutor[] children
数组,并将多个方法委托给children
。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
@Override
public EventExecutor next() {
return chooser.next();
}
@Override
public Iterator<EventExecutor> iterator() {
return readonlyChildren.iterator();
}
public final int executorCount() {
return children.length;
}
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
@Override
@Deprecated
public void shutdown() {
for (EventExecutor l: children) {
l.shutdown();
}
}
@Override
public boolean isShuttingDown() {
for (EventExecutor l: children) {
if (!l.isShuttingDown()) {
return false;
}
}
return true;
}
@Override
public boolean isShutdown() {
for (EventExecutor l: children) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
@Override
public boolean isTerminated() {
for (EventExecutor l: children) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
loop: for (EventExecutor l: children) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
break loop;
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
}
MultithreadEventLoopGroup
??MultithreadEventLoopGroup
将EventExecutor
类型的chlild
转化成EventLoop
(EventExecutor
的子接口),并将register()
方法委托给EventLoop
。
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
}
NioEventLoopGroup
??NioEventLoopGroup
实现newChild()
方法,返回NioEventLoop
(EventLoop的实现类),并将几个方法委托给NioEventLoop
。
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
public void setIoRatio(int ioRatio) {
for (EventExecutor e: this) {
((NioEventLoop) e).setIoRatio(ioRatio);
}
}
public void rebuildSelectors() {
for (EventExecutor e: this) {
((NioEventLoop) e).rebuildSelector();
}
}
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
}