- 1. Pipeline initialization
As analyzed earlier, every time a channel is constructed, its pipeline is initialized through newChannelPipeline:
protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
The implementation of newChannelPipeline, where this is the current channel:
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// Create the tail and head nodes
tail = new TailContext(this);
head = new HeadContext(this);
// Build a doubly linked list that initially contains only the head and tail nodes; each element of the list is actually a ChannelHandlerContext
head.next = tail;
tail.prev = head;
}
To summarize, the pipeline is created at the same moment the channel is created.
- 2. ChannelHandlerContext analysis
First, look at the inheritance hierarchy of ChannelHandlerContext:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker
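This declaration carries three meanings:
- extends AttributeMap: the context can store attributes of its own;
- extends ChannelInboundInvoker: the context can fire inbound events, such as read events and registration events;
- extends ChannelOutboundInvoker: the context can trigger outbound operations, such as writes.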
Channel channel();
EventExecutor executor();
String name();
ChannelHandler handler();
boolean isRemoved();
ChannelPipeline pipeline();
ByteBufAllocator alloc();
ChannelHandlerContext itself gives access to the current channel, the executor that runs it (the NioEventLoop), the ChannelHandler it wraps, and so on.
- 3. HeadContext and TailContext
- 3.1 TailContext extends AbstractChannelHandlerContext
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
// The name of this context
this.name = ObjectUtil.checkNotNull(name, "name");
// The pipeline this context belongs to
this.pipeline = pipeline;
// The executor (NioEventLoop) this context belongs to
this.executor = executor;
// Flags marking whether the handler is an inbound handler or an outbound handler
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
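As the flags show (inbound = true, outbound = false), TailContext is an inbound handler that deals with inbound events such as reads and registration.
setAddComplete marks the current node as added, using CAS inside a spin loop: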
final void setAddComplete() {
for (; ; ) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
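The CAS-plus-spin pattern above can be tried in isolation. The following is a minimal sketch, not Netty's code: the class name `SpinState` and the numeric values of the constants are assumptions made for illustration; only the loop shape follows the listing.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical stand-in for the state handling of a handler context.
final class SpinState {
    // Constant values are assumptions; only their distinctness matters here.
    static final int INIT = 0;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    private static final AtomicIntegerFieldUpdater<SpinState> UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SpinState.class, "state");

    // The field updater requires a non-static volatile int field.
    private volatile int state = INIT;

    void setAddComplete() {
        for (;;) {
            int old = state;
            // Never overwrite REMOVE_COMPLETE; otherwise retry until the CAS succeeds.
            if (old == REMOVE_COMPLETE || UPDATER.compareAndSet(this, old, ADD_COMPLETE)) {
                return;
            }
        }
    }

    void setRemoved() {
        state = REMOVE_COMPLETE;
    }

    int state() {
        return state;
    }
}
```

The early return on REMOVE_COMPLETE is what keeps a late "add complete" notification from resurrecting a context that has already been removed.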
- 3.2 HeadContext
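Compared with TailContext, the constructor additionally captures the channel's unsafe, and the inbound/outbound flags passed to super are reversed (false, true); everything else is the same: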
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
// The unsafe field performs the low-level reads and writes
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}
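HeadContext implements essentially all of the inherited handler methods, covering reads and writes, registration, exception propagation, and so on.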
- 4. Adding and removing a ChannelHandler
- 4.1 Adding a ChannelHandler
In application code, we usually add a handler like this:
ch.pipeline().addLast(new EchoServerHandler());
Next, let's look at what the addLast method actually does.
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
// Add the handlers one by one
for (ChannelHandler h : handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
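// 1. Check whether the handler is being added more than once
checkMultiplicity(handler);
// 2. Construct a HandlerContext; throw an exception if the name is already taken
newCtx = newContext(group, filterName(name, handler), handler);
// 3. Link the HandlerContext into the list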
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
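// 4. If the current thread is NOT the EventLoop thread, submit callHandlerAdded0 to the executor asynchronously; otherwise fall through and call it directly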
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
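Step 4 boils down to "run directly if already on the loop thread, otherwise hand the callback to the loop". The following is a minimal sketch of that decision built on a plain single-thread executor. `LoopDispatchSketch`, `runOnLoop`, and `shutdownAndWait` are hypothetical names, and the pending-state bookkeeping (setAddPending) is deliberately left out.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an event loop with an inEventLoop() check.
final class LoopDispatchSketch {
    private final ExecutorService loop;
    private volatile Thread loopThread;

    LoopDispatchSketch() {
        // Remember the single worker thread so inEventLoop() can compare against it.
        loop = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-loop");
            t.setDaemon(true);
            loopThread = t;
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Returns true if the callback ran synchronously on the calling thread.
    boolean runOnLoop(Runnable callback) {
        if (!inEventLoop()) {
            loop.execute(callback);   // caller is a foreign thread: schedule it
            return false;
        }
        callback.run();               // caller is the loop thread: call directly
        return true;
    }

    // Waits for queued callbacks to finish; returns false on timeout or interrupt.
    boolean shutdownAndWait() {
        loop.shutdown();
        try {
            return loop.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Calling `runOnLoop` from the main thread schedules the callback, while calling it again from inside that callback runs the nested callback directly, which is the same asymmetry addLast shows.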
private static void checkMultiplicity(ChannelHandler handler) {
// Check whether the handler is an instance of ChannelHandlerAdapter
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// If the handler is not annotated with @Sharable and has already been added, throw an exception
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
// Otherwise, mark it as added
h.added = true;
}
}
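The multiplicity rule can be exercised without Netty. The following is a minimal sketch under stated assumptions: `SketchSharable` is a hypothetical annotation playing the role of @Sharable, `Handler` is a hypothetical base class with an `added` flag, and the exception type is a plain IllegalStateException rather than ChannelPipelineException.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

final class MultiplicitySketch {
    // Hypothetical marker annotation standing in for @Sharable.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface SketchSharable {
    }

    // Hypothetical base class standing in for ChannelHandlerAdapter.
    static class Handler {
        boolean added;

        boolean isSharable() {
            return getClass().isAnnotationPresent(SketchSharable.class);
        }
    }

    @SketchSharable
    static final class StatelessHandler extends Handler {
    }

    static final class StatefulHandler extends Handler {
    }

    // Same rule as checkMultiplicity: only sharable handlers may be added twice.
    static void checkMultiplicity(Handler h) {
        if (!h.isSharable() && h.added) {
            throw new IllegalStateException(
                    h.getClass().getName() + " is not sharable and was already added");
        }
        h.added = true;
    }
}
```

Adding the same `StatelessHandler` instance twice passes, while the second add of a `StatefulHandler` instance is rejected. This is why a handler that keeps per-connection state should be created fresh for each channel.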
Construct a DefaultChannelHandlerContext object:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline,EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
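The isInbound and isOutbound helpers are not shown in the listing above. One plausible reading, assumed here rather than quoted from Netty, is a plain instanceof check against the two handler interfaces. All type names in this sketch are hypothetical.

```java
// Hypothetical handler hierarchy; the instanceof checks are an assumption
// about how the inbound/outbound flags could be derived from a handler.
final class DirectionSketch {
    interface Handler {
    }

    interface InboundHandler extends Handler {
    }

    interface OutboundHandler extends Handler {
    }

    static boolean isInbound(Handler h) {
        return h instanceof InboundHandler;
    }

    static boolean isOutbound(Handler h) {
        return h instanceof OutboundHandler;
    }

    static final class Decoder implements InboundHandler {
    }

    static final class Encoder implements OutboundHandler {
    }

    // A handler may implement both interfaces and take part in both directions.
    static final class Duplex implements InboundHandler, OutboundHandler {
    }
}
```

Under this reading, the two booleans passed to the super constructor are fixed once, when the context is created, so event propagation can skip contexts of the wrong direction without inspecting the handler again.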
Perform the insertion, which simply links a new element into the list just before tail:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
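The four pointer updates are easier to follow on a stripped-down list. The following is a minimal sketch, not Netty's code: `TailInsertSketch` and `Node` are hypothetical names, and each node carries only a name instead of a handler.

```java
import java.util.ArrayList;
import java.util.List;

final class TailInsertSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    TailInsertSketch() {
        // An empty list still holds the two sentinels, linked to each other.
        head.next = tail;
        tail.prev = head;
    }

    // Same four assignments as addLast0: splice the node in just before tail.
    void addLast(String name) {
        Node newNode = new Node(name);
        Node prev = tail.prev;
        newNode.prev = prev;
        newNode.next = tail;
        prev.next = newNode;
        tail.prev = newNode;
    }

    // Names of the user nodes in head-to-tail order, sentinels excluded.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head.next; n != tail; n = n.next) {
            out.add(n.name);
        }
        return out;
    }
}
```

Because head and tail always exist, the insertion never has to special-case an empty list: `tail.prev` is simply head the first time.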
Once the node is linked in, invoke the handler's handlerAdded callback and mark the handler as added:
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
}
- 4.2 Removing a ChannelHandler
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
Look up the context node that wraps the ChannelHandler:
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
// Walk the list from the node after head in an endless for loop; return the context if it is found, otherwise return null
AbstractChannelHandlerContext ctx = head.next;
for (; ; ) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
}
}
Remove the node and invoke the handlerRemoved callback:
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
// The node must be neither head nor tail: these two sentinels hold the pipeline structure together and are never removed
assert ctx != head && ctx != tail;
synchronized (this) {
remove0(ctx);
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
callHandlerRemoved0(ctx);
return ctx;
}
The unlinking operation itself:
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
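Lookup and unlinking can be combined in a small standalone sketch. `UnlinkSketch`, `find`, and `unlink` are hypothetical names. Unlike the context(handler) listing, which walks until it reaches null, this version stops at the tail sentinel.

```java
final class UnlinkSketch {
    static final class Node {
        final Object handler;
        Node prev;
        Node next;

        Node(Object handler) {
            this.handler = handler;
        }
    }

    final Node head = new Node(null);
    final Node tail = new Node(null);

    // Builds head <-> handlers... <-> tail in the given order.
    UnlinkSketch(Object... handlers) {
        Node last = head;
        for (Object h : handlers) {
            Node n = new Node(h);
            last.next = n;
            n.prev = last;
            last = n;
        }
        last.next = tail;
        tail.prev = last;
    }

    // Identity (==) lookup, as in context(handler); returns null if absent.
    Node find(Object handler) {
        for (Node n = head.next; n != tail; n = n.next) {
            if (n.handler == handler) {
                return n;
            }
        }
        return null;
    }

    // Same two assignments as remove0: the neighbours are linked around the node.
    static void unlink(Node n) {
        Node prev = n.prev;
        Node next = n.next;
        prev.next = next;
        next.prev = prev;
    }
}
```

The comparison uses == rather than equals, so two handlers that are equal but distinct objects are treated as different entries. Note also that the removed node keeps its own prev and next references; only its neighbours are updated.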
Call the handler's handlerRemoved method, and finally mark the handler as removed by setting its state:
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
try {
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
final void setRemoved() {
handlerState = REMOVE_COMPLETE;
}
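The nested try/finally inside try/catch guarantees two things: the state flips to removed even when the callback throws, and the failure is reported rather than propagated to the caller. The following is a minimal sketch of that shape. `RemovalCallbackSketch`, its constants, and the `reported` field (standing in for fireExceptionCaught) are hypothetical.

```java
final class RemovalCallbackSketch {
    // Constant values are assumptions made for illustration.
    static final int ADDED = 1;
    static final int REMOVED = 2;

    int state = ADDED;
    // Stand-in for fireExceptionCaught: remembers the wrapped failure, if any.
    Throwable reported;

    void callRemoved(Runnable handlerRemoved) {
        try {
            try {
                handlerRemoved.run();
            } finally {
                // Runs whether or not the callback threw.
                state = REMOVED;
            }
        } catch (Throwable t) {
            reported = new RuntimeException("handlerRemoved() has thrown an exception.", t);
        }
    }
}
```

With a callback that throws, `state` still ends up as REMOVED and `reported` holds the wrapped exception; with a well-behaved callback, `reported` stays null.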
That concludes the walkthrough of the pipeline operations.