/* The test class or client */ publicclassPressSwitch{ publicstaticvoidmain(String[] args){ Light lamp = new Light(); Command switchUp = new FlipUpCommand(lamp); Command switchDown = new FlipDownCommand(lamp);
@Override protected Observable<String> construct(){ return Observable.create(new Observable.OnSubscribe<String>() { @Override publicvoidcall(Subscriber<? super String> observer){ try { if (!observer.isUnsubscribed()) { // a real example would do work like a network call here observer.onNext("Hello"); observer.onNext(name + "!"); observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribeOn(Schedulers.io()); } }
@Override publicbooleanisOpen(){ if (circuitOpen.get()) { // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close returntrue; } // we're closed, so let's see if errors have made us so we should trip the circuit open HealthCounts health = metrics.getHealthCounts(); // check if we are past the statisticalWindowVolumeThreshold if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything returnfalse; } if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { returnfalse; } else { // our failure rate is too high, trip the circuit if (circuitOpen.compareAndSet(false, true)) { // if the previousValue was false then we want to set the currentTime circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); returntrue; } else { // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have // caused another thread to set it to true already even though we were in the process of doing the same // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open returntrue; } } }
publicbooleanallowSingleTest(){ long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); // 1) if the circuit is open // 2) and it's been longer than 'sleepWindow' since we opened the circuit if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties circuitBreakerSleepWindowInMilliseconds().get()) { // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try. // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'. if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) { // if this returns true that means we set the time so we'll return true to allow the singleTest // if it returned false it means another thread raced us and allowed the singleTest before we did returntrue; } } returnfalse; }