Hystrix剖析
在复杂的业务模型下,面向服务架构(SOA)在被Amazon发扬光大后,逐渐被大家所接受并学习。当然,这一切都是为了去耦合。关于平台与SOA的论述,有一篇非常好的文章,由陈皓翻译后放在了coolshell上,即SteveY对Amazon和Google平台的吐槽。coolshell曾经被技术leader认为是最完美的技术blog, 品博客3000后,我也深有同感。
接上,SOA解决的是耦合的问题,但带来的代价是把服务内部的调用变成了服务之间的调用。在这种情景下,如何将服务构建得:
- 高容错
- 高性能
- 更弹性
就是 Hystrix 解决的问题。
Hystrix wiki中的例子很有说服力,如果依赖30个外部服务,每个服务的可用性都是4个9,那么
99.99%的30次方 ≈ 99.7%,你的服务的可用性只剩2个9。0.3% of 1 billion requests = 3,000,000 failures,即十亿次请求中就有300万次失败。2+ hours downtime/month even if all dependencies have excellent uptime,即使所有依赖的可用性都非常高,每个月也会有2个多小时的宕机时间。
解决如上问题无非从以下方面入手:
- fail fast 而不是 queueing.
- fallback 而不是 failure.
Hystrix通过线程隔离、超时、故障监控、熔断等方式实现系统的高可用。在技术上,Hystrix使用了command模式。
源码分析
整个请求的生命周期如下:
构建command 并执行
以 queue() 为例, 调用后异步返回结果。
/**
 * Starts the command asynchronously and returns a {@link Future} for its result.
 *
 * <p>If the Future is already done when this method inspects it, the outcome is checked
 * eagerly: bad-request errors and execution errors (COMMAND_EXCEPTION, TIMEOUT) are left
 * for the caller to observe via {@code Future.get()}, while any other failure is thrown
 * directly from this method.
 *
 * @return the Future backing this command's execution
 * @throws HystrixRuntimeException when the Future has already failed with a failure type
 *         other than COMMAND_EXCEPTION or TIMEOUT (the original comment calls these
 *         "rejection type errors")
 * @throws RuntimeException when the already-failed Future decomposes to any other runtime exception
 */
public Future<R> queue() {
    // Obtain the Observable for this command and expose it as a blocking Future in one chain.
    final Future<R> future = toObservable().toBlocking().toFuture();

    // Not finished yet: there is nothing to inspect, failures will surface from Future.get().
    if (!future.isDone()) {
        return future;
    }

    /* special handling of error states that throw immediately */
    try {
        future.get();
        return future;
    } catch (Exception e) {
        final RuntimeException cause = decomposeException(e);

        // Bad requests are only reported through Future.get().
        if (cause instanceof HystrixBadRequestException) {
            return future;
        }

        // Anything that is not a Hystrix runtime failure is rethrown as-is.
        if (!(cause instanceof HystrixRuntimeException)) {
            throw cause;
        }

        final HystrixRuntimeException hystrixFailure = (HystrixRuntimeException) cause;
        final boolean executionError = hystrixFailure.getFailureType() == FailureType.COMMAND_EXCEPTION
                || hystrixFailure.getFailureType() == FailureType.TIMEOUT;
        if (executionError) {
            // we don't throw these types from queue() only from queue().get() as they are execution errors
            return future;
        }

        // these are errors we throw from queue() as they are rejection type errors
        throw hystrixFailure;
    }
}
获取 Observable 对象的方式如下。关于 Hystrix 所使用的响应式库 RxJava,可参见 RxJava 的 GitHub 仓库;一篇详细的介绍可见:https://gank.io/post/560e15be2dca930e00da1083#toc_1
/**
 * Builds the Observable that drives a single execution of this command.
 *
 * <p>Flow, as visible in this method:
 * <ol>
 * <li>Reject a second invocation on the same instance.</li>
 * <li>If request caching is enabled and the cache already holds an entry for the cache key,
 *     record a RESPONSE_FROM_CACHE completion and return a wrapper around the cached Observable.</li>
 * <li>Otherwise create an Observable that, on subscription, consults the circuit breaker,
 *     tries to acquire the execution semaphore, and then either subscribes to the decorated
 *     run Observable or goes to the fallback path.</li>
 * <li>Attach lifecycle hooks, EXCEPTION_THROWN accounting and end-of-execution bookkeeping.</li>
 * <li>When request caching is enabled, publish the Observable into the request cache.</li>
 * </ol>
 *
 * @return the Observable for this execution (a cache wrapper when request caching is enabled,
 *         otherwise an ObservableCommand wrapper)
 * @throws IllegalStateException if this instance has already been started
 */
