一 类层次图
二 NettyCodecAdapter
- 代理DubboCountCodec
2.1 encode
- write函数发送消息,先按照preferDirect属性分配HeapByteBuf或DirectByteBuf
- NettyBackedChannelBuffer代理ByteBuf相应接口
- NettyChannel代理netty层的通信接口
- 调用协议层配置的DubboCountCodec编码接口
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = this.allocateBuffer(ctx, msg, this.preferDirect);
this.encode(ctx, cast, buf);
...
}
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
}
2.2 decode
- 调用协议层配置的DubboCountCodec解码接口
- NEED_MORE_INPUT表示报文拆分传输,需要继续等待通信层收包,回退读索引。
- 解码成功,输出解码的对象。
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
Object msg;
int saveReaderIndex;
try {
// decode object.
do {
saveReaderIndex = message.readerIndex();
try {
msg = codec.decode(channel, message);
} catch (IOException e) {
throw e;
}
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
//is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
三 DubboCountCodec
- 代理DubboCodec
3.1 encode
- 编码无特殊处理,直接调用DubboCodec编码接口
3.2 decode
- 收包缓存中存在多个消息,分别解码并存储
- Requst包,记录input为包长。Response包记录output为包长。统计使用。
四 其他编解码层
- 前面两层是代理关系,后面各层是继承关系。
-
对dubbo协议头的编解码
- 对Request/Response消息体编解码
- 对RpcInvocation/RpcResult接口调用信息编解码。
4.1 encode
4.1.1 encode流程图
4.1.2 ExchangeCodec.encode
4.1.2.1 encodeRequest
- 根据serialization配置获取序列化类型,默认hessian2序列化
- 协议头两字节magic
short MAGIC = (short) 0xdabb;
,标识是dubbo协议报文。 - 协议第三字节,请求报文标记,序列化类型,是否双工报文,是否事件报文(如心跳报文)
// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
- 协议第5-12字节,请求id
- 协议第13-15字节,请求报文长度
- 在序列化对象中编码RpcInvocation请求体
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
4.1.2.2 encodeResponse
- 根据serialization配置获取序列化类型,默认hessian2序列化
- 协议头两字节magic
short MAGIC = (short) 0xdabb;
,标识是dubbo协议报文。 - 协议第三字节,序列化类型,是否事件报文(如心跳报文)
header[2] = serialization.getContentTypeId();
if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
- 协议第四字节,响应状态,只在reponse报文中使用。
- 协议第5-12字节,响应对应的请求报文id
- 协议第13-15字节,请求报文长度
- 在序列化对象中编码RpcResult请求体
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
4.1.3 DubboCodec.encode
4.1.3.1 encodeRequestData
- 写入dubbo协议版本
- 接口路径
- 接口版本
- 方法名
- 方法参数类型
- 方法参数
- 上下文参数,附加属性
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
out.writeUTF(version);
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
out.writeUTF(inv.getMethodName());
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
if (args != null)
for (int i = 0; i < args.length; i++) {
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
out.writeObject(inv.getAttachments());
}
4.1.3.2 encodeResponseData
- 第一个字节写入返回值类型
值 | 类型 |
---|---|
0 | 返回异常 |
1 | 返回值 |
2 | 无返回值 |
- 写入异?;蚍祷刂?。
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
Result result = (Result) data;
Throwable th = result.getException();
if (th == null) {
Object ret = result.getValue();
if (ret == null) {
out.writeByte(RESPONSE_NULL_VALUE);
} else {
out.writeByte(RESPONSE_VALUE);
out.writeObject(ret);
}
} else {
out.writeByte(RESPONSE_WITH_EXCEPTION);
out.writeObject(th);
}
}
4.1.4 TelnetCodec.encode
编码无特殊处理
4.2 decode
4.2.1 decode流程图
4.2.2 ExchangeCodec.decode
- 校验消息头两字节,是否为MAGIC数字,不是则走telnetCodec处理
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i ++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
- 校验长度,完整协议头长度,消息体长度不超过8m,完整头+体总长度。
// check length.
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// get data length.
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
if( readable < tt ) {
return DecodeResult.NEED_MORE_INPUT;
}
- dubboCodec.decode调用
4.2.3 DubboCodec.decode
- 根据消息头获取序列化类型,创建序列化对象
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
- 解析消息头,创建Response对象。
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// get status.
byte status = header[3];
res.setStatus(status);
}
- 解析消息头,创建Request对象。
Request req = new Request(id);
req.setVersion("2.0.0");
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
- 反序列化消息体,得到RpcInvocation或RpcResult或事件对象。
4.2.4 TelnetCodec.decode
- 长度校验
checkPayload(channel, readable);
if (message == null || message.length == 0) {
return DecodeResult.NEED_MORE_INPUT;
}
- 回退命令,删除末尾输入后返回给客户端
if (message[message.length - 1] == '\b') { // Windows backspace echo
try {
boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char
channel.send(new String(doublechar ? new byte[] {32, 32, 8, 8} : new byte[] {32, 8}, getCharset(channel).name()));
} catch (RemotingException e) {
throw new IOException(StringUtils.toString(e));
}
return DecodeResult.NEED_MORE_INPUT;
}
- 退出命令,关闭channel连接
for (Object command : EXIT) {
if (isEquals(message, (byte[]) command)) {
if (logger.isInfoEnabled()) {
logger.info(new Exception("Close channel " + channel + " on exit command: " + Arrays.toString((byte[])command)));
}
channel.close();
return null;
}
}
- 历史命令选择.
HISTORY_LIST_KEY存储历史命令,最多10条
HISTORY_INDEX_KEY 上次滚动到的历史命令索引,本轮历史命令选择周期内有效。选中命令执行后清除。
boolean up = endsWith(message, UP);
boolean down = endsWith(message, DOWN);
if (up || down) {
LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
if (history == null || history.size() == 0) {
return DecodeResult.NEED_MORE_INPUT;
}
Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
...
}
- 解析返回telent命令
String result = toString(message, getCharset(channel));
- 命令功能
命令 | 实现类 |
---|---|
clear | com.alibaba.dubbo.remoting.telnet.support.command.ClearTelnetHandler |
exit | com.alibaba.dubbo.remoting.telnet.support.command.ExitTelnetHandler |
help | com.alibaba.dubbo.remoting.telnet.support.command.HelpTelnetHandler |
status | com.alibaba.dubbo.remoting.telnet.support.command.StatusTelnetHandler |
log | com.alibaba.dubbo.remoting.telnet.support.command.LogTelnetHandler |
ls | com.alibaba.dubbo.rpc.protocol.dubbo.telnet.ListTelnetHandler |
ps | com.alibaba.dubbo.rpc.protocol.dubbo.telnet.PortTelnetHandler |
cd | com.alibaba.dubbo.rpc.protocol.dubbo.telnet.ChangeTelnetHandler |
pwd | com.alibaba.dubbo.rpc.protocol.dubbo.telnet.CurrentTelnetHandler |
invoke | com.alibaba.dubbo.rpc.protocol.dubbo.telnet.InvokeTelnetHandler |
trace | com.alibaba.dubbo.rpc.protocol.dubbo.telnet.TraceTelnetHandler |
count | com.alibaba.dubbo.rpc.protocol.dubbo.telnet.CountTelnetHandler |