dubbo源码7-编解码

一 类层次图

编解码类图.png

二 NettyCodecAdapter

  • 代理DubboCountCodec

2.1 encode

  • write函数发送消息,先按照preferDirect属性分配HeapByteBuf或DirectByteBuf
  • NettyBackedChannelBuffer代理ByteBuf相应接口
  • NettyChannel代理netty层的通信接口
  • 调用协议层配置的DubboCountCodec编码接口
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
      ByteBuf buf = this.allocateBuffer(ctx, msg, this.preferDirect);
      this.encode(ctx, cast, buf);
      ...
    }

  protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
    com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
    Channel ch = ctx.channel();
    NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
    try {
        codec.encode(channel, buffer, msg);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ch);
    }
  }

2.2 decode

  • 调用协议层配置的DubboCountCodec解码接口
  • NEED_MORE_INPUT表示报文拆分传输,需要继续等待通信层收包,回退读索引。
  • 解码成功,输出解码的对象。
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

            ChannelBuffer message = new NettyBackedChannelBuffer(input);

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

            Object msg;

            int saveReaderIndex;

            try {
                // decode object.
                do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);
                    } catch (IOException e) {
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        //is it possible to go here ?
                        if (saveReaderIndex == message.readerIndex()) {
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            out.add(msg);
                        }
                    }
                } while (message.readable());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }

三 DubboCountCodec

  • 代理DubboCodec

3.1 encode

  • 编码无特殊处理,直接调用DubboCodec编码接口

3.2 decode

  • 收包缓存中存在多个消息,分别解码并存储
  • Requst包,记录input为包长。Response包记录output为包长。统计使用。

四 其他编解码层

  • 前面两层是代理关系,后面各层是继承关系。
  • 对dubbo协议头的编解码


    协议头.png
  • 对Request/Response消息体编解码
  • 对RpcInvocation/RpcResult接口调用信息编解码。

4.1 encode

4.1.1 encode流程图

encode流程图.png

4.1.2 ExchangeCodec.encode

4.1.2.1 encodeRequest

  • 根据serialization配置获取序列化类型,默认hessian2序列化
  • 协议头两字节magic short MAGIC = (short) 0xdabb;,标识是dubbo协议报文。
  • 协议第三字节,请求报文标记,序列化类型,是否双工报文,是否事件报文(如心跳报文)
// set request and serialization flag.
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
        if (req.isEvent()) header[2] |= FLAG_EVENT;
  • 协议第5-12字节,请求id
  • 协议第13-15字节,请求报文长度
  • 在序列化对象中编码RpcInvocation请求体
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);

4.1.2.2 encodeResponse

  • 根据serialization配置获取序列化类型,默认hessian2序列化
  • 协议头两字节magic short MAGIC = (short) 0xdabb;,标识是dubbo协议报文。
  • 协议第三字节,序列化类型,是否事件报文(如心跳报文)
header[2] = serialization.getContentTypeId();
if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
  • 协议第四字节,响应状态,只在reponse报文中使用。
  • 协议第5-12字节,响应对应的请求报文id
  • 协议第13-15字节,请求报文长度
  • 在序列化对象中编码RpcResult请求体
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);

4.1.3 DubboCodec.encode

4.1.3.1 encodeRequestData

  • 写入dubbo协议版本
  • 接口路径
  • 接口版本
  • 方法名
  • 方法参数类型
  • 方法参数
  • 上下文参数,附加属性
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;

        out.writeUTF(version);
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

        out.writeUTF(inv.getMethodName());
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null)
            for (int i = 0; i < args.length; i++) {
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
        out.writeObject(inv.getAttachments());
    }

4.1.3.2 encodeResponseData

  • 第一个字节写入返回值类型
类型
0 返回异常
1 返回值
2 无返回值
  • 写入异?;蚍祷刂?。
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
        Result result = (Result) data;

        Throwable th = result.getException();
        if (th == null) {
            Object ret = result.getValue();
            if (ret == null) {
                out.writeByte(RESPONSE_NULL_VALUE);
            } else {
                out.writeByte(RESPONSE_VALUE);
                out.writeObject(ret);
            }
        } else {
            out.writeByte(RESPONSE_WITH_EXCEPTION);
            out.writeObject(th);
        }
    }