public Observable<R> toObservable() {
/* this is a stateful object so can only be used once */
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
}
final HystrixInvokable<R> _this = this;
// Step 3 of the request-flow diagram the article refers to (the diagram itself is not shown here).
// Whether request caching is on; per the original note this is read from the properties
// supplied when the command was instantiated (not verifiable from this method alone).
final boolean requestCacheEnabled = isRequestCachingEnabled();
// Try to serve the result from the request cache.
if (requestCacheEnabled) {
Observable<R> fromCache = requestCache.get(getCacheKey());
if (fromCache != null) {
// Cache hit: record latency and the RESPONSE_FROM_CACHE event, then mark the command as done.
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
executionResult = executionResult.markUserThreadCompletion((int) latency);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.RESPONSE_FROM_CACHE);
metrics.markCommandDone(executionResult, commandKey, threadPoolKey);
eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);
isExecutionComplete.set(true);
// A failing onCacheHit hook is only logged; it does not prevent returning the cached response.
try {
executionHook.onCacheHit(this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
}
return new CachedObservableResponse<R>((CachedObservableOriginal<R>) fromCache, this);
}
}
// Create the source Observable; the body below runs only when something subscribes.
Observable<R> o = Observable.create(new OnSubscribe<R>() {
// Logic executed when a subscription happens.
@Override
public void call(Subscriber<? super R> observer) {
// async record keeping
recordExecutedCommand();
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_this);
// Step 4 of the diagram.
// Ask the circuit breaker whether this request is allowed through.
if (circuitBreaker.allowRequest()) {
// Fetch the execution semaphore for this command.
// NOTE(review): the original article states the default concurrency limit is 10 and that
// TryableSemaphore tracks the current and maximum concurrency internally — neither is
// visible in this method; confirm against TryableSemaphore / the command properties.
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
// tryAcquire() returning false means no permit was available and execution is rejected.
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// The actual execution, step 6 of the diagram.
getRunObservableDecoratedForMetricsAndErrorHandling()
.doOnTerminate(new Action0() {
//the action to invoke when the source Observable calls {@code onCompleted} or {@code onError}
@Override
public void call() {
// release the semaphore
// this is done here instead of below so that the acquire/release happens where it is guaranteed
// and not affected by the conditional circuit-breaker checks, timeouts, etc
// NOTE(review): the release is attached to termination only; whether a permit is
// returned when the subscriber unsubscribes before termination is not shown here — confirm.
executionSemaphore.release();
}
}).unsafeSubscribe(observer);
} catch (RuntimeException e) {
// Synchronous runtime failures while wiring up the execution are delivered as onError.
observer.onError(e);
}
} else { // No permit acquired: step 5 of the diagram (semaphore rejection).
Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
executionResult = executionResult.setExecutionException(semaphoreRejectionException);
eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
// retrieve a fallback or throw an exception if no fallback available
getFallbackOrThrowException(HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
"could not acquire a semaphore for execution", semaphoreRejectionException)
.lift(new DeprecatedOnCompleteWithValueHookApplication(_this))
.unsafeSubscribe(observer);
}
} else { // allowRequest() returned false (circuit open per the error message below): step 4 of the diagram.
// record that we are returning a short-circuited fallback
eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
// short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
executionResult = executionResult.setExecutionException(shortCircuitException);
try {
getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
"short-circuited", shortCircuitException)
.lift(new DeprecatedOnCompleteWithValueHookApplication(_this))
.unsafeSubscribe(observer);
} catch (Exception e) {
// Unlike the semaphore-rejection branch above, failures here are caught and routed to onError.
observer.onError(e);
}
}
}
});
//apply all lifecycle hooks
o = o.lift(new CommandHookApplication(this));
// error handling at very end (this means fallback didn't exist or failed)
o = o.onErrorResumeNext(new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
// count that we are throwing an exception and re-throw it
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
return Observable.error(t);
}
});
// End-of-execution bookkeeping, run when the Observable terminates.
o = o.doOnTerminate(new Action0() {
@Override
public void call() {
// Clear the timeout timer reference, if one was registered.
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
}
// Record total latency since the start timestamp and report the finished command to metrics.
long userThreadLatency = System.currentTimeMillis() - executionResult.getStartTimestamp();
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
metrics.markCommandDone(executionResult, commandKey, threadPoolKey);
// record that we're completed
isExecutionComplete.set(true);
}
});
// Request-cache publication.
if (requestCacheEnabled) {
// wrap it for caching
o = new CachedObservableOriginal<R>(o.cache(), this);
Observable<R> fromCache = requestCache.putIfAbsent(getCacheKey(), o);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
o = new CachedObservableResponse<R>((CachedObservableOriginal<R>) fromCache, this);
}
// o is now either our own cached wrapper or a response wrapper around the entry that was already cached
return o;
} else {
// no request caching so a simple wrapper just to pass 'this' along with the Observable
return new ObservableCommand<R>(o, this);
}
}
步骤 6 的关键代码如下:
/**
 * Wraps the command's execution Observable with Hystrix bookkeeping: isolation handling,
 * optional timeout, event/metric recording, circuit-breaker success marking and
 * error-to-fallback routing.
 *
 * <p>With THREAD isolation the execution is subscribed on the thread pool's scheduler;
 * otherwise the execution Observable is obtained directly on the calling thread. In both
 * cases the request context captured at the start of this method is re-applied on every
 * notification.
 *
 * @return the decorated Observable; on error it resumes with the fallback Observable, or
 *         re-emits the error for bad-request exceptions
 */
