soul网关学习18-插件实现2-HttpClient插件集-http请求的转发与结果处理

在前面两篇我们分析了Divide插件的处理,soul网关对整个http请求,Divide插件还只是其中一部分,其主要功能只是完成了后端节点的选取。至于如何将请求再转发到后端节点,则就是我们今天要分析的插件HttpClient集去实现的。

http请求的处理流图

http-request-process

我们分析一下这个图,这个图表示了最简版的soul网关处理http请求的流程

  1. http请求进来,经由Divide插件处理,其主要逻辑就是通过负载均衡算法选取出后端服务节点来处理这次的请求,实际上就是拿到了一个后端服务的httpUrl,并将httpUrl塞进exchange对象
  2. 接下来HttpClient插件处理,该插件则会向httpUrl发起请求,在拿到返回结果webHandlerClientResponse后,并将其塞进exchange对象
  3. 接下来WebClientResponse插件会取出返回结果webHandlerClientResponse,将其作为最终的返回结果输出给到客户端

分析

  1. 先从configuration入手,找到HttpClient插件的配置类HttpClientPluginConfiguration,会有几个关键的beanHttpClientWebClientConfigurationNettyHttpClientConfiguration
  2. 先看HttpClient,涉及到httpClient的实例化,包括连接池(禁用、定长、弹性)、连接超时(默认45s)、是否走代理(代理服务器、用户名、密码;)、https
    public HttpClient httpClient(final HttpClientProperties properties) {
        // configure pool resources
        // httpClient的初始化,包括连接池(禁用、定长、弹性)、连接超时(默认45s)、
        // 是否走代理(代理服务器、用户名、密码;)、https
        // 连接池
        HttpClientProperties.Pool pool = properties.getPool();
        ConnectionProvider connectionProvider;
        // 省略。。。
        HttpClient httpClient = HttpClient.create(connectionProvider)
                .tcpConfiguration(tcpClient -> {
                    // 连接超时
                    if (properties.getConnectTimeout() != null) {
                        // 省略。。。
                    }
                    // configure proxy if proxy host is set.
                    // 是否走代理
                    HttpClientProperties.Proxy proxy = properties.getProxy();
                   // 省略。。。
                });
        // ssl
        HttpClientProperties.Ssl ssl = properties.getSsl();
        // 省略。。。
        return httpClient;
    }
  • WebClientConfiguration则包含了 WebClientPluginWebClientResponsePlugin;而NettyHttpClientConfiguration则包含了NettyHttpClientPluginNettyClientResponsePlugin
  • 他们都是为了实现http 后端服务的调用,以及返回结果的处理,是一个二选一的事情,由配置项soul.httpclient.strategy决定,其中WebClientConfiguration为默认
  • 我们具体分析下这两种方式

WebClientConfiguration

WebClientPlugin

  • WebClientPlugin实例化,需传入WebClient
        @Bean
        public SoulPlugin webClientPlugin(final ObjectProvider<HttpClient> httpClient) {
            // 构造webClient,是一个非阻塞、用于执行http请求的响应式的客户端,其底层是Reactor Netty
            // 类似restTemplate,我们使用时也需要进行构造
            WebClient webClient = WebClient.builder()
                    // 客户端连接器,传入已预先配置好的httpClient,也就是我们在上面实例化的bean httpClient
                    .clientConnector(new ReactorClientHttpConnector(Objects.requireNonNull(httpClient.getIfAvailable())))
                    .build();
            return new WebClientPlugin(webClient);
        }
  • WebClientPlugin处理逻辑,关键方法executehandleRequestBody
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        // 取出网关上下文
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        // 请求路径
        String urlPath = exchange.getAttribute(Constants.HTTP_URL);
        if (StringUtils.isEmpty(urlPath)) {
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
        int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
        log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
        // 原始请求的方法类型
        HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
        // 请求体的构造
        WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
        // 处理请求体以及发送请求
        return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
    }
   
    private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
                                         final ServerWebExchange exchange,
                                         final long timeout,
                                         final int retryTimes,
                                         final SoulPluginChain chain) {
        return requestBodySpec
                // 请求头
                .headers(httpHeaders -> {
                    httpHeaders.addAll(exchange.getRequest().getHeaders());
                    httpHeaders.remove(HttpHeaders.HOST);
                })
                // 请求内容的类型
                .contentType(buildMediaType(exchange))
                // 请求body
                .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
                // 执行请求
                .exchange()
                // 请求异常时的处理
                .doOnError(e -> log.error(e.getMessage()))
                // 超时异常的抛出
                .timeout(Duration.ofMillis(timeout))
                // 重试:只有当请求发生连接超时,重试次数,重试算法使用2的指数退让,第一次重试等待200ms,期间最大间隔为20s
                .retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
                        .retryMax(retryTimes)
                        .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
                // 将flux -> mono返回
                .flatMap(e -> doNext(e, exchange, chain));

    }

WebClientResponsePlugin

  • 看下其处理逻辑excute方法
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        // 先执行插件链
        return chain.execute(exchange)
                // then表示插件链处理完成后才执行的逻辑,then里边的逻辑也是异步处理的
                .then(Mono.defer(() -> {
            ServerHttpResponse response = exchange.getResponse();
            // 后端服务返回的响应
            ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
            // 一些服务异常的处理
            if (Objects.isNull(clientResponse)
                    || response.getStatusCode() == HttpStatus.BAD_GATEWAY
                    || response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
                Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
                return WebFluxResultUtils.result(exchange, error);
            }
            if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
                Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
                return WebFluxResultUtils.result(exchange, error);
            }
            // 后端服务返回的状态码 cookie header的处理
            response.setStatusCode(clientResponse.statusCode());
            response.getCookies().putAll(clientResponse.cookies());
            response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
            // 最后将响应体写入到客户端响应中
            return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
        }));
    }

小结

  • WebClientPlugin底层直接用spring官方实现的webClient去完成后端服务的请求,请求之前需要将客户端请求过来的请求头header、请求方法httpMethod、请求内容类型contentType、请求体requestBody一并发送给后端服务;执行请求后会将后端服务返回的响应放于exchange中,交由插件WebClientResponsePlugin处理;最后继续执行插件链
  • WebClientResponsePlugin是先执行插件链,待插件链上的所有插件全部执行完毕后才执行自己的逻辑:处理客户端原始请求的响应,即将由后端服务返回的响应clientResponse、状态码、cookie、header设置到其内容之中

NettyHttpClientConfiguration

NettyHttpClientPlugin

  • NettyHttpClientPlugin实例化,传入httpClient即可
        @Bean
        public SoulPlugin nettyHttpClientPlugin(final ObjectProvider<HttpClient> httpClient) {
            return new NettyHttpClientPlugin(httpClient.getIfAvailable());
        }
  • 看下插件的execute方法,只保留关键逻辑
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        // 后端服务请求方法、请求头的生成
        final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
        HttpHeaders filtered = request.getHeaders();
        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);
        String url = exchange.getAttribute(Constants.HTTP_URL);
        // 。。。
        Flux<HttpClientResponse> responseFlux =
                // 设置请求头
                this.httpClient.headers(headers -> headers.add(httpHeaders))
                // 构造requestSender
                .request(method).uri(url)
                // 发送请求 nettyOutBound出站的一个处理,即发送请求前的一个处理
                // 需要将请求体的内容转换为netty的ByteBuf传输出去
                .send((req, nettyOutbound) -> nettyOutbound.send(request.getBody().map(dataBuffer -> ((NettyDataBuffer) dataBuffer) .getNativeBuffer())))
                // 将connection提取出Flux<HttpClientResponse>,
                .responseConnection((res, connection) -> {
                    // 将后端服务返回的响应结果放到exchange中
                    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
                    // 将connection对象放到exchange中,需要传递给NettyClientResponsePlugin处理
                    exchange.getAttributes().put(Constants.CLIENT_RESPONSE_CONN_ATTR, connection);
                    // 处理header、cookie、httpStatus
                    ServerHttpResponse response = exchange.getResponse();
                    HttpHeaders headers = new HttpHeaders();
                    res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
                    String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
                    if (StringUtils.hasLength(contentTypeValue)) {
                        exchange.getAttributes().put(Constants.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
                    }
                    HttpStatus status = HttpStatus.resolve(res.status().code());
                    if (status != null) {
                        response.setStatusCode(status);
                    } else if (response instanceof AbstractServerHttpResponse) {
                        ((AbstractServerHttpResponse) response)
                                .setStatusCodeValue(res.status().code());
                    } else {
                        throw new IllegalStateException("Unable to set status code on response: " + res.status().code() + ", " + response.getClass());
                    }
                    response.getHeaders().putAll(headers);

                    return Mono.just(res);
                });
        long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
        Duration duration = Duration.ofMillis(timeout);
        responseFlux = responseFlux
                // 超时
                .timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
                // 异常的映射
                .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
        // 继续执行插件链
        return responseFlux.then(chain.execute(exchange));
    }

