Hystrix Internals

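For complex business models, service-oriented architecture (SOA) was popularized by Amazon and has since been widely accepted and studied. The whole point, of course, is decoupling. On the subject of platforms and SOA, there is an excellent article that Chen Hao (陈皓) translated and published on coolshell: SteveY's rant about the Amazon and Google platforms. My tech lead once called coolshell the most perfect technical blog; after reading through thousands of blog posts, I have come to agree.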

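Continuing from the above: SOA solves the coupling problem, but the price is that calls inside one service become calls between services. In that setting, how to build services that are more:

is exactly the problem Hystrix sets out to solve.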

The example in the Hystrix wiki is persuasive: if you depend on 30 external services, and each of them has four nines of availability (99.99%), then

99.99% to the 30th power ≈ 99.7%, so your own service reaches only about two nines of availability.
0.3% of 1 billion requests = 3,000,000 failures.
2+ hours of downtime per month, even if all dependencies have excellent uptime.
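
The arithmetic can be checked with a few lines of Java. This is only a sketch under the assumption that the 30 dependencies fail independently and that a month has 30 days; `CompoundAvailability` and its method names are made up for this illustration.

```java
/** Sketch: availability of a service that needs all of its dependencies to be up. */
final class CompoundAvailability {

    /** Availability when every one of n independent dependencies must be available. */
    static double compound(double perDependency, int n) {
        return Math.pow(perDependency, n);
    }

    /** Expected number of failed requests out of the given total. */
    static long expectedFailures(double availability, long requests) {
        return Math.round((1.0 - availability) * requests);
    }

    /** Expected downtime in hours for a month with the given number of days. */
    static double downtimeHoursPerMonth(double availability, int days) {
        return (1.0 - availability) * days * 24;
    }

    public static void main(String[] args) {
        double a = compound(0.9999, 30);
        System.out.printf("availability: %.2f%%%n", a * 100);
        System.out.printf("failures per 1 billion requests: %d%n",
                expectedFailures(a, 1_000_000_000L));
        System.out.printf("downtime per 30-day month: %.2f hours%n",
                downtimeHoursPerMonth(a, 30));
    }
}
```

The result lands at roughly 99.7% availability, close to 3 million failures per billion requests, and a little over 2 hours of downtime per month, which matches the rounded figures quoted from the wiki.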

Solving the problem above comes down to working on the following aspects:

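Hystrix achieves high availability through thread isolation, timeouts, failure monitoring, circuit breaking, and similar mechanisms. On the implementation side, Hystrix is built around the command pattern.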
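
To make the combination of command pattern, thread isolation, timeout, and fallback concrete, here is a minimal sketch in plain Java. It does not use Hystrix itself: `SketchCommand`, `SketchCommandDemo`, and the 200 ms timeout are hypothetical names and values chosen for illustration, and the real library layers metrics, circuit breaking, and request caching on top of this idea.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical mini command: NOT Hystrix's API, only the shape of the idea. */
abstract class SketchCommand<R> {
    private final ExecutorService pool;  // a dedicated pool gives thread isolation
    private final long timeoutMillis;

    SketchCommand(ExecutorService pool, long timeoutMillis) {
        this.pool = pool;
        this.timeoutMillis = timeoutMillis;
    }

    /** The remote call that may be slow or may fail. */
    protected abstract R run() throws Exception;

    /** The degraded answer used when run() cannot deliver in time. */
    protected abstract R getFallback();

    /** Runs the command on the pool and falls back on timeout, failure, or rejection. */
    final R execute() {
        Future<R> future = null;
        try {
            Callable<R> task = this::run;
            future = pool.submit(task);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return cancelAndFallback(future);
        } catch (ExecutionException | TimeoutException | RejectedExecutionException e) {
            return cancelAndFallback(future);
        }
    }

    private R cancelAndFallback(Future<R> future) {
        if (future != null) {
            future.cancel(true); // interrupt the worker so it does not linger
        }
        return getFallback();
    }
}

final class SketchCommandDemo {
    /** Executes a command whose simulated remote call takes workMillis. */
    static String call(ExecutorService pool, long workMillis) {
        return new SketchCommand<String>(pool, 200) {
            @Override
            protected String run() throws Exception {
                Thread.sleep(workMillis); // simulated remote latency
                return "remote result";
            }

            @Override
            protected String getFallback() {
                return "fallback";
            }
        }.execute();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(call(pool, 0));    // fast call: the real result
            System.out.println(call(pool, 1000)); // slow call: the fallback
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Because each dependency would get its own small pool in this design, a slow dependency can exhaust only its own threads, and callers always get an answer within the timeout.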

Source Code Analysis

The lifecycle of a request looks like this:

[Figure: request lifecycle]

Building and executing a command

Take queue() as an example: the call returns right away and the result is delivered asynchronously through a Future.


public Future<R> queue() {

        // Obtain the Observable (observer pattern)
        final Observable<R> o = toObservable();

        // then convert it into the Future that queue() returns
        final Future<R> f = o.toBlocking().toFuture();

        /* special handling of error states that throw immediately */
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                RuntimeException re = decomposeException(e);
                if (re instanceof HystrixBadRequestException) {
                    return f;
                } else if (re instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) re;
                    if (hre.getFailureType() == FailureType.COMMAND_EXCEPTION || hre.getFailureType() == FailureType.TIMEOUT) {
                        // we don't throw these types from queue() only from queue().get() as they are execution errors
                        return f;
                    } else {
                        // these are errors we throw from queue() as they are rejection type errors
                        throw hre;
                    }
                } else {
                    throw re;
                }
            }
        }

        // Return the Future
        return f;
    }
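
The `isDone()` branch above is easy to misread, so here is a small sketch of the same idea using only `java.util.concurrent`: a future that has already failed with a rejection-type error is rethrown by `queue()` itself, while an execution error stays inside the future until the caller invokes `get()`. `QueueSemanticsSketch` and `RejectedSketchException` are hypothetical stand-ins, not Hystrix classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class QueueSemanticsSketch {

    /** Hypothetical stand-in for a rejection-type failure (open circuit, full pool, ...). */
    static final class RejectedSketchException extends RuntimeException {
        RejectedSketchException(String message) {
            super(message);
        }
    }

    /** Returns the future, but rethrows immediately if it already failed with a rejection. */
    static <R> Future<R> queue(Future<R> f) {
        if (f.isDone()) {
            try {
                f.get(); // does not block: the future is already done
            } catch (ExecutionException e) {
                if (e.getCause() instanceof RejectedSketchException) {
                    throw (RejectedSketchException) e.getCause();
                }
                // execution errors are left for the caller to observe on get()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return f;
    }

    /** Helper that builds a future which has already failed with t. */
    static <R> CompletableFuture<R> failed(Throwable t) {
        CompletableFuture<R> f = new CompletableFuture<>();
        f.completeExceptionally(t);
        return f;
    }

    public static void main(String[] args) throws InterruptedException {
        Future<String> executionError =
                queue(QueueSemanticsSketch.<String>failed(new IllegalStateException("run() failed")));
        try {
            executionError.get();
        } catch (ExecutionException e) {
            System.out.println("seen on get(): " + e.getCause().getMessage());
        }

        try {
            queue(QueueSemanticsSketch.<String>failed(new RejectedSketchException("circuit open")));
        } catch (RejectedSketchException e) {
            System.out.println("thrown by queue(): " + e.getMessage());
        }
    }
}
```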

The Observable itself is obtained as shown below. For RxJava, the reactive library Hystrix uses, see the RxJava GitHub repository; a detailed introduction is available at: https://gank.io/post/560e15be2dca930e00da1083#toc_1


public Observable<R> toObservable() {
    /* this is a stateful object so can only be used once */
    if (!started.compareAndSet(false, true)) {
        throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
    }

    final HystrixInvokable<R> _this = this;

    // Step 3 in the diagram
    // Whether request caching is enabled, read from the properties set when the Command was instantiated
    final boolean requestCacheEnabled = isRequestCachingEnabled();

    // Try to get the result from the request cache
    if (requestCacheEnabled) {
        Observable<R> fromCache = requestCache.get(getCacheKey());
        if (fromCache != null) {
            long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
            executionResult = executionResult.markUserThreadCompletion((int) latency);
            executionResult = executionResult.addEvent((int) latency, HystrixEventType.RESPONSE_FROM_CACHE);
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey);
            eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);
            isExecutionComplete.set(true);
            try {
                executionHook.onCacheHit(this);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx);
            }
            return new CachedObservableResponse<R>((CachedObservableOriginal<R>) fromCache, this);
        }
    }

    // Create the Observable; its body runs only when a subscriber subscribes
    Observable<R> o = Observable.create(new OnSubscribe<R>() {

        // Logic executed on subscription
        @Override
        public void call(Subscriber<? super R> observer) {
            // async record keeping
            recordExecutedCommand();

            // mark that we're starting execution on the ExecutionHook
            // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
            executionHook.onStart(_this);

            // Step 4 in the diagram
            // Does the circuit breaker allow the request?
            if (circuitBreaker.allowRequest()) {

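                // Get the execution semaphore configured for this Command; the default limit is 10 concurrent requests
                // TryableSemaphore tracks the current number of concurrent executions against the configured maximum
                final TryableSemaphore executionSemaphore = getExecutionSemaphore();
                // Proceed only if the concurrency limit has not been reached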
                if (executionSemaphore.tryAcquire()) {
                    try {
                        /* used to track userThreadExecutionTime */
                        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());

                        // The actual execution, step 6 in the diagram
                        getRunObservableDecoratedForMetricsAndErrorHandling()
                                .doOnTerminate(new Action0() {

                                    //the action to invoke when the source Observable calls {@code onCompleted} or {@code onError}

                                    @Override
                                    public void call() {
                                        // release the semaphore
                                        // this is done here instead of below so that the acquire/release happens where it is guaranteed
                                        // and not affected by the conditional circuit-breaker checks, timeouts, etc
                                        executionSemaphore.release();

                                    }
                                }).unsafeSubscribe(observer);
                    } catch (RuntimeException e) {
                        observer.onError(e);
                    }
                } else { // no semaphore permit could be acquired, step 5 in the diagram
                    Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
                    executionResult = executionResult.setExecutionException(semaphoreRejectionException);
                    eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
                    logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
                    // retrieve a fallback or throw an exception if no fallback available
                    getFallbackOrThrowException(HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
                            "could not acquire a semaphore for execution", semaphoreRejectionException)
                            .lift(new DeprecatedOnCompleteWithValueHookApplication(_this))
                            .unsafeSubscribe(observer);
                }
            } else { // the circuit breaker is open, step 4 in the diagram
                // record that we are returning a short-circuited fallback
                eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
                // short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
                Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
                executionResult = executionResult.setExecutionException(shortCircuitException);
                try {
                    getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                            "short-circuited", shortCircuitException)
                            .lift(new DeprecatedOnCompleteWithValueHookApplication(_this))
                            .unsafeSubscribe(observer);
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        }
    });

    //apply all lifecycle hooks
    o = o.lift(new CommandHookApplication(this));

    // error handling at very end (this means fallback didn't exist or failed)
    o = o.onErrorResumeNext(new Func1<Throwable, Observable<R>>() {

        @Override
        public Observable<R> call(Throwable t) {
            // count that we are throwing an exception and re-throw it
            eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            return Observable.error(t);
        }

    });

    // Bookkeeping once execution terminates
    o = o.doOnTerminate(new Action0() {

        @Override
        public void call() {
            Reference<TimerListener> tl = timeoutTimer.get();
            if (tl != null) {
                tl.clear();
            }

            long userThreadLatency = System.currentTimeMillis() - executionResult.getStartTimestamp();
            executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey);
            // record that we're completed
            isExecutionComplete.set(true);
        }

    });

    // Request cache handling
    if (requestCacheEnabled) {
        // wrap it for caching
        o = new CachedObservableOriginal<R>(o.cache(), this);
        Observable<R> fromCache = requestCache.putIfAbsent(getCacheKey(), o);
        if (fromCache != null) {
            // another thread beat us so we'll use the cached value instead
            o = new CachedObservableResponse<R>((CachedObservableOriginal<R>) fromCache, this);
        }
        // we just created an ObservableCommand so we cast and return it
        return o;
    } else {
        // no request caching so a simple wrapper just to pass 'this' along with the Observable
        return new ObservableCommand<R>(o, this);
    }
}
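
The semaphore check in step 5 never blocks: a request either gets a permit immediately or is rejected and sent to the fallback. A minimal sketch of such a non-blocking counter is shown below. `TryableSemaphoreSketch` and `callOrFallback` are names invented for this example, and the code is an approximation of the idea rather than a copy of Hystrix's `TryableSemaphore`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Sketch of a non-blocking semaphore: callers are rejected instead of queued. */
final class TryableSemaphoreSketch {
    private final int maxPermits;
    private final AtomicInteger inUse = new AtomicInteger(0);

    TryableSemaphoreSketch(int maxPermits) {
        this.maxPermits = maxPermits;
    }

    /** Takes a permit if one is free; returns false right away otherwise. */
    boolean tryAcquire() {
        int current = inUse.incrementAndGet();
        if (current > maxPermits) {
            inUse.decrementAndGet(); // undo: we went over the limit
            return false;
        }
        return true;
    }

    /** Gives a permit back; call exactly once per successful tryAcquire(). */
    void release() {
        inUse.decrementAndGet();
    }

    int getNumberOfPermitsUsed() {
        return inUse.get();
    }

    /** Runs work under a permit, or the fallback when no permit is free. */
    <T> T callOrFallback(Supplier<T> work, Supplier<T> fallback) {
        if (!tryAcquire()) {
            return fallback.get();
        }
        try {
            return work.get();
        } finally {
            release(); // always release, even if work throws
        }
    }

    public static void main(String[] args) {
        TryableSemaphoreSketch semaphore = new TryableSemaphoreSketch(1);
        System.out.println(semaphore.callOrFallback(() -> "executed", () -> "rejected"));
        semaphore.tryAcquire(); // simulate another request holding the only permit
        System.out.println(semaphore.callOrFallback(() -> "executed", () -> "rejected"));
    }
}
```

Releasing in a `finally` block plays the same role as the `doOnTerminate` callback in the listing above: the permit is returned whether the execution completes or fails.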

The key code for step 6 is as follows:


/**
    * This decorate "Hystrix" functionality around the run() Observable.
    *
    * @return R
    */
   private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling() {
       final AbstractCommand<R> _self = this;

       final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

       Observable<R> run;
       if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
           // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)

           run = Observable.create(new OnSubscribe<R>() {

               @Override
               public void call(Subscriber<? super R> s) {
                   metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                   if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                       // the command timed out in the wrapping thread so we will return immediately
                       // and not increment any of the counters below or other such logic
                       s.onError(new RuntimeException("timed out before executing run()"));
                   } else {
                       // not timed out so execute
                       HystrixCounters.incrementGlobalConcurrentThreads();
                       threadPool.markThreadExecution();
                       // store the command that is being run
                       endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
                       executionResult = executionResult.setExecutedInThread();
                       /**
                        * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                        */
                       try {
                           executionHook.onThreadStart(_self);
                           executionHook.onRunStart(_self);
                           executionHook.onExecutionStart(_self);
                           getExecutionObservableWithLifecycle().unsafeSubscribe(s);
                       } catch (Throwable ex) {
                           s.onError(ex);
                       }
                   }
               }
           }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {

               @Override
               public Boolean call() {
                   return properties.executionIsolationThreadInterruptOnTimeout().get() && _self.isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT);
               }
           }));
       } else {
           metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
           // semaphore isolated
           // store the command that is being run
           endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
           try {
               executionHook.onRunStart(_self);
               executionHook.onExecutionStart(_self);
               run = getExecutionObservableWithLifecycle();  //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so this shouldn't throw
           } catch (Throwable ex) {
               //If the above hooks throw, then use that as the result of the run method
               run = Observable.error(ex);
           }
       }

       run = run.doOnEach(new Action1<Notification<? super R>>() {

           @Override
           public void call(Notification<? super R> n) {
               setRequestContextIfNeeded(currentRequestContext);
           }


       });
       if (properties.executionTimeoutEnabled().get()) {
           run = run.lift(new HystrixObservableTimeoutOperator<R>(_self));
       }
       run = run.doOnNext(new Action1<R>() {
           @Override
           public void call(R r) {
               if (shouldOutputOnNextEvents()) {
                   executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                   eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
               }
           }
       }).doOnCompleted(new Action0() {

           @Override
           public void call() {
               long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
               eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
               executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
               circuitBreaker.markSuccess();
               eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
           }

       }).onErrorResumeNext(new Func1<Throwable, Observable<R>>() {

           @Override
           public Observable<R> call(Throwable t) {
               Exception e = getExceptionFromThrowable(t);
               executionResult = executionResult.setExecutionException(e);
               if (e instanceof RejectedExecutionException) {
                   /**
                    * Rejection handling
                    */
                   eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
                   threadPool.markThreadRejection();
                   // use a fallback instead (or throw exception if not implemented)
                   return getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", e);
               } else if (t instanceof HystrixTimeoutException) {
                   /**
                    * Timeout handling
                    *
                    * Callback is performed on the HystrixTimer thread.
                    */
                   return getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
               } else if (t instanceof HystrixBadRequestException) {
                   /**
                    * BadRequest handling
                    */
                   try {
                       long executionLatency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                       eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                       executionResult = executionResult.addEvent((int) executionLatency, HystrixEventType.BAD_REQUEST);
                       Exception decorated = executionHook.onError(_self, FailureType.BAD_REQUEST_EXCEPTION, (Exception) t);

                       if (decorated instanceof HystrixBadRequestException) {
                           t = decorated;
                       } else {
                           logger.warn("ExecutionHook.onError returned an exception that was not an instance of HystrixBadRequestException so will be ignored.", decorated);
                       }
                   } catch (Exception hookEx) {
                       logger.warn("Error calling HystrixCommandExecutionHook.onError", hookEx);
                   }
                   /*
                    * HystrixBadRequestException is treated differently and allowed to propagate without any stats tracking or fallback logic
                    */
                   return Observable.error(t);
               } else {
                   /*
                    * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                    */
                   if (e instanceof HystrixBadRequestException) {
                       eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                       return Observable.error(e);
                   }

                   /**
                    * All other error handling
                    */
                   logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", e);

                   // report failure
                   eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);

                   // record the exception
                   executionResult = executionResult.setException(e);
                   return getFallbackOrThrowException(HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", e);
               }
           }
       }).doOnEach(new Action1<Notification<? super R>>() {
           // setting again as the fallback could have lost the context
           @Override
           public void call(Notification<? super R> n) {
               setRequestContextIfNeeded(currentRequestContext);
           }

       }).doOnTerminate(new Action0() {
           @Override
           public void call() {
               //if the command timed out, then we've reached this point in the calling thread
               //but the Hystrix thread is still doing work.  Let it handle these markers.
               if (!isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
                   handleThreadEnd();
               }
           }
       }).lift(new DeprecatedOnCompleteWithValueHookApplication(_self));

       return run;
   }
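
The `onErrorResumeNext` handler in the listing above boils down to a four-way decision. The sketch below restates only that decision in plain Java so the branches are easier to compare. It is an illustration, not Hystrix code: `ErrorRoutingSketch` and `Route` are hypothetical, `java.util.concurrent.TimeoutException` stands in for `HystrixTimeoutException`, and `IllegalArgumentException` stands in for `HystrixBadRequestException`.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

final class ErrorRoutingSketch {

    /** What happens to an error raised while executing a command. */
    enum Route {
        FALLBACK_AFTER_REJECTION,  // thread pool could not accept the task
        FALLBACK_AFTER_TIMEOUT,    // execution took longer than allowed
        PROPAGATE_BAD_REQUEST,     // caller error: no fallback, no failure count
        FALLBACK_AFTER_FAILURE     // anything else thrown by the execution
    }

    /** Mirrors the order of the instanceof checks in the listing above. */
    static Route route(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return Route.FALLBACK_AFTER_REJECTION;
        }
        if (t instanceof TimeoutException) {          // stand-in for HystrixTimeoutException
            return Route.FALLBACK_AFTER_TIMEOUT;
        }
        if (t instanceof IllegalArgumentException) {  // stand-in for HystrixBadRequestException
            return Route.PROPAGATE_BAD_REQUEST;
        }
        return Route.FALLBACK_AFTER_FAILURE;
    }

    public static void main(String[] args) {
        System.out.println(route(new RejectedExecutionException("pool full")));
        System.out.println(route(new TimeoutException("too slow")));
        System.out.println(route(new IllegalArgumentException("bad input")));
        System.out.println(route(new IllegalStateException("remote call failed")));
    }
}
```

The bad-request branch is the only one that skips the fallback: an invalid argument from the caller says nothing about the health of the dependency, so in the listing it is propagated as an error instead of being recorded as a FAILURE.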