gRPC入门-双向流式通信

Gradle工程的环境搭建依然和前文一致,参考:gRPC入门-Hello World

示例代码

  • src\main\proto下新建.proto契约,如下:

    syntax = "proto3";
    
    package com.mattie.netty.grpc;
    
    option java_package = "com.mattie.netty.grpc";
    option java_outer_classname = "HelloWorldProtos";
    
    service HelloService {
        //双向流
        rpc biStream (stream StreamReq) returns (stream StreamResp) {};
    }
    
    //双向流
     message StreamReq {
       string req_info = 1;
    }
    
    message StreamResp {
        string resp_info = 1;
    }
    
  • 执行gradle clean build,自动生成gRPC相关代码: HelloServiceGrpc, 我们需要使用就是它和它的几个内部类。

  • 自定义服务类MyServiceImpl继承HelloServiceGrpc的内部类HelloServiceImplBase并重写契约中定义的服务biStream。注意到重写的方法接收的参数类型是StreamObserver(针对response),返回值的类型也是StreamObserver(针对request)。

    public class MyServiceImpl extends HelloServiceImplBase {
        @Override
        public StreamObserver<HelloWorldProtos.StreamReq> biStream(StreamObserver<HelloWorldProtos.StreamResp> responseObserver) {
      return new StreamObserver<HelloWorldProtos.StreamReq>() {
    
          //接收请求
          @Override
          public void onNext(HelloWorldProtos.StreamReq streamReq) {
              System.out.println(streamReq.getReqInfo());
              //接收请求后就返回一个响应
              responseObserver.onNext(HelloWorldProtos.StreamResp.newBuilder().setRespInfo("from server").build());
          }
    
          @Override
          public void onError(Throwable throwable) {
    
          }
    
          //客户端发送数据完毕
          @Override
          public void onCompleted() {
              //服务端也完毕
              responseObserver.onCompleted();
          }
        };
      }
    

RequestObserver在服务端实现,responseObserver在客户端实现。

因此,在服务代码MyServiceImpl中需要返回一个requestObserver,实现其中的回调方法:
1. onNext表示接收到一个request,这里通过responseObserveronNext()立刻返回了一条数据。
2. onCompleted表示客户端已经发送数据完毕,这里调用responseObserveronCompleted()也告诉客户端连接关闭。

  • 客户端不仅需要发送数据,而且需要实现一个responseObserver:

      public void biCommute() {
          StreamObserver<HelloWorldProtos.StreamReq> reqStreamObserver = this.stub.biStream(new StreamObserver<HelloWorldProtos.StreamResp>() {
              //接收到服务返回
              @Override
              public void onNext(HelloWorldProtos.StreamResp streamResp) {
                  System.out.println(streamResp.getRespInfo());
              }
    
              @Override
              public void onError(Throwable throwable) {
    
              }
    
              //服务发送完毕
              @Override
              public void onCompleted() {
    
              }
          });
    
          reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server1").build());
          reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server2").build());
          reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server3").build());
          reqStreamObserver.onNext(HelloWorldProtos.StreamReq.newBuilder().setReqInfo("hello server4").build());
          reqStreamObserver.onCompleted();
    
         try {
            Thread.sleep(10000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
       }
    
  1. 这里调用服务方法biStream使用的是stub(异步),而非之前的blockingStub(同步)。
  2. 调用biStream需要传入responseObserver作为参数,同时返回值是requestObserver
  3. 通过requestObserveronNext()不断发送数据。
  • 执行程序,客户端发送四条数据,服务端每接收到一条数据就响应一条from server给客户端。

为了能更好的看到执行效果,可以在程序最后增加sleep(10000),观察服务的异步响应过程。

双向流
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容

  • 本文是gRPC的一个简单例子,以protocol buffers 3作为契约类型,使用gRPC自动生成服务端和客户...
    ted005阅读 5,241评论 0 50
  • 版权声明:本文为小斑马伟原创文章,转载请注明出处! 上篇简单的阐述了响应式编程的基本理论。这篇主要对响应编程进行详...
    ZebraWei阅读 2,285评论 0 2
  • 上篇文章讲到 Protocol Buffers. 一些 API 也可以用 Protobuf 来表示。 简单介绍 R...
    loono阅读 20,272评论 0 4
  • 一.Grpc简介 一个2016年才由google正式发布的的RPC框架,基于http2,protobuf协议 官网...
    我也是玄冲阅读 8,669评论 0 2
  • 如果你爱我 请你爱我之前先爱你自己 爱我的同时也爱着你自己 你若不爱你自己 你便无法来爱我 这是爱的法则 因为 你...
    清且安阅读 229评论 0 1