一、推荐阅读
可能有部分同学对 Hystrix 的特性了解的不是很清晰,推荐如下文章,写的真的好;
二、如何使用Hystrix
目前Hystrix最新版本是1.5.13,在项目的pom文件中加上依赖
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.13</version>
</dependency>
1.Command方式
要想使用hystrix,只需要继承HystrixCommand
或HystrixObservableCommand
,两者主要区别是:
- 前者的命令逻辑写在
run()
;后者的命令逻辑写在construct()
- 前者的
run()
是由新创建的线程执行;后者的construct()
是由调用程序线程执行 - 前者一个实例只能向调用程序发送(emit)单条数据,;后者一个实例可以顺序发送多条数据
1.1执行命令
execute()
、queue()
、observe()
、toObservable()
这4个方法用来触发执行run()/construct()
,一个实例只能执行一次这4个方法,特别说明的是HystrixObservableCommand
没有execute()
和queue()
。
4个方法的主要区别:
-
execute()
:以同步堵塞方式执行run()
。调用execute()
后,hystrix先创建一个新线程运行run()
,接着调用程序 要在execute()
调用处一直堵塞着,直到run()
运行完成 -
queue()
:以异步非堵塞方式执行run()
。一调用queue()
就直接返回一个Future对象,同时hystrix创建一个新线程运行run()
,调用程序通过Future.get()
拿到run()
的返回结果,而Future.get()
是堵塞执行的 -
observe()
:事件注册前执行run()/construct()
。第一步是事件注册前,先调用observe()
自动触发 执行run()/construct()
(如果继承的是HystrixCommand
,hystrix将创建新线程非堵塞执行run()
;如果继承的是HystrixObservableCommand
,将以调用程序线程堵塞执行construct()
),第二步是从observe()
返回后调用程序调用subscribe()
完成事件注册,如果run()/construct()
执行成功则触发onNext()
和onCompleted()
,如果执行异常则触发onError()
-
toObservable()
:事件注册后执行run()/construct()
。第一步是事件注册前,一调用toObservable()
就直接返回一个Observable<String>
对象,第二步调用subscribe()
完成事件注册后自动触发执行run()/construct()
(如果继承的是HystrixCommand
,hystrix将创建新线程非堵塞执行run()
,调用程序不必等待run()
;如果继承的是HystrixObservableCommand
,将以调用程序线程堵塞执行construct()
,调用程序等待construct()
执行完才能继续往下走),如果run()/construct()
执行成功则触发onNext()
和onCompleted()
,如果执行异常则触发onError()
1.2 HystrixCommand
public class CommandHelloWorld2 extends HystrixCommand<String> {
private final String name;
protected CommandHelloWorld2(String name) {
super(//命令分组用于对依赖操作分组,便于统计,汇总等.
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
//配置依赖超时时间,500毫秒
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(500))
//HystrixCommondKey工厂定义依赖名称
.andCommandKey(HystrixCommandKey.Factory.asKey("commandHelloWorld2"))
//使用HystrixThreadPoolKey工厂定义线程池名称
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
@Override
protected String getFallback() {
return "execute Falled";
}
@Override
protected String run() throws Exception {
//sleep 2秒 ,调用会超时
// TimeUnit.MILLISECONDS.sleep(2000);
return "Hello " + name + " thread : " + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception {
CommandHelloWorld2 commandHelloWorld2 = new CommandHelloWorld2("test-Fallback");
String s = commandHelloWorld2.execute();
System.out.println(" 同步 ====== " + s);
/*Future<String> queue = commandHelloWorld2.queue();
String s1 = queue.get();*/
}
}
-
execute() 执行
HystrixCommand
内部的execute方法,可以实现run方法的同步执行String s = commandHelloWorld2.execute();
-
queue() 执行
HystrixCommand
内部的queue方法,可以实现run方法的异步执行,如果依赖多个下游接口 ,通过异步方式,可以同时执行,提高接口性能。Future<String> queue = commandHelloWorld2.queue(); String s1 = queue.get()
-
通过执行的结果发现
run()
是由新创建的线程执行,结果如下同步 ====== Hello test-Fallback thread : hystrix-HelloWorldPool-1
构造方法
主要是相关参数设置,具体的参数的作用后续会介绍
protected CommandHelloWorld2(String name) {
super(//命令分组用于对依赖操作分组,便于统计,汇总等.
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
//配置依赖超时时间,500毫秒
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(500))
//HystrixCommondKey工厂定义依赖名称
.andCommandKey(HystrixCommandKey.Factory.asKey("commandHelloWorld2"))
//使用HystrixThreadPoolKey工厂定义线程池名称
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
以下是封装的具体点的参数设置,具体的参数设置根据业务需求而定;
public CommandHelloWorld4(Integer id) {
super(setter());
this.id = id;
}
private static Setter setter() {
return ApiSetter.setter("getNum");
}
public class ApiSetter {
public static HystrixCommand.Setter setter(String commandKeyName, String threadPoolKeyName) {
return setter("ApiGroup",commandKeyName,threadPoolKeyName);
}
public static HystrixCommand.Setter setter(String commandKeyName) {
return setter(commandKeyName,"Api-Pool");
}
/**
* @author liweihan
* @time 2017/12/20 16:57
* @description 相关参数设置
* @param groupKeyName 服务分组名
* @param commandKeyName 服务标识名称
* @param threadPoolKeyName 线程池名称
* @return
*/
public static HystrixCommand.Setter setter(String groupKeyName, String commandKeyName, String threadPoolKeyName) {
//服务分组
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(groupKeyName);
//服务标识
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(commandKeyName);
//线程池名称
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey(threadPoolKeyName);
//线程配置
HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter()
.withCoreSize(25)
.withKeepAliveTimeMinutes(5)
.withMaxQueueSize(Integer.MAX_VALUE)
.withQueueSizeRejectionThreshold(10000);
//命令属性的配置
HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
.withExecutionIsolationThreadInterruptOnTimeout(true)
.withExecutionTimeoutInMilliseconds(3000) //设置超时时间为3秒时自动熔断
.withCircuitBreakerErrorThresholdPercentage(20);//失败率达到20%自动熔断
//返回
return HystrixCommand.Setter
.withGroupKey(groupKey)
.andCommandKey(commandKey)
.andThreadPoolKey(threadPoolKey)
.andThreadPoolPropertiesDefaults(threadPoolProperties)
.andCommandPropertiesDefaults(commandProperties);
}
}
1.3 HystrixObservableCommand
代码如下:
public class HelloWorldHystrixObservableCommand extends HystrixObservableCommand<String> {
private final String name;
public HelloWorldHystrixObservableCommand(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
// @Override
// protected String getFallback() {
// System.out.println("触发了降级!");
// return "exeucute fallback";
// }
@Override
protected Observable<String> construct() {
System.out.println("in construct! thread:" + Thread.currentThread().getName());
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
try {
System.out.println("in call of construct! thread:" + Thread.currentThread().getName());
if (!observer.isUnsubscribed()) {
// observer.onError(getExecutionException()); // 直接抛异常退出,不会往下执行
observer.onNext("Hello1" + " thread:" + Thread.currentThread().getName());
observer.onNext("Hello2" + " thread:" + Thread.currentThread().getName());
observer.onNext(name + " thread:" + Thread.currentThread().getName());
System.out.println("complete before------" + " thread:" + Thread.currentThread().getName());
observer.onCompleted(); // 不会往下执行observer的任何方法
System.out.println("complete after------" + " thread:" + Thread.currentThread().getName());
observer.onNext("abc"); // 不会执行到
}
} catch (Exception e) {
observer.onError(e);
}
}
} );
}
public static void main(String[] args) throws Exception{
Observable<String> hotObservable = new HelloWorldHystrixObservableCommand("Hlx").observe();
// Observable<String> hotObservable = new HelloWorldHystrixObservableCommand("Hlx").toObservable();
Thread.sleep(2000);
System.out.println("睡眠中。。。。。");
// 注册观察者事件
// subscribe()是非堵塞的
hotObservable.subscribe(new Observer<String>() {
// 先执行onNext再执行onCompleted
@Override
public void onCompleted() {
System.out.println("hotObservable of ObservableCommand completed");
}
@Override
public void onError(Throwable e) {
System.out.println("hotObservable of ObservableCommand error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
System.out.println("hotObservable of ObservableCommand onNext: " + v);
}
});
}
}
执行结果如下:
in construct! thread:main
in call of construct! thread:main
complete before------ thread:main
complete after------ thread:main
睡眠中。。。。。
hotObservable of ObservableCommand onNext: Hello1 thread:main
hotObservable of ObservableCommand onNext: Hello2 thread:main
hotObservable of ObservableCommand onNext: Hlx thread:main
hotObservable of ObservableCommand completed
- 一个实例通过observer.onNext可以顺序发送多条数据
- 通过执行结果可知调用
observe()
自动触发执行construct()
方法 - 通过执行结果 thread:main 可知
construct()
是由调用程序线程执行
toObservable()方法
将toObservable()的注释放开,observe()注释掉,执行main方法,结果如下:
睡眠中。。。。。
in construct! thread:main
in call of construct! thread:main
hotObservable of ObservableCommand onNext: Hello1 thread:main
hotObservable of ObservableCommand onNext: Hello2 thread:main
hotObservable of ObservableCommand onNext: Hlx thread:main
complete before------ thread:main
hotObservable of ObservableCommand completed
complete after------ thread:main
- 直接调用
toObservable()
并不会执行construct()
方法,调用subscribe()
完成事件注册后自动触发执行construct()
2.fallback(降级)
使用fallback机制很简单,继承HystrixCommand
只需重写getFallback()
,继承HystrixObservableCommand
只需重写resumeWithFallback()
,比如HelloWorldHystrixCommand
加上下面代码片段:
@Override
protected String getFallback() {
return "execute Falled";
}
fallback实际流程是当run()/construct()
被触发执行时或执行中发生错误时,将转向执行getFallback()/resumeWithFallback()
。结合下图,4种错误情况将触发fallback:
- 当
construct()
或者run()
方法执行过程中抛出异常。 - 当回路器打开,命令的执行进入了熔断状态。
- 当命令执行的线程池和队列或者信号量已经满容。
- 命令执行超时。
若失败回退方法执行失败,或者用户未提供失败回退方法,Hystrix 会根据调用执行命令的方法的不同而产生不同的行为:
-
execute()
—— 抛出异常 -
queue()
—— 成功返回Future
对象,但其get()
方法被调用时,会抛出异常 -
observe()
—— 返回Observable
对象,当你订阅它的时候,会立即调用 subscriber 的onError
方法中止请求 -
toObservable()
—— 返回Observable
对象,当你订阅它的时候,会立即调用 subscriber 的onError
方法中止请求
途中大致的执行顺序如下:
1、构建 HystrixCommand 或者 HystrixObservableCommand 对象
2、执行命令(即上述 Command 对象包装的逻辑)
3、结果是否有缓存
4、请求线路(类似电路)是否是开路
5、线程池/请求队列/信号量占满时会发生什么
6、使用 HystrixObservableCommand.construct() 还是 HystrixCommand.run()
7、计算链路健康度
8、失败回退逻辑
9、返回正?;赜?/p>
接下来,我们一一验证4中情况
1、当construct()
或者run()
方法执行过程中抛出异常,代码如下:
public class HystrixFallbackException extends HystrixCommand<String> {
private final String name;
public HystrixFallbackException(String name) {
super(HystrixCommandGroupKey.Factory.asKey("FallbackGroup"));
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------会触发fallback的case-------------------*/
//1.主动抛出异常
// throw new HystrixTimeoutException();
// throw new RuntimeException("this command will trigger fallback");
// throw new Exception("this command will trigger fallback");
// throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, commandClass, message, cause, fallbackException);
// 2.除零异常
//int i = 1/0;
/*---------------不会触发fallback的case-------------------*/
// 3.被捕获的异常不会触发fallback
/*try {
throw new RuntimeException("this command never trigger fallback");
} catch(Exception e) {
e.printStackTrace();
}*/
// 4.HystrixBadRequestException异常由非法参数或非系统错误引起,不会触发fallback,也不会被计入熔断器
throw new HystrixBadRequestException("HystrixBadRequestException is never trigger fallback");
//return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
public static void main(String[] args) {
HystrixFallbackException hlx = new HystrixFallbackException("Hlx");
try {
String execute = hlx.execute();
System.out.println(execute);
} catch (Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,会被捕获到这里" + e.getCause());
}
}
}
几种异常情况已经在代码中注释了,可直接去尝试每种情况
非HystrixBadRequestException异常:当抛出HystrixBadRequestException时,调用程序可以捕获异常,没有触发
getFallback()
,而其他异常则会触发getFallback()
,调用程序将获得getFallback()
的返回
2、命令执行超时
代码如下:
public class HystrixFallbackTimeOut extends HystrixCommand<String> {
private final String name;
protected HystrixFallbackTimeOut(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String getFallback() {
return "execute Falled";
}
@Override
protected String run() throws Exception {
//sleep 2秒 ,调用会超时
TimeUnit.MILLISECONDS.sleep(2000);
return "Hello " + name + " thread : " + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception {
HystrixFallbackTimeOut hystrixFallbackTimeOut = new HystrixFallbackTimeOut("Fallback-timeout");
String s = hystrixFallbackTimeOut.execute();
System.out.println(" 同步 ====== " + s);
}
}
- run()方法由于睡眠了2s,Hystrix的默认的执行超时时间为1000ms,所以执行会超时
3、当回路器打开,命令的执行进入了熔断状态
代码如下:
/**
*
* CircuitBreakerRequestVolumeThreshold设置为3,意味着10s内请求超过3次才会触发熔断器
* circuitBreakerErrorThresholdPercentage设置为80,错误率是为%80才会触发熔断器
* 必须两个参数同时满足才会才会触发熔断器
* run()中使命令超时进入fallback,执行4次run后,将被熔断,进入降级,即不进入run()而直接进入fallback
*
*/
public class HystrixFallbackCircuitBreaker extends HystrixCommand<String> {
private Integer id;
public HystrixFallbackCircuitBreaker(Integer id) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CircuitBreakerTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CircuitBreakerTestKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("CircuitBreakerTest"))
.andThreadPoolPropertiesDefaults( // 配置线程池
HystrixThreadPoolProperties.Setter()
.withCoreSize(200) // 配置线程池里的线程数,设置足够多线程,以防未熔断却打满threadpool
)
.andCommandPropertiesDefaults( // 配置熔断器
HystrixCommandProperties.Setter()
//开启熔断器
.withCircuitBreakerEnabled(true)
//滑动窗口内(10s)的请求数阈值,只有达到了这个阈值,才有可能熔断。默认是 20,如果这个时间段只有19个请求,就算全部失败了,也不会自动熔断。
.withCircuitBreakerRequestVolumeThreshold(3)
//错误率阈值,默认 50%,比如(10s)内有100个请求,其中有60个发生异常,那么这段时间的错误率是 60,已经超过了错误率阈值,熔断器会自动打开。
.withCircuitBreakerErrorThresholdPercentage(80)
// .withCircuitBreakerForceOpen(true) // 置为true时,所有请求都将被拒绝,直接到fallback
// .withCircuitBreakerForceClosed(true) // 置为true时,将忽略错误
// .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE) // 信号量隔离
// .withExecutionTimeoutInMilliseconds(5000)
)
);
this.id = id;
}
@Override
protected String run() throws Exception {
System.out.println("running run():" + id);
if (id % 2 == 0 && id <= 10) { //让小于等于10的偶数返回
return "running run():" + id;
} else { //让奇数或大于10的数进入fallback
TimeUnit.MILLISECONDS.sleep(2000);
return id+"";
}
}
@Override
protected String getFallback() {
return " ====== CircuitBreaker fallback" + id + " ,是否进入熔断:" + super.isCircuitBreakerOpen();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i++) {
try {
System.out.println("===========" + new HystrixFallbackCircuitBreaker(i).execute());
} catch(Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
}
- 我们配置10s内请求数大于3个时就启动熔断器,请求错误率大于80%时就熔断,然后for循环发起请求,当请求符合熔断条件时将触发
getFallback()
。
4、当命令执行的线程池和队列或者信号量已经满容
代码如下:
package com.jimingqiang.study.hystrix.hystrixbegin.failback;
import com.netflix.hystrix.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* Created by QDHL on 2018/10/11.
*
* @author mingqiang ji
*/
public class HystrixFallbackThreadPool extends HystrixCommand<String> {
private final String name;
public HystrixFallbackThreadPool(String name) {
// super(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"));
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolTest"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 配置线程池里的线程数
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
TimeUnit.MILLISECONDS.sleep(3000);
return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++) {
try {
Future<String> future = new HystrixFallbackThreadPool("Hlx"+i).queue();
} catch(Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
for(int i = 0; i < 20; i++) {
try {
System.out.println("===========" + new HystrixFallbackThreadPool("Hlx").execute());
} catch(Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
}
}
-
我们配置线程池数目为3,然后先用一个for循环执行
queue()
,触发的run()
sleep 3s,然后再用第2个for循环执行execute()
,发现所有execute()
都触发了fallback,这是因为第1个for的线程还在sleep,占用着线程池所有线程,导致第2个for的所有命令都无法获取到线程。3.隔离策略
hystrix提供了两种隔离策略:线程池隔离和信号量隔离。hystrix默认采用线程池隔离。
- 线程池隔离:不同服务通过使用不同线程池,彼此间将不受影响,达到隔离效果。我们通过andThreadPoolKey配置使用命名为
ThreadPoolTest
的线程池,实现与其他命名的线程池天然隔离,如果不配置andThreadPoolKey则使用withGroupKey配置来命名线程池 - 信号量隔离:线程隔离会带来线程开销,有些场景(比如无网络请求场景)可能会因为用开销换隔离得不偿失,为此hystrix提供了信号量隔离,当服务的并发数大于信号量阈值时将进入fallback。通过
withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
配置为信号量隔离,通过withExecutionIsolationSemaphoreMaxConcurrentRequests
配置执行并发数。
3.1 线程池隔离
在该模式下,用户请求会被提交到各自的线程池中执行,把执行每个下游服务的线程分离,这样,某个依赖服务的高延迟只会拖慢这个依赖服务对应的线程池,从而达到资源隔离的作用。当线程池来不及处理并且请求队列塞满时,新进来的请求将快速失败,可以避免依赖问题扩散。
优势
- 减少所依赖服务发生故障时的影响面,比如ServiceA服务发生异常,导致请求大量超时,对应的线程池被打满,这时并不影响ServiceB、ServiceC的调用。
- 如果接口性能有变动,可以方便的动态调整线程池的参数或者是超时时间,前提是Hystrix参数实现了动态调整。
缺点
- 请求在线程池中执行,肯定会带来任务调度、排队和上下文切换带来的开销。
- 因为涉及到跨线程,那么就存在ThreadLocal数据的传递问题,比如在主线程初始化的ThreadLocal变量,在线程池线程中无法获取
注意
因为Hystrix默认使用了线程池模式,所以对于每个Command,在初始化的时候,会创建一个对应的线程池,如果项目中需要进行降级的接口非常多,比如有上百个的话,不太了解Hystrix内部机制的同学,按照默认配置直接使用,可能就会造成线程资源的大量浪费。
- 线程池隔离:不同服务通过使用不同线程池,彼此间将不受影响,达到隔离效果。我们通过andThreadPoolKey配置使用命名为
线程池的使用示意图如下图所示,当n个请求线程并发对某个接口请求调用时,会先从hystrix管理的线程池里面获得一个线程,然后将参数传递给这个线程去执行真正调用。线程池的大小有限,默认是10个线程,可以使用maxConcurrentRequests参数配置,如果并发请求数多于线程池线程个数,就有线程需要进入队列排队,但排队队列也有上限,默认是 5,如果排队队列也满,则必定有请求线程会走fallback流程。
线程池模式可以支持异步调用,支持超时调用,支持直接熔断,存在线程切换,开销大。
代码如下:
public class HystrixThreadPoolLsolation extends HystrixCommand<String> {
private final String name;
public HystrixThreadPoolLsolation(String name,String threadPoolName) {
// super(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"));
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolName))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 配置线程池里的线程数
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
TimeUnit.MILLISECONDS.sleep(3000);
return name+"-"+Thread.currentThread().getName();
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
public static void main(String[] args) {
for(int i = 0; i < 10; i++) {
try {
Future<String> future = new HystrixThreadPoolLsolation("Hlx"+i,"thread-pool-1").queue();
} catch(Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
for(int i = 0; i < 20; i++) {
try {
System.out.println("===========" + new HystrixThreadPoolLsolation("Hlx","thread-pool-2").execute());
} catch(Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
}
}
- 我们配置线程池数目为3,然后先用一个for循环执行
queue()
,触发的run()
sleep 3s,然后再用第2个for循环执行execute()
,发现所有execute()
没有触发fallback,而是继续执行,这是因为两个命令配置了不同的线程池。
3.2 信号量
如果要使用信号量模式,需要配置参数execution.isolation.strategy=ExecutionIsolationStrategy.SEMAPHORE
;
另外,为了限制对下游依赖的并发调用量,可以配置Hystrix的
execution.isolation.semaphore.maxConcurrentRequests
当并发请求数达到阈值时,请求线程可以快速失败,执行降级。
信号量的使用示意图如下图所示,当n个并发请求去调用一个目标服务接口时,都要获取一个信号量才能真正去调用目标服务接口,但信号量有限,默认是10个,可以使用maxConcurrentRequests参数配置,如果并发请求数多于信号量个数,信号量在达到上限时,会拒绝后续请求的访问,则必定有请求线程会走fallback流程,从而达到限流和防止雪崩的目的。
信号量模式从始至终都只有请求线程自身,是同步调用模式,不支持超时调用,由于没有线程的切换,开销非常小。
在该模式下,接收请求和执行下游依赖在同一个线程内完成,
比如一个接口中依赖了3个下游:serviceA、serviceB、serviceC,且这3个服务返回的数据互相不依赖,这种情况下如果针对A、B、C的熔断降级使用信号量模式,那么接口耗时就等于请求A、B、C服务耗时的总和,无疑这不是好的方案。
代码如下:
package com.jimingqiang.study.hystrix.hystrixbegin.Isolationstrategy;
import com.netflix.hystrix.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* 测试信号量隔离
* 默认执行run()用的是主线程,为了模拟并行执行command,这里我们自己创建多个线程来执行command
* 设置ExecutionIsolationSemaphoreMaxConcurrentRequests为3,意味着信号量最多允许执行run的并发数为3,超过则触发降级,即不执行run而执行getFallback
* 设置FallbackIsolationSemaphoreMaxConcurrentRequests为1,意味着信号量最多允许执行fallback的并发数为1,超过则抛异常fallback execution rejected
*/
public class HystrixSemaphorelLsolation extends HystrixCommand<String> {
private final String name;
public HystrixSemaphorelLsolation(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SemaphoreTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("SemaphoreTestKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SemaphoreTestThreadPoolKey"))
.andCommandPropertiesDefaults( // 配置信号量隔离
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) // 信号量隔离
.withExecutionIsolationSemaphoreMaxConcurrentRequests(3)
.withFallbackIsolationSemaphoreMaxConcurrentRequests(1)
)
// 设置了信号量隔离后,线程池配置将变无效
// .andThreadPoolPropertiesDefaults(
// HystrixThreadPoolProperties.Setter()
// .withCoreSize(13) // 配置线程池里的线程数
// )
);
this.name = name;
}
@Override
protected String run() throws Exception {
return "run(): name="+name+",线程名是" + Thread.currentThread().getName();
}
@Override
protected String getFallback() {
return "getFallback(): name="+name+",线程名是" + Thread.currentThread().getName();
}
public static void main(String[] args) throws Exception{
try {
Thread.sleep(2000);
for(int i = 0; i < 5; i++) {
final int j = i;
// 自主创建线程来执行command,创造并发场景
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("===========" + new HystrixSemaphorelLsolation("HLX" + j).execute());
}
});
thread.start();
}
} catch(Exception e) {
e.printStackTrace();
}
System.in.read();
}
}
-
由于并发执行5个线程,ExecutionIsolationSemaphoreMaxConcurrentRequests为3,设置FallbackIsolationSemaphoreMaxConcurrentRequests为1,最终执行结果是3个线程可以并发执行
run()
,1个线程执行
getFallback()
,一个线程抛出异常;
4.熔断器
在生活中,如果电路的负载过高,保险箱会自动跳闸,以?;ぜ依锏母髦值缙?,这就是熔断器的一个活生生例子。在Hystrix中也存在这样一个熔断器,当所依赖的服务不稳定时,能够自动熔断,并提供有损服务,?;し竦奈榷ㄐ?。在运行过程中,Hystrix会根据接口的执行状态(成功、失败、超时和拒绝),收集并统计这些数据,根据这些信息来实时决策是否进行熔断。
线路的开路闭路详细逻辑如下:
- 假设线路内的容量(请求QPS)达到一定阈值(通过
HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()
配置) - 同时,假设线路内的错误率达到一定阈值(通过
HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()
配置) - 熔断器将从『闭路』转换成『开路』
- 若此时是『开路』状态,熔断器将短路后续所有经过该熔断器的请求,这些请求直接走『失败回退逻辑』
- 经过一定时间(即『休眠窗口』,通过
HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()
配置),后续第一个请求将会被允许通过熔断器(此时熔断器处于『半开』状态),若该请求失败,熔断器将又进入『开路』状态,且在休眠窗口内保持此状态;若该请求成功,熔断器将进入『闭路』状态,回到逻辑1循环往复。
? 代码可以参考 【2.fallback(降级)中的 3、当回路器打开,命令的执行进入了熔断状态的代码片段 】
5.请求缓存
5.1 请求缓存有如下好处:
-
不同请求路径上针对同一个依赖服务进行的重复请求(有同一个缓存 Key),不会真实请求多次
这个特性在企业级系统中非常有用,在这些系统中,开发者往往开发的只是系统功能的一部分。(注:这样,开发者彼此隔离,不太可能使用同样的方法或者策略去请求同一个依赖服务提供的资源)
例如,请求一个用户的
Account
的逻辑如下所示,这个逻辑往往在系统不同地方被用到:Account account = new UserGetAccount(accountId).execute(); //or Observable<Account> accountObservable = new UserGetAccount(accountId).observe();
Hystrix 的
RequestCache
只会在内部执行run()
方法一次,上面两个线程在执行HystrixCommand
命令时,会得到相同的结果,即使这两个命令是两个不同的实例。 -
数据获取具有一致性
因为缓存的存在,除了第一次请求需要真正访问依赖服务以外,后续请求全部从缓存中获取,可以保证在同一个用户请求内,不会出现依赖服务返回不同的回应的情况。
-
避免不必要的线程执行
在
construct()
或run()
方法执行之前,会先从请求缓存中获取数据,因此,Hystrix 能利用这个特性避免不必要的线程执行,减小系统开销。若 Hystrix 没有实现请求缓存,那么
HystrixCommand
和HystrixObservableCommand
的实现者需要自己在construct()
或run()
方法中实现缓存,这种方式无法避免不必要的线程执行开销。
5.2 缓存的使用
要使用hystrix cache功能,第一个要求是重写getCacheKey()
,用来构造cache key;第二个要求是构建context,如果请求B要用到请求A的结果缓存,A和B必须同处一个context。通过HystrixRequestContext.initializeContext()
和context.shutdown()
可以构建一个context,这两条语句间的所有请求都处于同一个context。
代码如下:
public class CommandHelloWorld3 extends HystrixCommand<String> {
private final int id;
protected CommandHelloWorld3(int id) {
super(HystrixCommandGroupKey.Factory.asKey("RequestCacheCommand"));
this.id = id;
}
@Override
protected String run() throws Exception {
System.out.println(Thread.currentThread().getName() + " execute id = " + id);
return "execute=" + id;
}
//重写getCacheKey,实现区分不同请求的逻辑
@Override
protected String getCacheKey() {
System.out.println(" --- ");
return String.valueOf(id);
}
public static void main(String[] args) {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
CommandHelloWorld3 commandHelloWorld3_a = new CommandHelloWorld3(22);
CommandHelloWorld3 commandHelloWorld3_b = new CommandHelloWorld3(22);
System.out.println("a执行结果:" + commandHelloWorld3_a.execute());
System.out.println("a执行结果是否从缓存中获?。? + commandHelloWorld3_a.isResponseFromCache);
System.out.println("b执行结果:" + commandHelloWorld3_b.execute());
System.out.println("b执行结果是否从缓存中获?。? + commandHelloWorld3_b.isResponseFromCache);
} finally {
context.shutdown();
}
context = HystrixRequestContext.initializeContext();
try {
CommandHelloWorld3 commandHelloWorld3_c = new CommandHelloWorld3(22);
System.out.println("c执行结果:" + commandHelloWorld3_c.execute());
System.out.println("c执行结果是否从缓存中获取:" + commandHelloWorld3_c.isResponseFromCache);
} finally {
context.shutdown();
}
}
}
执行结果如下:
---
---
hystrix-RequestCacheCommand-1 execute id = 22
a执行结果:execute=22
a执行结果是否从缓存中获?。篺alse
---
---
b执行结果:execute=22
b执行结果是否从缓存中获取:true
---
---
hystrix-RequestCacheCommand-2 execute id = 22
c执行结果:execute=22
c执行结果是否从缓存中获?。篺alse
6.合并请求collapsing
下图展示了在两种场景(未增加『请求合并器』和增加『请求合并器』)下,线程和网络连接数量(假设所有请求在一个很小的时间窗口内,例如 10ms,是『并发』的):
为什么要使用请求合并?
例如,这里有一个包含 300 个视频对象的列表,需要遍历这个列表,并对每一个对象调用 getSomeAttribute()
方法,但如果简单处理的话,可能会导致 300 次的网络请求(假设 getSomeAttribute()
方法内需要发出网络请求),每一个网络请求可能都会花上几毫秒(显然,这种方式非常容易拖慢系统)。
通过使用 HystrixCollapser
,Hystrix 能自动完成请求的合并,可以将300个网络请求降低为只发送一次网络请求,大大的减少了网络的耗时。
Hystrix中的请求合并,就是利用一个合并处理器,将对同一个服务发起的连续请求合并成一个请求进行处理(这些连续请求的时间窗默认为10ms),可以有效节省网络带宽和线程池资源
请求合并带来的额外开销
请求合并会导致依赖服务的请求延迟增高(该延迟为等待请求的延迟),延迟的最大值为合并时间窗口大小。
若某个请求耗时的可能是 5ms,合并时间窗口为 10ms,但是现在必须再等10ms看看还有没有其他的请求一起的,这样一个请求的耗时就从5ms增加到15ms了。
请求合并带来的额外开销是否值得,取决于将要执行的命令,高延迟的命令相比较而言不会有太大的影响,因为这个时候时间窗的时间消耗就显得微不足道了
另外,如果一个命令具有高并发度,并且能批量处理多个,甚至上百个的话,请求合并带来的性能开销会因为吞吐量的极大提升而基本可以忽略,因为 Hystrix 会减少这些请求所需的线程和网络连接数量。如果一个合并时间窗口内只有 1~2 个请求,将请求合并显然不是明智的选择。
代码如下:
package com.jimingqiang.study.hystrix.hystrixbegin.collapsing;
import com.netflix.hystrix.*;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class HystrixCollapsing extends HystrixCollapser<List<String>, String, Integer> {
private final Integer key;
public HystrixCollapsing(Integer key) {
this.key = key;
}
/**
* 请求参数
* 如果有多个参数需要被绑定,创建一个单独的对象来包含它们,或者使用Tuple。
* @return
*/
@Override
public Integer getRequestArgument() {
return key;
}
/**
* 创建一个批量请求命令
* @param requests 保存了延迟时间窗中收集到的所有单个的请求的参数 String是返回结果类型,Integer是请求参数类型
* @return
*/
@Override
protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
return new BatchCommand(requests); // 把批量请求传给command类
}
//
/**
* 把批量请求的结果和对应的请求一一对应起来
* @param batchResponse 响应的结果
* @param requests 请求参数
*/
@Override
protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
int count = 0;
for (CollapsedRequest<String, Integer> request : requests) {
request.setResponse(batchResponse.get(count++));
}
}
// command类
private static final class BatchCommand extends HystrixCommand<List<String>> {
private final Collection<CollapsedRequest<String, Integer>> requests;
private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CollepsingGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("CollepsingKey")
));
this.requests = requests;
}
@Override
protected List<String> run() {
ArrayList<String> response = new ArrayList<String>();
// 处理每个请求,返回结果
for (CollapsedRequest<String, Integer> request : requests) {
// artificial response for each argument received in the batch
response.add("ValueForKey: " + request.getArgument() + " thread:" + Thread.currentThread().getName());
}
return response;
}
}
public static void main(String[] args) throws Exception {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<String> f1 = new HystrixCollapsing(1).queue();
Future<String> f2 = new HystrixCollapsing(2).queue();
Future<String> f3 = new HystrixCollapsing(3).queue();
Future<String> f4 = new HystrixCollapsing(4).queue();
Future<String> f5 = new HystrixCollapsing(5).queue();
// f5和f6,如果sleep时间够小则会合并,如果sleep时间够大则不会合并,默认10ms
TimeUnit.MILLISECONDS.sleep(1000);
Future<String> f6 = new HystrixCollapsing(6).queue();
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
System.out.println(f5.get());
System.out.println(f6.get());
// note:numExecuted表示共有几个命令执行,1个批量多命令请求算一个,这个实际值可能比代码写的要多,因为due to non-determinism of scheduler since this example uses the real timer
int numExecuted = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size();
System.out.println("num executed: " + numExecuted);
int numLogs = 0;
for (HystrixInvokableInfo<?> command : HystrixRequestLog.getCurrentRequest().getAllExecutedCommands()) {
numLogs++;
System.err.println(command.getCommandKey().name() + " => command.getExecutionEvents(): " + command.getExecutionEvents());
}
System.out.println(numLogs==numExecuted);
} finally {
context.shutdown();
}
}
}
重要一点,两个请求能自动合并的前提是两者足够“近”,即合并时间窗口,两者启动执行的间隔时长要足够小,默认为10ms,即超过10ms将不自动合并。
-
我们连续发起多个queue请求,依次返回f1~f6共6个Future对象,根据打印结果可知f2~f5同处一个线程,说明这4个请求被合并了,而f6由另一个线程执行,这是因为f5和f6中间隔了一个sleep,超过了合并要求的最大间隔时长,f1为什么没有合并到一起,我很疑惑
执行结果如下:
ValueForKey: 1 thread:hystrix-CollepsingGroup-1 ValueForKey: 2 thread:hystrix-CollepsingGroup-2 ValueForKey: 3 thread:hystrix-CollepsingGroup-2 ValueForKey: 4 thread:hystrix-CollepsingGroup-2 ValueForKey: 5 thread:hystrix-CollepsingGroup-2 CollepsingKey => command.getExecutionEvents(): [SUCCESS, COLLAPSED] ValueForKey: 6 thread:hystrix-CollepsingGroup-3 CollepsingKey => command.getExecutionEvents(): [SUCCESS, COLLAPSED] num executed: 3 CollepsingKey => command.getExecutionEvents(): [SUCCESS, COLLAPSED] true
7. 配置策略
? 具体的配置可以查看官网 官网配置地址,下边仅是简单的整理
-
HystrixCommandProperties
/* --------------统计相关------------------*/ // 统计滚动的时间窗口,默认:5000毫秒(取自circuitBreakerSleepWindowInMilliseconds) private final HystrixProperty metricsRollingStatisticalWindowInMilliseconds; // 统计窗口的Buckets的数量,默认:10个,每秒一个Buckets统计 private final HystrixProperty metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow // 是否开启监控统计功能,默认:true private final HystrixProperty metricsRollingPercentileEnabled; /* --------------熔断器相关------------------*/ // 熔断器在整个统计时间内是否开启的阀值,默认20。也就是在metricsRollingStatisticalWindowInMilliseconds(默认10s)内至少请求20次,熔断器才发挥起作用 private final HystrixProperty circuitBreakerRequestVolumeThreshold; // 熔断时间窗口,默认:5秒.熔断器中断请求5秒后会进入半打开状态,放下一个请求进来重试,如果该请求成功就关闭熔断器,否则继续等待一个熔断时间窗口 private final HystrixProperty circuitBreakerSleepWindowInMilliseconds; //是否启用熔断器,默认true. 启动 private final HystrixProperty circuitBreakerEnabled; //默认:50%。当出错率超过50%后熔断器启动 private final HystrixProperty circuitBreakerErrorThresholdPercentage; //是否强制开启熔断器阻断所有请求,默认:false,不开启。置为true时,所有请求都将被拒绝,直接到fallback private final HystrixProperty circuitBreakerForceOpen; //是否允许熔断器忽略错误,默认false, 不开启 private final HystrixProperty circuitBreakerForceClosed; /* --------------信号量相关------------------*/ //使用信号量隔离时,命令调用最大的并发数,默认:10 private final HystrixProperty executionIsolationSemaphoreMaxConcurrentRequests; //使用信号量隔离时,命令fallback(降级)调用最大的并发数,默认:10 private final HystrixProperty fallbackIsolationSemaphoreMaxConcurrentRequests; /* --------------其他------------------*/ //使用命令调用隔离方式,默认:采用线程隔离,ExecutionIsolationStrategy.THREAD private final HystrixProperty executionIsolationStrategy; //使用线程隔离时,调用超时时间,默认:1秒 private final HystrixProperty executionIsolationThreadTimeoutInMilliseconds; //线程池的key,用于决定命令在哪个线程池执行 private final HystrixProperty executionIsolationThreadPoolKeyOverride; //是否开启fallback降级策略 默认:true private final HystrixProperty fallbackEnabled; // 使用线程隔离时,是否对命令执行超时的线程调用中断(Thread.interrupt())操作.默认:true private final HystrixProperty executionIsolationThreadInterruptOnTimeout; // 是否开启请求日志,默认:true private final HystrixProperty requestLogEnabled; //是否开启请求缓存,默认:true private final HystrixProperty requestCacheEnabled; // Whether request caching is enabled.
-
HystrixCollapserProperties
//请求合并是允许的最大请求数,默认: Integer.MAX_VALUE private final HystrixProperty maxRequestsInBatch; //批处理过程中每个命令延迟的时间,默认:10毫秒 private final HystrixProperty timerDelayInMilliseconds; //批处理过程中是否开启请求缓存,默认:开启 private final HystrixProperty requestCacheEnabled;
-
HystrixThreadPoolProperties
/* 配置线程池大小,默认值10个. 建议值:请求高峰时99.5%的平均响应时间 + 向上预留一些即可 */ private final HystrixProperty corePoolSize; /* 配置线程值等待队列长度,默认值:-1 建议值:-1表示不等待直接拒绝,测试表明线程池使用直接决绝策略+ 合适大小的非回缩线程池效率最高.所以不建议修改此值。 当使用非回缩线程池时,queueSizeRejectionThreshold,keepAliveTimeMinutes 参数无效 */ private final HystrixProperty maxQueueSize;
参考: