Hystrix是Netflix开源的一个解决分布式系统延迟和服务容错的类库。它主要实现了上一篇文章中的断路器模式。除了断路器模式之外,Hystrix还提供了服务隔离请求合并请求缓存的功能。

Netflix在2018年就已经宣布不再更新Hystrix项目了,将重点转向能够自适应应用实时性能指标的实现,而不是像现在这样,只能预先配置好相关选项。这个也是流量治理的发展方向。在云原生时代,Service Mesh实现弹性的、分布式的流量治理,会是一个令人期待的特性。

虽然Hystrix已经不再更新,但是它的一些实现原理能够帮助我们更好的理解流量治理。所以本文对Hystrix的实现原理进行一些研究。

命令模式

Hystrix为用户提供的最常见的接口是@HystrixCommand这个注解。此注解隶属于Netflix 一个叫“javanica”的contrib包中。既然是个注解,那么一定有一个processor去解析并处理它,Hystrix使用AspectJ对这个注解进行AOP的处理,相应的切面就是com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect这个类。因为AOP不是本文的重点,具体的逻辑就不在这里赘述了。
从注解的名字可以看出来,Hystrix使用了设计模式中的命令模式来包裹对依赖的调用逻辑,每个命令在独立线程中执行。那么什么是命令模式呢?

命令模式(Command Pattern)是一种数据驱动的设计模式,它属于行为型模式。请求以命令的形式包裹在对象中,并传给调用对象。调用对象寻找可以处理该命令的合适的对象,并把该命令传给相应的对象,该对象执行命令。
看完定义,不知所云。直接看一个简单的开关灯的例子,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Light {
public Light() {
}

public void turnOn() {
System.out.println("The light is on");
}

public void turnOff() {
System.out.println("The light is off");
}
}

public class Switch {

private Light light;

public Switch(Light light) {
this.light = light;
}

public openLight() {
this.light.turnOn();
}

public closeLight() {
this.light.turnOff();
}
}

从上面的代码可以看到,行为请求者Switch和行为调用者Light耦合在了一起,Switch类无法被重用,使用命令模式,我们可以将行为执行者和执行逻辑封装成一个命令对象,传递给行为请求者,从而将二者解耦。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import java.util.List;
import java.util.ArrayList;

/* The Command interface */
public interface Command {
void execute();
}

/* The Invoker class */
public class Switch {

public Switch() {
}

public void storeAndExecute(Command cmd) {
cmd.execute();
}
}

/* The Receiver class */
public class Light {
public Light() {
}

public void turnOn() {
System.out.println("The light is on");
}

public void turnOff() {
System.out.println("The light is off");
}
}

/* The Command for turning on the light - ConcreteCommand #1 */
public class FlipUpCommand implements Command {
private Light theLight;

public FlipUpCommand(Light light) {
this.theLight = light;
}

public void execute(){
theLight.turnOn();
}
}

/* The Command for turning off the light - ConcreteCommand #2 */
public class FlipDownCommand implements Command {
private Light theLight;

public FlipDownCommand(Light light) {
this.theLight = light;
}

public void execute() {
theLight.turnOff();
}
}

/* The test class or client */
public class PressSwitch {
public static void main(String[] args){
Light lamp = new Light();
Command switchUp = new FlipUpCommand(lamp);
Command switchDown = new FlipDownCommand(lamp);

Switch mySwitch = new Switch();

try {
if ("ON".equalsIgnoreCase(args[0])) {
mySwitch.storeAndExecute(switchUp);
}
else if ("OFF".equalsIgnoreCase(args[0])) {
mySwitch.storeAndExecute(switchDown);
}
else {
System.out.println("Argument \"ON\" or \"OFF\" is required.");
}
} catch (Exception e) {
System.out.println("Arguments required.");
}
}
}

从上述代码中可以看出,命令模式可以分为几种角色:

  • Invoker:行为请求者
  • Receiver:行为执行者
  • Command:抽象的命令接口
  • ConcreteCommand:具体的命令
  • client:负责将Receiver组装成具体的命令,最终传递给Invoker调用

最终的uml图如下:

Hystrix工作流程

Hystrix工作流程如图:

1.构造一个HystrixCommand或者HystrixObservableCommand对象

命令模式应用到Hystrix中,Invoker就是我们配置了Hystrix的应用本身,而Receiver就是我们项目中使用的一些远程调用的工具,比如restTemplate或者feign,充当Command接口的就是HystrixCommand或者HystrixObservableCommand。需要注意的是,这里的HystrixCommand是com.netflix.hystrix包下的,而不是上面提到的contrib包下的HystrixCommand注解。
HystrixCommand的基本用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CommandHelloWorld extends HystrixCommand<String> {

private final String name;

public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}

@Override
protected String run() {
// a real example would do work like a network call here
return "Hello " + name + "!";
}
}

相同的逻辑HystrixObservableCommand的用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CommandHelloWorld extends HystrixObservableCommand<String> {

private final String name;

public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}