NettyClientResponsePlugin

  • 直接看插件的execute方法
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        // 先执行插件自身的逻辑,为异步执行
        return Mono.defer(() -> {
            Connection connection = exchange.getAttribute(Constants.CLIENT_RESPONSE_CONN_ATTR);
            if (connection == null) {
                return Mono.empty();
            }
            if (log.isTraceEnabled()) {
                log.trace("NettyWriteResponseFilter start inbound: "
                        + connection.channel().id().asShortText() + ", outbound: "
                        + exchange.getLogPrefix());
            }
            ServerHttpResponse response = exchange.getResponse();
            NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
            // NettyDataBuffer的响应
            final Flux<NettyDataBuffer> body = connection
                    // 对于client端的connection对象,你响应的处理反而是inbound;
                    // 必须先配置inbound作为connection的桥梁,否则是无法接收数据的
                    .inbound()
                    // 接收:转换数据
                    .receive()
                    // 将内存buffers保留进行复用
                    .retain()
                    // 将ByteBuf转换为spring中的NettyDataBuffer
                    .map(factory::wrap);
            MediaType contentType = response.getHeaders().getContentType();
            // 是否为流媒体响应,若是直接返回
            return isStreamingMediaType(contentType)
                    ? response.writeAndFlushWith(body.map(Flux::just))
                    : response.writeWith(body);

        })
                // 执行插件
                .then(chain.execute(exchange)
                        //TODO question 成功的情况下怎么没有释放connection
                        .doOnError(throwable -> cleanup(exchange))).doOnCancel(() -> cleanup(exchange));
    }

小结

  • NettyHttpClientPlugin直接使用httpClient做为发送请求的载体,先构造requestSender(请求方法、请求路径、请求头),然后调用其send方法获取到responseReceiver,再通过responseConnection提取出Flux<HttpClientResponse>,其中对于响应中的(headercookie、httpStatus)这些就直接在这个提取逻辑中处理,而后端服务返回的响应webHandlerClientResponse,还有整个connection对象则放到exchange
  • NettyClientResponsePlugin主要逻辑就是处理后端服务返回的响应。有些奇怪的是,这里并没有直接使用NettyHttpClientPlugin中放到exchange对象的webHandlerClientResponse,而是从connection对象中读取响应数据;随后再执行插件链逻辑

总结

  • 上述基本将远程客户端的http请求转发到后端服务,以及将后端服务的响应再返回给远程客户端的代码实现进行了分析
  • soul网关提供了webClientnettyClient两套实现的机制,可供选择
  • 其中webClient方式是直接使用spring自己封装的WebClient(是在HttpClient之上封装实现的一套非阻塞,响应式的http客户端)作为请求的载体,整个代码更简洁明了;而 nettyClient 则是用更底层的HttpClient 去做http请求操作,需自行处理netty的出站outbound与入站inbound逻辑,整个代码会更复杂一些。
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容