前面的章节,我们学习了微服务中对熔断降级的原理,参考这篇《服务治理:熔断、降级、限流》。了解了固定窗口算法、滑动窗口算法、 漏桶原理和令牌桶原理,本文对Hystrix做进一步的分析。
Hystrix是Netflix开源的一款具备熔断、限流、降级能力的容错系统,设计目的是将应用中的系统访问、多链路服务调用、第三方依赖服务的调用,通过流量资源控制的方式隔离开。
避免了在分布式系中的某个服务故障沿着调用链向上传递,出现整体的服务雪崩,并以此提升系统的稳定性和健壮性。
以往的访问模式,是A链路 与 B链路(A -> B)的直接访问。而命令模式(Command Pattern)的作用则是通过建立命令对象来解耦A、B链路。
在执行过程中,命令对象可以对请求进行排队、记录请求日志、执行故障注入、超时/故障 快速返回等操作,如 A -> Command Work -> B。
在计算机中,线程是系统运行的基本单位,我们可以通过对线程池资源的管理,如异步请求,请求超时断开,请求熔断,来对系统资源进行隔离,当部分类型的资源有限,请求过载时,进行系统保护。
Java程序中,Semaphore(信号量)是用来控制同时访问特定资源的线程数量,通过协调各个线程以保证合理地使用公共资源。也保证了资源竞争的隔离性。
如下图所示(图片源自官网),Hystrix的工作流程上大概会有如下9个步骤,下文将详细介绍每个流程:
创建HystrixCommand 或者 HystrixObservableCommand 命令
执行命令,如图中的,一共有四种方式来执行run()/construct()
单个实例只能执行一次这4个方法。HystrixObservableCommand没有execute()和queue()。
执行方式 | 说明 | 可用对象 |
---|---|---|
execute() | 阻塞式同步执行,返回依赖服务的单一返回结果(或者抛出异常) | HystrixCommand |
queue() | 基于Future的异步方式执行,返回依赖服务的单一返回结果(或者抛出异常) | HystrixCommand |
observe() | 基于Rxjava的Observable方式,返回通过Observable表示的依赖服务返回结果,代调用代码先执行(Hot Obserable) | HystrixObservableCommand |
toObvsevable() | 基于Rxjava的Observable方式,返回通过Observable表示的依赖服务返回结果,执行代码等到真正订阅的时候才会执行(cold observable) | HystrixObservableCommand |
如果当前命令对象配置了允许从结果缓存中取返回结果,并且在结果缓存中已经缓存了请求结果,则立即通过Observable返回。
判断 circuit-breaker 是否打开。如果3.3步骤没有缓存没有命中,则判断一下当前断路器的断路状态是否打开。如果断路器状态为打开状态,则Hystrix将不会执行此Command命令,直接执行步骤3.8 调用Fallback。
如果断路器状态是关闭,则执行 步骤3.5 检查是否有足够的资源运行 Command命令。
如果当前要执行的Command命令 先关连的线程池 和队列(或者信号量)资源已经满了,Hystrix将不会运行 Command命令,直接执行步骤8的Fallback降级处理;如果未满,表示有剩余的资源执行Command命令,则执行步骤 3.6。
执行 HystrixObservableCommand.construct() 或者 HystrixCommand.run()。
当经过步骤 3.5 判断,有足够的资源执行Command命令时,本步骤将调用Command命令运行方法。调用HystrixCommand的run方法。按照一下两个条件去判断:
对于熔断器的信息会做健康的判断。Hystrix 统计Command命令执行执行过程中的 success count、fail count、reject count 和 timeout count, 并将这些信息记录到断路器(Circuit Breaker)中。
断路器会把上面的统计信息按照时间窗统计下来。并判断什么时候可以将请求熔断,在熔断后和熔断窗口期结束之前,请求都不会被Fallback。熔断窗口期结束后会再次校验,通过后熔断开关会被关闭。
Hystrix会在以下场景出现后,触发Fallback操作:
Hystrix命令对象执行成功,会直接返回结果或者以Observable形式返回结果。返回的Observable 会执行以下流程返回结果。
Hystrix获取返回结果执行流程如下(图片源自官网):
pom.xml加上以下依赖。我们使用原生hystrix来做案例介绍。
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.8</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream</artifactId>
<version>1.4.10</version>
</dependency>
fallBack是指当程序符合我们执行熔断降级的条件时候,我们默认执行的路线,可以是一个方法或者一个对象。HystrixCommand中已有,我们只需重写即可,类似
@Override
protected String getFallback() {
return "当熔断、降级发生时,返回的默认信息";
}
在3.8 节 我们介绍了Hystrix 触发 fallBack的四种条件,下面我们一个个来测试。
除了HystrixBadRequestException,所有程序抛出的异常,都会触发getFallback(),调用程序将获得getFallback()的执行并返回。
/**
* @author brand
* @Description: 模拟异常/超时的场景
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:35
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixException extends HystrixCommand<String> {
/**
* 实现getFallback()后,执行命令时遇到以上4种情况将被fallback接管,不会抛出异常或其他
* 下面演示的是异常的情况
*/
private final String name;
public HystrixException(String name) {
super(HystrixCommandGroupKey.Factory.asKey("Command Group:fallbackGroup"));
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------以下三种情况触发fallback-------------------*/
// 1.循环+等待,超时fallBack
// int i = 0;
// while (true) {
// i++;
// Thread.currentThread().sleep(1000);
// }
// 2.除零导致异常
// int i = 1/0;
// 3.主动抛出异常
// throw new Exception("command trigger fallback");
/*---------------直接抛出HystrixBadRequestException,不触发fallback-----------------*/
// HystrixBadRequestException,这个是非法参数或非系统错误引起,不触发fallback,也不被计入熔断器
throw new HystrixBadRequestException("HystrixBadRequestException not trigger fallback");
// return "success";
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
}
编写测试类:
/**
* @author brand
* @Description: 测试异常/超时 fallBack
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:35
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class ExceptionTimeOutFallBackTest {
@Test
public void testException() throws IOException {
try {
assertEquals("success", new HystrixException("Exception").execute());
} catch(Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,会被捕获到这里" + e.getCause());
}
}
}
测试类执行直接抛出HystrixBadRequestException,测试类会走到catch函数段中。
测试类执行其他三种情况,会得到以下结果:
同上 4.2.1 中的 循环+等待,超时fallBack 的场景
图片源自官网,这边就不单独画了。
key值 | 说明 | 默认值 |
---|---|---|
circuitBreaker.enabled | 是否开启断路器 | true |
circuitBreaker.requestVolumeThreshold | 断路器启用请求数阈值 | 10 |
circuitBreaker.sleepWindowInMilliseconds | 断路器启用后的睡眠时间窗 | 5000(ms) |
circuitBreaker.errorThresholdPercentage | 断路器启用失败率阈值 | 50(%) |
circuitBreaker.forceOpen | 是否强制将断路器设置成开启状态 | false |
circuitBreaker.forceClosed | 是否强制将断路器设置成关闭状态 | false |
/**
* @author brand
* @Description: 熔断
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午3:41
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixCircuitBreaker extends HystrixCommand<String> {
private final String name;
public HystrixCircuitBreaker(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Group:CircuitBreaker"))
.andCommandKey(HystrixCommandKey.Factory.asKey("Command:CircuitBreaker"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPool:CircuitBreakerTest"))
.andThreadPoolPropertiesDefaults( // 配置线程池
HystrixThreadPoolProperties.Setter()
.withCoreSize(200) // 配置线程池里的线程数,设置足够多线程,以防未熔断却打满threadpool
)
.andCommandPropertiesDefaults( // 配置熔断器
HystrixCommandProperties.Setter()
.withCircuitBreakerEnabled(true)
.withCircuitBreakerRequestVolumeThreshold(10)
.withCircuitBreakerErrorThresholdPercentage(50)
// .withCircuitBreakerForceOpen(true) // true时强制将断路器设置成开启状态,所有请求都将被拒绝,直接到fallback
// .withCircuitBreakerForceClosed(true) // true时强制将断路器设置成关闭状态,将忽略所有错误
// .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE) // 信号量隔离
// .withExecutionTimeoutInMilliseconds(5000)
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
System.out.println("running num :" + name);
int num = Integer.valueOf(name);
if (num % 2 == 0 && num < 30) { // 符合条件,直接返回
return name;
} else { // 模拟异常
int j = 0;
j = num / j;
}
return name;
}
@Override
protected String getFallback() {
return "CircuitBreaker fallback: " + name;
}
}
执行结果如下,偶数正常返回,奇数进入熔断信息,并且超过30之后全部进入fallBack
线程池隔离:不同服务通过使用不同线程池,彼此间将不受影响,达到隔离效果。
我们通过andThreadPoolKey配置使用命名为ThreadPoolTest的线程池,实现与其他命名的线程池天然隔离,如果不配置andThreadPoolKey,也可以则使用withGroupKey配置来命名线程池。
/**
* @author brand
* @Description: 线程池隔离
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:58
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixThreadPool extends HystrixCommand<String> {
private final String name;
public HystrixThreadPool(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup")) // CommandGroup分组
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolTest")) // 线程池key
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 配置线程池里的线程数为3。超过3次进行熔断
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------如果线程数超配,会触发fallback的case,否则休眠1s,进行正常返回-------------------*/
TimeUnit.MILLISECONDS.sleep(1000);
return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
}
测试一下,下面都是使用 HystrixThreadPoolKey 为 ThreadPoolTest的线程池命名,所以是公用,会返回fallBack的结果。
for(int i = 0; i < 3; i++) {
try {
Future<String> future = new HystrixThreadPool("thread pool"+i).queue(); // 以异步非堵塞方式执行run(),所以消耗了3个线程
} catch(Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
for(int i = 0; i < 10; i++) {
try {
System.out.println("===========" + new HystrixThreadPool("thread pool").execute()); //上面消耗了所有线程,这边会执行到fallBack中
} catch(Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
我们做一下调整,让线程池的key(HystrixThreadPoolKey)不一致,再测试是否返回正常的执行结果。
/**
* @author brand
* @Description: 线程池隔离
* @Copyright: Copyright (c) 2022
* @Company: Helenlyn, Inc. All Rights Reserved.
* @date 2022/1/8 下午5:58
* @Update Time:
* @Updater:
* @Update Comments:
*/
public class HystrixThreadPool extends HystrixCommand<String> {
private final String name;
public HystrixThreadPool(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup")) // CommandGroup分组
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name)) // 线程池key,根据请求的入参来算
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(5000)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3) // 配置线程池里的线程数为3。超过3次进行熔断
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
/*---------------如果线程数超配,会触发fallback的case,否则休眠1s,进行正常返回-------------------*/
TimeUnit.MILLISECONDS.sleep(1000);
return name;
}
@Override
protected String getFallback() {
return "fallback: " + name;
}
}
测试一下,下面都是使用 HystrixThreadPoolKey 为 ThreadPoolTest的线程池命名,所以是公用,会返回fallBack的结果。
for(int i = 0; i < 3; i++) {
try {
Future<String> future = new HystrixThreadPool("thread pool"+i).queue(); // 会有三个线程池组 thread pool1、thread poo2、thread pool3,不互相影响,更不会影响下面excute()的执行
} catch(Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
for(int i = 0; i < 10; i++) {
try {
System.out.println("===========" + new HystrixThreadPool("thread pool").execute()); //与上面隔离,所以这边执行始终不会走到fallBack中
} catch(Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
https://github.com/WengZhiHua/Helenlyn.Grocery/tree/master/parent/HystrixDemo