@Override
protected Observable<String> construct() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
try {
if (!observer.isUnsubscribed()) {
// a real example would do work like a network call here
observer.onNext("Hello");
observer.onNext(name + "!");
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} ).subscribeOn(Schedulers.io());
}
}

构造成的HystrixCommand或者HystrixObservableCommand对象就代表了Invoker对依赖的服务也就是Receiver的请求。其中HystrixCommand代表依赖的服务返回单个响应;而HystrixObservableCommand代表依赖的服务返回多个响应。

2.执行命令

当执行HystrixCommand命令时,会使用两个方法:

1
2
3
4
// 同步执行
K value = command.execute();
// 异步执行
Future<K> fValue = command.queue();

当执行HystrixObservableCommand命令时,会使用另外两个方法:

1
2
Observable<K> ohValue = command.observe();         //hot observable
Observable<K> ocValue = command.toObservable(); //cold observable

无论HystrixCommand还是HystrixObservableCommand底层都依赖RxJava的实现,Observable也是RxJava的对象,这里不作赘述,只需要知道hot observable的订阅者是从中途加入,没有办法获取“历史的事件”,而cold observable能保证订阅者看到“全部的事件”。

其余步骤

直接看图吧,没什么可说的。

断路器原理

下图描述了HystrixCommand怎样与HystrixCircuitBreaker协作的:

先来看HystrixCircuitBreaker的定义:

1
2
3
4
5
6
7
8
9
10
11
public interface HystrixCircuitBreaker {
// 每个HystrixCommand请求都通过此方法判断是否执行
public boolean allowRequest();
// 返回断路器是否打开
public boolean isOpen();
// 闭合断路器
void markSuccess();
public static class Factory {...}
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {...}
static class NoOpCircuitBreaker implements HystrixCircuitBreaker {...}
}

注意这里面定义的三个静态类:

  • Factory:这个类里维护了一个HystrixCommandKey和对应断路器的映射ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand。
  • NoOpCircuitBreaker:一个什么都不做的断路器实现,断路器状态始终闭合。
  • HystrixCircuitBreakerImpl:默认实现

HystrixCircuitBreakerImpl

1
2
3
4
5
6
7
8
9
10
11
12
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
// HystrixCommand实例的属性对象
private final HystrixCommandProperties properties;
// 用于记录各类指标
private final HystrixCommandMetrics metrics;
// 断路器是否打开
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
// 断路器打开或者上一次测试时间
private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

...
}

isOpen方法会判断断路器的开关状态,看一下isOpen方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public boolean isOpen() {
if (circuitOpen.get()) {
// if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
return true;
}
// we're closed, so let's see if errors have made us so we should trip the circuit open
HealthCounts health = metrics.getHealthCounts();
// check if we are past the statisticalWindowVolumeThreshold
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
return false;
}
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// our failure rate is too high, trip the circuit
if (circuitOpen.compareAndSet(false, true)) {
// if the previousValue was false then we want to set the currentTime
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
// How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
// caused another thread to set it to true already even though we were in the process of doing the same
// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
return true;
}
}
}

上述代码中有几个关键属性与上一篇文章中介绍断路器原理相对应。
HealthCounts对象记录了一个滚动时间窗口内的请求信息快照,默认时间窗是10秒。对应于单位时间circuitBreakerRequestVolumeThreshold对应于一定数量的请求,默认值20。circuitBreakerErrorThresholdPercentage对应于故障率,默认值50。
断路器除了开、关的状态,还有一个半开的状态。半开的逻辑在allowSingleTest方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public boolean allowRequest() {
// 一般不会进
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
// 一般不会进
if (properties.circuitBreakerForceClosed().get()) {
isOpen();
return true;
}
// 如果断路器是开启状态,进行半开状态逻辑
return !isOpen() || allowSingleTest();
}

public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 1) if the circuit is open
// 2) and it's been longer than 'sleepWindow' since we opened the circuit
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties circuitBreakerSleepWindowInMilliseconds().get()) {
// We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
// If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
// if this returns true that means we set the time so we'll return true to allow the singleTest
// if it returned false it means another thread raced us and allowed the singleTest before we did
return true;
}
}
return false;
}

circuitBreakerSleepWindowInMilliseconds代表断路器休眠时间,默认5秒。

服务隔离

Hystrix的服务隔离使用舱壁模式实现。提供两种方式,线程池和信号量。
使用线程池还是信号量的逻辑在com.netflix.hystrix.AbstractCommand#executeCommandWithSpecifiedIsolation方法中。如果使用线程池模式,那么最关键的代码是executeCommandWithSpecifiedIsolation方法中

1
2
3
4
5
6
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));

threadPool对象的类型是com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault。最终就是使用java.util.concurrent.ThreadPoolExecutor执行真正的用户请求。
信号量的开销远比线程池的开销小,但是它不能设置超时和实现异步访问。所以,只有在依赖服务足够可靠才能使用信号量。