聊一聊 gRPC 的四种通信模式

温馨提示:本文需要结合上一篇 gRPC 文章一起食用,否则可能看不懂。

前面一篇文章松哥和大家聊了 gRPC 的基本用法,今天我们再来稍微深入一点点,来看下 gRPC 中四种不同的通信模式。

gRPC 中四种不同的通信模式分别是:

  1. 一元 RPC
  2. 服务端流 RPC
  3. 客户端流 RPC
  4. 双向流 RPC

接下来松哥就通过四个完整的案例,来分别和向伙伴们演示这四种不同的通信模式。

1. 准备工作

关于 gRPC 的基础知识我们就不啰嗦了,咱们直接来看我今天的 proto 文件,如下:

这次我新建了一个名为 book.proto 的文件,这里主要定义了一些图书相关的方法,如下:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.javaboy.grpc.demo";
option java_outer_classname = "BookServiceProto";
import "google/protobuf/wrappers.proto";

package book;

service BookService {
  rpc addBook(Book) returns (google.protobuf.StringValue);
  rpc getBook(google.protobuf.StringValue) returns (Book);
  rpc searchBooks(google.protobuf.StringValue) returns (stream Book);
  rpc updateBooks(stream Book) returns (google.protobuf.StringValue);
  rpc processBooks(stream google.protobuf.StringValue) returns (stream BookSet);
}

message Book {
  string id = 1;
  repeated string tags = 2;
  string name = 3;
  float price = 4;
  string author = 5;
}

message BookSet {
  string id = 1;
  repeated Book bookList = 3;
}

这个文件中,有一些内容我们在上篇文章中都讲过了,讲过的我就不再重复了,我说一些上篇文章没有涉及到的东西:

  1. 由于我们在这个文件中,引用了 Google 提供的 StringValue(google.protobuf.StringValue),所以这个文件上面我们首先用 import 导入相关的文件,导入之后,才可以使用。
  2. 在方法参数和返回值中出现的 stream,就表示这个方法的参数或者返回值是流的形式(其实就是数据可以多次传输)。
  3. message 中出现了一个上篇文章没有的关键字 repeated,这个表示这个字段可以重复,可以简单理解为这就是我们 Java 中的数组。

好了,和上篇文章相比,本文主要就是这几个地方不一样。

proto 文件写好之后,按照上篇文章介绍的方法进行编译,生成对应的代码,这里就不再重复了。

2. 一元 RPC

一元 RPC 是一种比较简单的 RPC 模式,其实说白了我们上篇文章和大家介绍的就是一种一元 RPC,也就是客户端发起一个请求,服务端给出一个响应,然后请求结束。

上面我们定义的五个方法中,addBook 和 getBook 都算是一种一元 RPC。

2.1 addBook

先来看 addBook 方法,这个方法的逻辑很简单,我们提前在服务端准备一个 Map 用来保存 Book,addBook 调用的时候,就把 book 对象存入到 Map 中,并且将 book 的 ID 返回,大家就这样一件事,来看看服务端的代码:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
    private Map<String, Book> bookMap = new HashMap<>();

    public BookServiceImpl() {
        Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
        Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
        Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
        bookMap.put("1", b1);
        bookMap.put("2", b2);
        bookMap.put("3", b3);
    }

    @Override
    public void addBook(Book request, StreamObserver<StringValue> responseObserver) {
        bookMap.put(request.getId(), request);
        responseObserver.onNext(StringValue.newBuilder().setValue(request.getId()).build());
        responseObserver.onCompleted();
    }
}

看过上篇文章的小伙伴,我觉得这段代码应该很好理解。

客户端调用方式如下:

public class BookServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
        addBook(stub);
    }

    private static void addBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        stub.addBook(Book.newBuilder().setPrice(99).setId("100").setName("java").setAuthor("javaboy").build(), new StreamObserver<StringValue>() {
            @Override
            public void onNext(StringValue stringValue) {
                System.out.println("stringValue.getValue() = " + stringValue.getValue());
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                countDownLatch.countDown();
                System.out.println("添加完毕");
            }
        });
        countDownLatch.await();
    }
}

这里我使用了 CountDownLatch 来实现线程等待,等服务端给出响应之后,客户端再结束。这里在回调的 onNext 方法中,我们就可以拿到服务端的返回值。

2.2 getBook

getBook 跟上面的 addBook 类似,先来看服务端代码,如下:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
    private Map<String, Book> bookMap = new HashMap<>();

    public BookServiceImpl() {
        Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
        Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
        Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
        bookMap.put("1", b1);
        bookMap.put("2", b2);
        bookMap.put("3", b3);
    }

    @Override
    public void getBook(StringValue request, StreamObserver<Book> responseObserver) {
        String id = request.getValue();
        Book book = bookMap.get(id);
        if (book != null) {
            responseObserver.onNext(book);
            responseObserver.onCompleted();
        } else {
            responseObserver.onCompleted();
        }
    }
}

这个 getBook 就是根据客户端传来的 id,从 Map 中查询到一个 Book 并返回。

客户端调用代码如下:

public class BookServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
        getBook(stub);
    }

    private static void getBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        stub.getBook(StringValue.newBuilder().setValue("2").build(), new StreamObserver<Book>() {
            @Override
            public void onNext(Book book) {
                System.out.println("book = " + book);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                countDownLatch.countDown();
                System.out.println("查询完毕");
            }
        });
        countDownLatch.await();
    }
}

小伙伴们大概也能看出来,addBook 和 getBook 基本上操作套路是一模一样的。

3. 服务端流 RPC

前面的一元 RPC,客户端发起一个请求,服务端给出一个响应,请求就结束了。服务端流则是客户端发起一个请求,服务端给一个响应序列,这个响应序列组成一个流。

上面我们给出的 searchBook 就是这样一个例子,searchBook 是传递图书的 tags 参数,然后在服务端查询哪些书的 tags 满足条件,将满足条件的书全部都返回去。

我们来看下服务端的代码:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
    private Map<String, Book> bookMap = new HashMap<>();

    public BookServiceImpl() {
        Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
        Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
        Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
        bookMap.put("1", b1);
        bookMap.put("2", b2);
        bookMap.put("3", b3);
    }
    @Override
    public void searchBooks(StringValue request, StreamObserver<Book> responseObserver) {
        Set<String> keySet = bookMap.keySet();
        String tags = request.getValue();
        for (String key : keySet) {
            Book book = bookMap.get(key);
            int tagsCount = book.getTagsCount();
            for (int i = 0; i < tagsCount; i++) {
                String t = book.getTags(i);
                if (t.equals(tags)) {
                    responseObserver.onNext(book);
                    break;
                }
            }
        }
        responseObserver.onCompleted();
    }
}

小伙伴们看下,这段 Java 代码应该很好理解:

  1. 首先从 request 中提取客户端传来的 tags 参数。
  2. 遍历 bookMap,查看每一本书的 tags 是否等于客户端传来的 tags,如果相等,说明添加匹配,则通过 responseObserver.onNext(book); 将这本书写回到客户端。
  3. 等所有操作都完成后,执行 responseObserver.onCompleted();,表示服务端的响应序列结束了,这样客户端也就知道请求结束了。

我们来看看客户端的代码,如下:

public class BookServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
        searchBook(stub);
    }
    private static void searchBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        stub.searchBooks(StringValue.newBuilder().setValue("明清小说").build(), new StreamObserver<Book>() {
            @Override
            public void onNext(Book book) {
                System.out.println(book);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                countDownLatch.countDown();
                System.out.println("查询完毕!");
            }
        });
        countDownLatch.await();
    }
}

客户端的代码好理解,搜索的关键字是 明清小说,每当服务端返回一次数据的时候,客户端回调的 onNext 方法就会被触发一次,当服务端之行了 responseObserver.onCompleted(); 之后,客户端的 onCompleted 方法也会被触发。

这个就是服务端流,客户端发起一个请求,服务端通过 onNext 可以多次写回数据。

4. 客户端流 RPC

客户端流则是客户端发起多个请求,服务端只给出一个响应。

上面的 updateBooks 就是一个客户端流的案例,客户端想要修改图书,可以发起多个请求修改多本书,服务端则收集多次修改的结果,将之汇总然后一次性返回给客户端。

我们先来看看服务端的代码:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
    private Map<String, Book> bookMap = new HashMap<>();

    public BookServiceImpl() {
        Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
        Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
        Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
        bookMap.put("1", b1);
        bookMap.put("2", b2);
        bookMap.put("3", b3);
    }
    
    @Override
    public StreamObserver<Book> updateBooks(StreamObserver<StringValue> responseObserver) {
        StringBuilder sb = new StringBuilder("更新的图书 ID 为:");
        return new StreamObserver<Book>() {
            @Override
            public void onNext(Book book) {
                bookMap.put(book.getId(), book);
                sb.append(book.getId())
                        .append(",");
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                responseObserver.onNext(StringValue.newBuilder().setValue(sb.toString()).build());
                responseObserver.onCompleted();
            }
        };
    }
}

客户端每发送一本书来,就会触发服务端的 onNext 方法,然后我们在这方法中进行图书的更新操作,并记录更新结果。最后,我们在 onCompleted 方法中,将更新结果汇总返回给客户端,基本上就是这样一个流程。

我们再来看看客户端的代码:

public class BookServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
        updateBook(stub);
    }

    private static void updateBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamObserver<Book> request = stub.updateBooks(new StreamObserver<StringValue>() {
            @Override
            public void onNext(StringValue stringValue) {
                System.out.println("stringValue.getValue() = " + stringValue.getValue());
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                System.out.println("更新完毕");
                countDownLatch.countDown();
            }
        });
        request.onNext(Book.newBuilder().setId("1").setName("a").setAuthor("b").build());
        request.onNext(Book.newBuilder().setId("2").setName("c").setAuthor("d").build());
        request.onCompleted();
        countDownLatch.await();
    }
}

在客户端这块,updateBooks 方法会返回一个 StreamObserver<Book> 对象,调用该对象的 onNext 方法就是给服务端传递数据了,可以传递多个数据,调用该对象的 onCompleted 方法就是告诉服务端数据传递结束了,此时也会触发服务端的 onCompleted 方法,服务端的 onCompleted 方法执行之后,进而触发了客户端的 onCompleted 方法。

5. 双向流 RPC

双向流其实就是 3、4 小节的合体。即客户端多次发送数据,服务端也多次响应数据。

我们先来看下服务端的代码:

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
    private Map<String, Book> bookMap = new HashMap<>();
    private List<Book> books = new ArrayList<>();

    public BookServiceImpl() {
        Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
        Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
        Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
        bookMap.put("1", b1);
        bookMap.put("2", b2);
        bookMap.put("3", b3);
    }
    @Override
    public StreamObserver<StringValue> processBooks(StreamObserver<BookSet> responseObserver) {
        return new StreamObserver<StringValue>() {
            @Override
            public void onNext(StringValue stringValue) {
                Book b = Book.newBuilder().setId(stringValue.getValue()).build();
                books.add(b);
                if (books.size() == 3) {
                    BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
                    responseObserver.onNext(bookSet);
                    books.clear();
                }
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
                responseObserver.onNext(bookSet);
                books.clear();
                responseObserver.onCompleted();
            }
        };
    }
}

这段代码没有实际意义,单纯为了给小伙伴们演示双向流,我的操作逻辑是客户端传递多个 ID 到服务端,然后服务端根据这些 ID 构建对应的 Book 对象,然后三个三个一组,再返回给客户端??突Ф嗣看畏⑺鸵桓銮肭螅蓟岽シ⒎穸说?onNext 方法,我们在这个方法中对请求分组返回。最后如果还有剩余的请求,我们在 onCompleted() 方法中返回。

再来看看客户端的代码:

public class BookServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
        processBook(stub);
    }

    private static void processBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamObserver<StringValue> request = stub.processBooks(new StreamObserver<BookSet>() {
            @Override
            public void onNext(BookSet bookSet) {
                System.out.println("bookSet = " + bookSet);
                System.out.println("=============");
            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onCompleted() {
                System.out.println("处理完毕!");
                countDownLatch.countDown();
            }
        });
        request.onNext(StringValue.newBuilder().setValue("a").build());
        request.onNext(StringValue.newBuilder().setValue("b").build());
        request.onNext(StringValue.newBuilder().setValue("c").build());
        request.onNext(StringValue.newBuilder().setValue("d").build());
        request.onCompleted();
        countDownLatch.await();
    }
}

这个客户端的代码跟第四小节一模一样,不再赘述了。

好啦,这就是松哥和小伙伴们介绍的 gRPC 的四种不同的通信模式,文章中只给出了一些关键代码,如果小伙伴们没看明白,建议结合上篇文章一起阅读就懂啦~

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容