private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling() {
final AbstractCommand<R> _self = this;
// Capture the request context of the thread calling this method so it can be re-applied later.
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
Observable<R> run;
if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
run = Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> s) {
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
s.onError(new RuntimeException("timed out before executing run()"));
} else {
// not timed out so execute
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
executionResult = executionResult.setExecutedInThread();
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
executionHook.onThreadStart(_self);
executionHook.onRunStart(_self);
executionHook.onExecutionStart(_self);
getExecutionObservableWithLifecycle().unsafeSubscribe(s);
} catch (Throwable ex) {
s.onError(ex);
}
}
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
// Evaluates to true only when interrupt-on-timeout is enabled AND the command has timed out.
// NOTE(review): presumably the scheduler uses this to decide whether to interrupt the worker thread — confirm in the thread pool's scheduler.
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _self.isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT);
}
}));
} else {
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
try {
executionHook.onRunStart(_self);
executionHook.onExecutionStart(_self);
run = getExecutionObservableWithLifecycle(); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so this shouldn't throw
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
run = Observable.error(ex);
}
}
// Re-apply the captured request context on every notification coming from the execution.
run = run.doOnEach(new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> n) {
setRequestContextIfNeeded(currentRequestContext);
}
});
// Install the timeout operator only when execution timeouts are enabled for this command.
if (properties.executionTimeoutEnabled().get()) {
run = run.lift(new HystrixObservableTimeoutOperator<R>(_self));
}
run = run.doOnNext(new Action1<R>() {
// Record an EMIT event per emitted value, but only when shouldOutputOnNextEvents() says so.
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
}
}).doOnCompleted(new Action0() {
// Successful completion: record SUCCESS with its latency and tell the circuit breaker.
@Override
public void call() {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
}
}).onErrorResumeNext(new Func1<Throwable, Observable<R>>() {
// Error routing: classify the failure and either switch to the fallback or re-emit the error.
@Override
public Observable<R> call(Throwable t) {
// e is the Exception derived from t by getExceptionFromThrowable; the branches below test
// either e or the raw t, so the two are not interchangeable.
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
/**
* Rejection handling
*/
eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
threadPool.markThreadRejection();
// use a fallback instead (or throw exception if not implemented)
return getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", e);
} else if (t instanceof HystrixTimeoutException) {
/**
* Timeout handling
*
* Callback is performed on the HystrixTimer thread.
*/
// A fresh TimeoutException is passed to the fallback logic rather than the original throwable.
return getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
} else if (t instanceof HystrixBadRequestException) {
/**
* BadRequest handling
*/
try {
long executionLatency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
executionResult = executionResult.addEvent((int) executionLatency, HystrixEventType.BAD_REQUEST);
// The onError hook may replace the exception, but only with another HystrixBadRequestException.
Exception decorated = executionHook.onError(_self, FailureType.BAD_REQUEST_EXCEPTION, (Exception) t);
if (decorated instanceof HystrixBadRequestException) {
t = decorated;
} else {
logger.warn("ExecutionHook.onError returned an exception that was not an instance of HystrixBadRequestException so will be ignored.", decorated);
}
} catch (Exception hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onError", hookEx);
}
/*
* HystrixBadRequestException is treated differently and allowed to propagate without any stats tracking or fallback logic
*/
return Observable.error(t);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
// Reached when t itself is not a HystrixBadRequestException but the derived e is.
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
/**
* All other error handling
*/
logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", e);
// report failure
eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
// record the exception
executionResult = executionResult.setException(e);
return getFallbackOrThrowException(HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", e);
}
}
}).doOnEach(new Action1<Notification<? super R>>() {
// setting again as the fallback could have lost the context
@Override
public void call(Notification<? super R> n) {
setRequestContextIfNeeded(currentRequestContext);
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
//if the command timed out, then we've reached this point in the calling thread
//but the Hystrix thread is still doing work. Let it handle these markers.
if (!isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd();
}
}
}).lift(new DeprecatedOnCompleteWithValueHookApplication(_self));
return run;
}