在前面两篇我们分析了Divide
插件的处理,soul
网关对整个http
请求,Divide
插件还只是其中一部分,其主要功能只是完成了后端节点
的选取。至于如何将请求再转发到后端节点
,则就是我们今天要分析的插件HttpClient
集去实现的。
http请求的处理流图
我们分析一下这个图,这个图表示了最简版的soul
网关处理http
请求的流程
-
http
请求进来,经由Divide
插件处理,其主要逻辑就是通过负载均衡算法选取出后端服务节点来处理这次的请求,实际上就是拿到了一个后端服务的httpUrl
,并将httpUrl
塞进exchange
对象 - 接下来
HttpClient
插件处理,该插件则会向httpUrl
发起请求,在拿到返回结果webHandlerClientResponse
后,并将其塞进exchange
对象 - 接下来
WebClientResponse
插件会取出返回结果webHandlerClientResponse
,将其作为最终的返回结果输出给到客户端
分析
- 先从
configuration
入手,找到HttpClient
插件的配置类HttpClientPluginConfiguration
,会有几个关键的bean
:HttpClient
、WebClientConfiguration
和NettyHttpClientConfiguration
- 先看
HttpClient
,涉及到httpClient
的实例化,包括连接池(禁用、定长、弹性)、连接超时(默认45s
)、是否走代理(代理服务器、用户名、密码;)、https
public HttpClient httpClient(final HttpClientProperties properties) {
// configure pool resources
// httpClient的初始化,包括连接池(禁用、定长、弹性)、连接超时(默认45s)、
// 是否走代理(代理服务器、用户名、密码;)、https
// 连接池
HttpClientProperties.Pool pool = properties.getPool();
ConnectionProvider connectionProvider;
// 省略。。。
HttpClient httpClient = HttpClient.create(connectionProvider)
.tcpConfiguration(tcpClient -> {
// 连接超时
if (properties.getConnectTimeout() != null) {
// 省略。。。
}
// configure proxy if proxy host is set.
// 是否走代理
HttpClientProperties.Proxy proxy = properties.getProxy();
// 省略。。。
});
// ssl
HttpClientProperties.Ssl ssl = properties.getSsl();
// 省略。。。
return httpClient;
}
-
WebClientConfiguration
则包含了WebClientPlugin
与WebClientResponsePlugin
;而NettyHttpClientConfiguration
则包含了NettyHttpClientPlugin
与NettyClientResponsePlugin
- 他们都是为了实现
http
后端服务的调用,以及返回结果的处理,是一个二选一的事情,由配置项soul.httpclient.strategy
决定,其中WebClientConfiguration
为默认 - 我们具体分析下这两种方式
WebClientConfiguration
WebClientPlugin
-
WebClientPlugin
实例化,需传入WebClient
@Bean
public SoulPlugin webClientPlugin(final ObjectProvider<HttpClient> httpClient) {
// 构造webClient,是一个非阻塞、用于执行http请求的响应式的客户端,其底层是Reactor Netty
// 类似restTemplate,我们使用时也需要进行构造
WebClient webClient = WebClient.builder()
// 客户端连接器,传入已预先配置好的httpClient,也就是我们在上面实例化的bean httpClient
.clientConnector(new ReactorClientHttpConnector(Objects.requireNonNull(httpClient.getIfAvailable())))
.build();
return new WebClientPlugin(webClient);
}
-
WebClientPlugin
处理逻辑,关键方法execute
与handleRequestBody
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
// 取出网关上下文
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
// 请求路径
String urlPath = exchange.getAttribute(Constants.HTTP_URL);
if (StringUtils.isEmpty(urlPath)) {
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
// 原始请求的方法类型
HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
// 请求体的构造
WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
// 处理请求体以及发送请求
return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
}
private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
final ServerWebExchange exchange,
final long timeout,
final int retryTimes,
final SoulPluginChain chain) {
return requestBodySpec
// 请求头
.headers(httpHeaders -> {
httpHeaders.addAll(exchange.getRequest().getHeaders());
httpHeaders.remove(HttpHeaders.HOST);
})
// 请求内容的类型
.contentType(buildMediaType(exchange))
// 请求body
.body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
// 执行请求
.exchange()
// 请求异常时的处理
.doOnError(e -> log.error(e.getMessage()))
// 超时异常的抛出
.timeout(Duration.ofMillis(timeout))
// 重试:只有当请求发生连接超时,重试次数,重试算法使用2的指数退让,第一次重试等待200ms,期间最大间隔为20s
.retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
.retryMax(retryTimes)
.backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
// 将flux -> mono返回
.flatMap(e -> doNext(e, exchange, chain));
}
WebClientResponsePlugin
- 看下其处理逻辑
excute
方法
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
// 先执行插件链
return chain.execute(exchange)
// then表示插件链处理完成后才执行的逻辑,then里边的逻辑也是异步处理的
.then(Mono.defer(() -> {
ServerHttpResponse response = exchange.getResponse();
// 后端服务返回的响应
ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
// 一些服务异常的处理
if (Objects.isNull(clientResponse)
|| response.getStatusCode() == HttpStatus.BAD_GATEWAY
|| response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 后端服务返回的状态码 cookie header的处理
response.setStatusCode(clientResponse.statusCode());
response.getCookies().putAll(clientResponse.cookies());
response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
// 最后将响应体写入到客户端响应中
return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
}));
}
小结
-
WebClientPlugin
底层直接用spring
官方实现的webClient
去完成后端服务的请求,请求之前需要将客户端请求过来的请求头header
、请求方法httpMethod
、请求内容类型contentType
、请求体requestBody
一并发送给后端服务;执行请求后会将后端服务返回的响应放于exchange
中,交由插件WebClientResponsePlugin
处理;最后继续执行插件链 -
WebClientResponsePlugin
是先执行插件链,待插件链上的所有插件全部执行完毕后才执行自己的逻辑:处理客户端原始请求的响应,即将由后端服务返回的响应clientResponse
、状态码、cookie
、header
设置到其内容之中
NettyHttpClientConfiguration
NettyHttpClientPlugin
-
NettyHttpClientPlugin
实例化,传入httpClient
即可
@Bean
public SoulPlugin nettyHttpClientPlugin(final ObjectProvider<HttpClient> httpClient) {
return new NettyHttpClientPlugin(httpClient.getIfAvailable());
}
- 看下插件的
execute
方法,只保留关键逻辑
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
// 后端服务请求方法、请求头的生成
final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
HttpHeaders filtered = request.getHeaders();
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
String url = exchange.getAttribute(Constants.HTTP_URL);
// 。。。
Flux<HttpClientResponse> responseFlux =
// 设置请求头
this.httpClient.headers(headers -> headers.add(httpHeaders))
// 构造requestSender
.request(method).uri(url)
// 发送请求 nettyOutBound出站的一个处理,即发送请求前的一个处理
// 需要将请求体的内容转换为netty的ByteBuf传输出去
.send((req, nettyOutbound) -> nettyOutbound.send(request.getBody().map(dataBuffer -> ((NettyDataBuffer) dataBuffer) .getNativeBuffer())))
// 将connection提取出Flux<HttpClientResponse>,
.responseConnection((res, connection) -> {
// 将后端服务返回的响应结果放到exchange中
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
// 将connection对象放到exchange中,需要传递给NettyClientResponsePlugin处理
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_CONN_ATTR, connection);
// 处理header、cookie、httpStatus
ServerHttpResponse response = exchange.getResponse();
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(Constants.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
}
HttpStatus status = HttpStatus.resolve(res.status().code());
if (status != null) {
response.setStatusCode(status);
} else if (response instanceof AbstractServerHttpResponse) {
((AbstractServerHttpResponse) response)
.setStatusCodeValue(res.status().code());
} else {
throw new IllegalStateException("Unable to set status code on response: " + res.status().code() + ", " + response.getClass());
}
response.getHeaders().putAll(headers);
return Mono.just(res);
});
long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
Duration duration = Duration.ofMillis(timeout);
responseFlux = responseFlux
// 超时
.timeout(duration, Mono.error(new TimeoutException("Response took longer than timeout: " + duration)))
// 异常的映射
.onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
// 继续执行插件链
return responseFlux.then(chain.execute(exchange));
}
NettyClientResponsePlugin
- 直接看插件的
execute
方法
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
// 先执行插件自身的逻辑,为异步执行
return Mono.defer(() -> {
Connection connection = exchange.getAttribute(Constants.CLIENT_RESPONSE_CONN_ATTR);
if (connection == null) {
return Mono.empty();
}
if (log.isTraceEnabled()) {
log.trace("NettyWriteResponseFilter start inbound: "
+ connection.channel().id().asShortText() + ", outbound: "
+ exchange.getLogPrefix());
}
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
// NettyDataBuffer的响应
final Flux<NettyDataBuffer> body = connection
// 对于client端的connection对象,你响应的处理反而是inbound;
// 必须先配置inbound作为connection的桥梁,否则是无法接收数据的
.inbound()
// 接收:转换数据
.receive()
// 将内存buffers保留进行复用
.retain()
// 将ByteBuf转换为spring中的NettyDataBuffer
.map(factory::wrap);
MediaType contentType = response.getHeaders().getContentType();
// 是否为流媒体响应,若是直接返回
return isStreamingMediaType(contentType)
? response.writeAndFlushWith(body.map(Flux::just))
: response.writeWith(body);
})
// 执行插件
.then(chain.execute(exchange)
//TODO question 成功的情况下怎么没有释放connection
.doOnError(throwable -> cleanup(exchange))).doOnCancel(() -> cleanup(exchange));
}
小结
-
NettyHttpClientPlugin
直接使用httpClient
做为发送请求的载体,先构造requestSender
(请求方法、请求路径、请求头),然后调用其send
方法获取到responseReceiver
,再通过responseConnection
提取出Flux<HttpClientResponse>
,其中对于响应中的(header
、cookie
、httpStatus
)这些就直接在这个提取逻辑中处理,而后端服务返回的响应webHandlerClientResponse
,还有整个connection
对象则放到exchange
中 -
NettyClientResponsePlugin
主要逻辑就是处理后端服务返回的响应。有些奇怪的是,这里并没有直接使用NettyHttpClientPlugin
中放到exchange
对象的webHandlerClientResponse
,而是从connection
对象中读取响应数据;随后再执行插件链逻辑
总结
- 上述基本将远程客户端的
http
请求转发到后端服务,以及将后端服务的响应再返回给远程客户端的代码实现进行了分析 -
soul
网关提供了webClient
与nettyClient
两套实现的机制,可供选择 - 其中
webClient
方式是直接使用spring
自己封装的WebClient
(是在HttpClient
之上封装实现的一套非阻塞,响应式的http
客户端)作为请求的载体,整个代码更简洁明了;而nettyClient
则是用更底层的HttpClient
去做http
请求操作,需自行处理netty
的出站outbound
与入站inbound
逻辑,整个代码会更复杂一些。