netty实战一之TCP粘包问题和不同编解码技术的应用

TCP粘包拆包问题

  1. LineBasedFrameDecoder+StringDecoder 换行符为结束标志
  2. DelimiterBasedFrameDecoder + StringDecoder 分隔符作为结束标志
  3. FixedLengthFrameDecoder + StringDecoder 定长

代码示例(netty权威指南修改的例子,不多解释)

EchoServerHandler.class

public class EchoServerHandler extends ChannelInboundHandlerAdapter{
    int counter = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        String body = (String) msg;
//        System.out.println("This is " + ++counter + " times received client:[" + body + "]");
//        body += "$_";
//        ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
//        ctx.writeAndFlush(echo);
        System.out.println("Received client : [" + msg + "]");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

EchoServer.class

public class EchoServer {
    public void bind(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
//                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(20));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if(args != null && args.length > 0){
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
//                e.printStackTrace();
            }
        }
        new EchoServer().bind(port);
    }
}

EchoClientHandler.class

public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    private int counter;

    static final String ECHO_REQ = "Hi,welcome to netty.$_";

    public EchoClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for( int i = 0 ; i < 10 ; i++){
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("This is " + ++counter + " times received client:[" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

EchoClient.class

public class EchoClient {
    public void connect(int port,String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        try {
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture future = b.connect(host,port).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if(args != null && args.length > 0){
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
//                e.printStackTrace();
            }
        }
        new EchoClient().connect(port,"127.0.0.1");
    }
}

编解码技术

Java序列化

  1. 无法跨语言
  2. 序列化后流太大
  3. 序列化性能太低

Google的Protobuf

  1. 语言无关
  2. 高效
  3. 结构化数据存储格式(XML JSON等)
  4. 官方支持Java C++ Python

Facebook的Thrift

  • 优缺点
  1. 适用于静态的数据交换,需要首先确定好它的数据结构;当数据结构发生变化时,必须重新编辑IDL文件,生成代码和编译
  2. 支持多种程序语言
  • 组成
  1. 语言系统以及IDL编译器
  2. TProtocal:RPC协议层,可以选择多种不同的对象序列化方式
  3. TTransport:RPC的传输层
  4. TProcesser:协议层和用户服务之间的纽带,负责调用服务实现的接口
  5. TServer:聚合TProtocal、TTransport、TProcesser等对象

netty使用不同解码方式

在netty中使用java序列化

public class SubscribeReq implements Serializable {
    private static final long serialVersionUID = 2394470753940199464L;
    private int subReqId;
    private String userName;
    private String productName;
    private String phoneNumber;
    private String address;

    public int getSubReqId() {
        return subReqId;
    }

    public void setSubReqId(int subReqId) {
        this.subReqId = subReqId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public String getPhoneNumber() {
        return phoneNumber;
    }

    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SubscribeReq that = (SubscribeReq) o;

        if (subReqId != that.subReqId) return false;
        if (userName != null ? !userName.equals(that.userName) : that.userName != null) return false;
        if (productName != null ? !productName.equals(that.productName) : that.productName != null) return false;
        if (phoneNumber != null ? !phoneNumber.equals(that.phoneNumber) : that.phoneNumber != null) return false;
        return address != null ? address.equals(that.address) : that.address == null;

    }

    @Override
    public int hashCode() {
        int result = subReqId;
        result = 31 * result + (userName != null ? userName.hashCode() : 0);
        result = 31 * result + (productName != null ? productName.hashCode() : 0);
        result = 31 * result + (phoneNumber != null ? phoneNumber.hashCode() : 0);
        result = 31 * result + (address != null ? address.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return "SubscribeReq{" +
                "subReqId=" + subReqId +
                ", userName='" + userName + '\'' +
                ", productName='" + productName + '\'' +
                ", phoneNumber='" + phoneNumber + '\'' +
                ", address='" + address + '\'' +
                '}';
    }
}
public class SubscribeResp implements Serializable {
    private static final long serialVersionUID = 1L;
    private int subReqId;
    private int respCode;
    private String desc;

    public int getSubReqId() {
        return subReqId;
    }

    public void setSubReqId(int subReqId) {
        this.subReqId = subReqId;
    }

    public int getRespCode() {
        return respCode;
    }

    public void setRespCode(int respCode) {
        this.respCode = respCode;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SubscribeResp that = (SubscribeResp) o;

        if (subReqId != that.subReqId) return false;
        if (respCode != that.respCode) return false;
        return desc != null ? desc.equals(that.desc) : that.desc == null;

    }

    @Override
    public int hashCode() {
        int result = subReqId;
        result = 31 * result + respCode;
        result = 31 * result + (desc != null ? desc.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return "SubscribeResp{" +
                "subReqId=" + subReqId +
                ", respCode=" + respCode +
                ", desc='" + desc + '\'' +
                '}';
    }
}

在netty中使用protobuf

  1. 安装protobuf

下载解压编译 make && make check && sudo make install && sudo ldconfig

  1. 编写proto文件

    syntax="proto3";
    option java_package = "com.oneapm.netty.opendecode.protobuf";
    option java_outer_classname = "SubscribeReqProto";
    
    message SubscribeReq {
        int32 subReqId = 1;
        string userName = 2;
        string productName = 3;
        repeated string address = 4;
    }
    
    
    syntax="proto3";
    option java_package = "com.oneapm.netty.opendecode.protobuf";
    option java_outer_classname = "SubscribeRespProto";
    
    message SubscribeResp {
        int32 subReqId = 1;
         int32 respCode = 2;
        string desc = 3;
    }
    
  2. 自动生成class文件
    proto文件放在与/src/main/java平齐的/src/main/protobuf下

cd src/main/protobuf
/usr/local/bin/protoc --java_out=../java/ SubscribeReq.proto
/usr/local/bin/protoc --java_out=../java/ SubscribeResp.proto

  1. 在server和client增加编码解码

通用代码

server端

public class SubReqServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        SubscribeReq req = (SubscribeReq) msg;
        SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
        if ("oneapm".equalsIgnoreCase(req.getUserName())) {
            System.out.println("Service accept client subscribe req:[" + req.toString() + "]");
            ctx.writeAndFlush(resp(req.getSubReqId()));
        }
    }

    /**
     * java serializable
     * @param subReqId
     * @return
     */
//    private SubscribeResp resp(int subReqId) {
//        SubscribeResp resp = new SubscribeResp();
//        resp.setSubReqId(subReqId);
//        resp.setRespCode(0);
//        resp.setDesc("Welcome to China");
//        return resp;
//    }

    /**
     * protobuf
     * @param subReqId
     * @return
     */
    private SubscribeRespProto.SubscribeResp resp(int subReqId) {
        SubscribeRespProto.SubscribeResp.Builder resp = SubscribeRespProto.SubscribeResp.newBuilder();
        resp.setSubReqId(subReqId);
        resp.setRespCode(0);
        resp.setDesc("Welcome to China");
        return resp.build();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
public class SubReqServer {
    public void bind(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //java Serializable
//                            socketChannel.pipeline().addLast(new ObjectDecoder(1024*1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
//                            socketChannel.pipeline().addLast(new ObjectEncoder());
//                            socketChannel.pipeline().addLast(new SubReqServerHandler());

                            //protobuf
                            socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                            socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                            socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            socketChannel.pipeline().addLast(new ProtobufEncoder());
                            socketChannel.pipeline().addLast(new SubReqServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if(args != null && args.length > 0){
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
//                e.printStackTrace();
            }
        }
        new SubReqServer().bind(port);
    }
}

client端

public class SubReqClientHandler extends ChannelInboundHandlerAdapter {
    public SubReqClientHandler() {
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for( int i = 0 ; i < 10 ; i++){
            ctx.write(subReq(i));
        }
        ctx.flush();
    }

//    private SubscribeReq subReq(int i){
//        SubscribeReq req = new SubscribeReq();
//        req.setAddress("宜昌市三峡大坝");
//        req.setProductName("netty深入学习");
//        req.setPhoneNumber("185********");
//        req.setSubReqId(i);
//        req.setUserName("oneapm");
//        return req;
//    }

    /**
     * protobuf
     * @param i
     * @return
     */
    private SubscribeReqProto.SubscribeReq subReq(int i){
        SubscribeReqProto.SubscribeReq.Builder req = SubscribeReqProto.SubscribeReq.newBuilder();
        List<String> address = new ArrayList<>();
        address.add("NanJing");
        address.add("BeiJing");
        req.addAllAddress(address);
        req.setProductName("netty study");
        req.setSubReqId(i);
        req.setUserName("oneapm");
        return req.build();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Receive server response: [" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
public class SubReqClient {
    public void connect(int port,String host) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        try {
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
//                            socketChannel.pipeline().addLast(new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
//                            socketChannel.pipeline().addLast(new ObjectEncoder());
//                            socketChannel.pipeline().addLast(new SubReqClientHandler());
                            socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                            socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                            socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            socketChannel.pipeline().addLast(new ProtobufEncoder());
                            socketChannel.pipeline().addLast(new SubReqClientHandler());
                        }
                    });
            ChannelFuture future = b.connect(host,port).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if(args != null && args.length > 0){
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
//                e.printStackTrace();
            }
        }
        new SubReqClient().connect(port,"127.0.0.1");
    }

}

facebook/nifty:Thrift on Netty

facebook提供的构建在Thrift和Netty之上的封装https://github.com/facebook/nifty,简单来说,就是让我们的编码更加简单(脑残)

项目github地址

https://github.com/engimatic/effectivejava/tree/master/netty/src/main/java/com/oneapm/netty

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容

  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 综述 netty通...
    jiangmo阅读 5,848评论 0 13
  • 随着微服务架构的流行,RPC框架渐渐地成为服务框架的一个重要部分。在很多RPC的设计中,都采用了高性能的编解码技术...
    ginobefun阅读 5,640评论 2 2
  • 由于工程项目中拟采用一种简便高效的数据交换格式,百度了一下发现除了采用 xml、JSON 还有 ProtoBuf(...
    黄海佳阅读 48,608评论 1 23
  • 说明 系列文章:http://08643.cn/p/594441fb9c9e 本文完全参考自《Net...
    被称为L的男人阅读 857评论 1 1
  • 1. RPC 1.1 简介 RPC 的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损...
    wy_sure阅读 6,986评论 0 1