4.1.4 TelnetCodec.encode

编码无特殊处理

4.2 decode

4.2.1 decode流程图

decode流程图.png

4.2.2 ExchangeCodec.decode

  • 校验消息头两字节,是否为MAGIC数字,不是则走telnetCodec处理
if (readable > 0 && header[0] != MAGIC_HIGH 
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            for (int i = 1; i < header.length - 1; i ++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            return super.decode(channel, buffer, readable, header);
        }
  • 校验长度,完整协议头长度,消息体长度不超过8m,完整头+体总长度。
// check length.
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // get data length.
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;
        if( readable < tt ) {
            return DecodeResult.NEED_MORE_INPUT;
        }
  • dubboCodec.decode调用

4.2.3 DubboCodec.decode

  • 根据消息头获取序列化类型,创建序列化对象
 byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
  • 解析消息头,创建Response对象。
if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
            Response res = new Response(id);
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // get status.
            byte status = header[3];
            res.setStatus(status);
}
  • 解析消息头,创建Request对象。
Request req = new Request(id);
            req.setVersion("2.0.0");
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
  • 反序列化消息体,得到RpcInvocation或RpcResult或事件对象。

4.2.4 TelnetCodec.decode

  • 长度校验
checkPayload(channel, readable);
        if (message == null || message.length == 0) {
            return DecodeResult.NEED_MORE_INPUT;
        }
  • 回退命令,删除末尾输入后返回给客户端
if (message[message.length - 1] == '\b') { // Windows backspace echo
            try {
                boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char
                channel.send(new String(doublechar ? new byte[] {32, 32, 8, 8} : new byte[] {32, 8}, getCharset(channel).name()));
            } catch (RemotingException e) {
                throw new IOException(StringUtils.toString(e));
            }
            return DecodeResult.NEED_MORE_INPUT;
        }
  • 退出命令,关闭channel连接
for (Object command : EXIT) {
            if (isEquals(message, (byte[]) command)) {
                if (logger.isInfoEnabled()) {
                    logger.info(new Exception("Close channel " + channel + " on exit command: " + Arrays.toString((byte[])command)));
                }
                channel.close();
                return null;
            }
        }
  • 历史命令选择.
    HISTORY_LIST_KEY存储历史命令,最多10条
    HISTORY_INDEX_KEY 上次滚动到的历史命令索引,本轮历史命令选择周期内有效。选中命令执行后清除。
boolean up = endsWith(message, UP);
        boolean down = endsWith(message, DOWN);
        if (up || down) {
            LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
            if (history == null || history.size() == 0) {
                return DecodeResult.NEED_MORE_INPUT;
            }
            Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
...
}
  • 解析返回telent命令
    String result = toString(message, getCharset(channel));
  • 命令功能
命令 实现类
clear com.alibaba.dubbo.remoting.telnet.support.command.ClearTelnetHandler
exit com.alibaba.dubbo.remoting.telnet.support.command.ExitTelnetHandler
help com.alibaba.dubbo.remoting.telnet.support.command.HelpTelnetHandler
status com.alibaba.dubbo.remoting.telnet.support.command.StatusTelnetHandler
log com.alibaba.dubbo.remoting.telnet.support.command.LogTelnetHandler
ls com.alibaba.dubbo.rpc.protocol.dubbo.telnet.ListTelnetHandler
ps com.alibaba.dubbo.rpc.protocol.dubbo.telnet.PortTelnetHandler
cd com.alibaba.dubbo.rpc.protocol.dubbo.telnet.ChangeTelnetHandler
pwd com.alibaba.dubbo.rpc.protocol.dubbo.telnet.CurrentTelnetHandler
invoke com.alibaba.dubbo.rpc.protocol.dubbo.telnet.InvokeTelnetHandler
trace com.alibaba.dubbo.rpc.protocol.dubbo.telnet.TraceTelnetHandler
count com.alibaba.dubbo.rpc.protocol.dubbo.telnet.CountTelnetHandler
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351