2019-05-19 pipeline 初始化、新增、删除操作

  • 1.pipeline的初始化

之前我们分析过,每构造一个channel的时候会通过newChannelPipeline初始化一个pipeline;

 protected AbstractChannel(Channel parent, ChannelId id) {
        this.parent = parent;
        this.id = id;
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

newChannelPipeline的实现逻辑,this 是当前的channel

protected DefaultChannelPipeline newChannelPipeline() {
       return new DefaultChannelPipeline(this);
   }
protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise = new VoidChannelPromise(channel, true);

        //创建tail和head节点
        tail = new TailContext(this);
        head = new HeadContext(this);

        //构造一个双向链表的数据结构,包含head和tail两个节点,链表的元素其实是ChannelHandlerContext
        head.next = tail;
        tail.prev = head;
    }

总结下,pipeline的创建是在创建channel的时候就创建了。

    1. ChannelHandlerContext 解析
      首先看下ChannelHandlerContext的类继承关系
      public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker

包含三层含义:

  • extends AttributeMap : 自身可以存储一些属性;
  • extends ChannelInboundInvoker:可以触发一些用户事件,包括读事件,注册事件等;
  • extends ChannelOutboundInvoker: 可以触发一些用户事件,包括写事件
    Channel channel();
    EventExecutor executor();
    String name();
    ChannelHandler handler();
    boolean isRemoved();
    ChannelPipeline pipeline();
    ByteBufAllocator alloc();
  • ChannelHandlerContext本身包含获取当前的channel,获取当前的NioEventloop,当前属于哪个ChannelHandler等等;

  • 3.HeadContext 和 TailContext

  • 3.1 TailContext继承AbstractChannelHandlerContext

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
       TailContext(DefaultChannelPipeline pipeline) {
           super(pipeline, null, TAIL_NAME, true, false);
           setAddComplete();
       }
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
        //当前context的名字  
        this.name = ObjectUtil.checkNotNull(name, "name");
       //  当前context所属的pipeline
        this.pipeline = pipeline;
       //  当前context所属的NioEventLoop
        this.executor = executor;
       //  标示是inboundHandler还是outboundHandler
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

可以看出TailContext是一个inBound处理器,用于处理读事件,注册事件;

通过cas+自悬操作将当前节点设置为已经添加

final void setAddComplete() {
        for (; ; ) {
            int oldState = handlerState;
            if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
    }
  • 3.2 HeadContext

构造函数比TailContext多了一个unsafe属性,其余的都相同

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        //unsafe 属性实现底层数据的读写
        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
}

基本实现了父类的方法,包含读写,注册,异常传播等;

  • 4.ChannelHandler的添加与删除
  • 4.1 添加channelhandler
    在业务代码中我们一般添加handler都是通过这样的方式进行添加
  ch.pipeline().addLast(new EchoServerHandler());

接下来我们看下addLast方法中都做了哪些操作?

public final ChannelPipeline addLast(ChannelHandler... handlers) {
     return addLast(null, handlers);
}
 @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
        //一个一个添加
        for (ChannelHandler h : handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }
        return this;
    }
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //1.判断是否重复添加
            checkMultiplicity(handler);

            //2.构造一个HandlerContext,如果有同名则抛异常
            newCtx = newContext(group, filterName(name, handler), handler);

            //3.添加HandlerContext
            addLast0(newCtx);

            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            //4.如果当前线程是EventLoop,则异步触发HandlerAdded0事件,否则直接触发
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
private static void checkMultiplicity(ChannelHandler handler) {
       //判断是不是ChannelHandlerAdapter的实例
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            //如果当前handler不是用Sharable注解的并且已经添加了,则直接抛异常
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                                " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            //否则,标示已经添加
            h.added = true;
        }
    }

构造一个DefaultChannelHandlerContext对象

 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline,EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

执行添加操作,就是往链表中插入一个元素

   private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

添加完成,触发该handler的一个handlerAdded事件,并设置当前handler已经添加

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
}
  • 4.2 删除channelHandler
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
    }

拿到channelHandler节点

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }
public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }

        //从头开始遍历节点,无限for循环,如果遍历到则返回,否则返回null
        AbstractChannelHandlerContext ctx = head.next;
        for (; ; ) {
            if (ctx == null) {
                return null;
            }
            if (ctx.handler() == handler) {
                return ctx;
            }
            ctx = ctx.next;
        }
    }

移除节点,触发HandlerRemoved事件

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
      //当前节点不是头节点和尾节点,因为要保证线程安全,必须保证pipeline的结构
        assert ctx != head && ctx != tail;

        synchronized (this) {
            remove0(ctx);
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }

移除节点的操作

private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

调用对应Handler的remove方法,最后标示该handler已经remove,设置remove的标示

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        try {
            try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }
final void setRemoved() {
        handlerState = REMOVE_COMPLETE;
}

pipeline的操作就讲到这里了。

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351