Skip to content

Commit

Permalink
Issue ReactiveX#13 Store exceptions and allow to retrieve the latest …
Browse files Browse the repository at this point in the history
…handled exceptions
  • Loading branch information
Robert Winkler committed Nov 23, 2016
1 parent b751ff0 commit cac5d4e
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 49 deletions.
95 changes: 56 additions & 39 deletions src/main/java/javaslang/circuitbreaker/CircuitBreakerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,28 @@

public class CircuitBreakerConfig {

private static final int DEFAULT_MAX_FAILURE_THRESHOLD = 50; // Percentage
private static final int DEFAULT_WAIT_DURATION_IN_OPEN_STATE = 60; // Seconds
private static final int DEFAULT_RING_BUFFER_SIZE_IN_HALF_OPEN_STATE = 10;
private static final int DEFAULT_RING_BUFFER_SIZE_IN_CLOSED_STATE = 100;
public static final int DEFAULT_MAX_FAILURE_THRESHOLD = 50; // Percentage
public static final int DEFAULT_WAIT_DURATION_IN_OPEN_STATE = 60; // Seconds
public static final int DEFAULT_RING_BUFFER_SIZE_IN_HALF_OPEN_STATE = 10;
public static final int DEFAULT_RING_BUFFER_SIZE_IN_CLOSED_STATE = 100;
public static final int DEFAULT_EXCEPTION_RING_BUFFER_SIZE = 10;

private final float failureRateThreshold;
private int ringBufferSizeInHalfOpenState;
private int ringBufferSizeInClosedState;
private final int ringBufferSizeInHalfOpenState;
private final int exceptionRingBufferSize;
private final int ringBufferSizeInClosedState;
private final Duration waitDurationInOpenState;
private final CircuitBreakerEventListener circuitBreakerEventListener;
private final Predicate<Throwable> exceptionPredicate;

private CircuitBreakerConfig(float failureRateThreshold,
Duration waitDurationInOpenState,
int ringBufferSizeInHalfOpenState,
int ringBufferSizeInClosedState,
Predicate<Throwable> exceptionPredicate,
CircuitBreakerEventListener circuitBreakerEventListener){
this.failureRateThreshold = failureRateThreshold;
this.waitDurationInOpenState = waitDurationInOpenState;
this.ringBufferSizeInHalfOpenState = ringBufferSizeInHalfOpenState;
this.ringBufferSizeInClosedState = ringBufferSizeInClosedState;
this.exceptionPredicate = exceptionPredicate;
this.circuitBreakerEventListener = circuitBreakerEventListener;
private CircuitBreakerConfig(Context context){
this.failureRateThreshold = context.failureRateThreshold;
this.waitDurationInOpenState = context.waitDurationInOpenState;
this.ringBufferSizeInHalfOpenState = context.ringBufferSizeInHalfOpenState;
this.ringBufferSizeInClosedState = context.ringBufferSizeInClosedState;
this.exceptionRingBufferSize = context.exceptionRingBufferSize;
this.exceptionPredicate = context.exceptionPredicate;
this.circuitBreakerEventListener = context.circuitBreakerEventListener;

}

Expand Down Expand Up @@ -77,6 +75,10 @@ public Predicate<Throwable> getExceptionPredicate() {
return exceptionPredicate;
}

public int getExceptionRingBufferSize() {
return exceptionRingBufferSize;
}

/**
* Returns a builder to create a custom CircuitBreakerConfig.
*
Expand All @@ -95,15 +97,9 @@ public static CircuitBreakerConfig ofDefaults(){
return new Builder().build();
}


public static class Builder {
private int failureRateThreshold = DEFAULT_MAX_FAILURE_THRESHOLD;
private int ringBufferSizeInHalfOpenState = DEFAULT_RING_BUFFER_SIZE_IN_HALF_OPEN_STATE;
private int ringBufferSizeInClosedState = DEFAULT_RING_BUFFER_SIZE_IN_CLOSED_STATE;
private Duration waitDurationInOpenState = Duration.ofSeconds(DEFAULT_WAIT_DURATION_IN_OPEN_STATE);
private CircuitBreakerEventListener circuitBreakerEventListener = new DefaultCircuitBreakerEventListener();
// The default exception predicate counts all exceptions as failures.
private Predicate<Throwable> exceptionPredicate = (exception) -> true;

private Context context = new Context();

/**
* Configures the failure rate threshold in percentage above which the CircuitBreaker should trip open and start short-circuiting calls.
Expand All @@ -117,7 +113,7 @@ public Builder failureRateThreshold(int failureRateThreshold) {
if (failureRateThreshold < 1 || failureRateThreshold > 100) {
throw new IllegalArgumentException("failureRateThreshold must be between 1 and 100");
}
this.failureRateThreshold = failureRateThreshold;
context.failureRateThreshold = failureRateThreshold;
return this;
}

Expand All @@ -132,7 +128,7 @@ public Builder waitDurationInOpenState(Duration waitDurationInOpenState) {
if (waitDurationInOpenState.getSeconds() < 1) {
throw new IllegalArgumentException("waitDurationInOpenState must be at least 1000[ms]");
}
this.waitDurationInOpenState = waitDurationInOpenState;
context.waitDurationInOpenState = waitDurationInOpenState;
return this;
}

Expand All @@ -150,7 +146,7 @@ public Builder ringBufferSizeInHalfOpenState(int ringBufferSizeInHalfOpenState)
if (ringBufferSizeInHalfOpenState < 1 ) {
throw new IllegalArgumentException("ringBufferSizeInHalfOpenState must be greater than 0");
}
this.ringBufferSizeInHalfOpenState = ringBufferSizeInHalfOpenState;
context.ringBufferSizeInHalfOpenState = ringBufferSizeInHalfOpenState;
return this;
}

Expand All @@ -168,7 +164,23 @@ public Builder ringBufferSizeInClosedState(int ringBufferSizeInClosedState) {
if (ringBufferSizeInClosedState < 1) {
throw new IllegalArgumentException("ringBufferSizeInClosedState must be greater than 0");
}
this.ringBufferSizeInClosedState = ringBufferSizeInClosedState;
context.ringBufferSizeInClosedState = ringBufferSizeInClosedState;
return this;
}

/**
* Configures the size of the ring buffer which buffers the latest exceptions which are recorded as a failure and thus increase the failure rate.
*
* Default size is 10. A size of 0 disables buffering.
*
* @param exceptionRingBufferSize the size of the exception ring buffer.
* @return the CircuitBreakerConfig.Builder
*/
public Builder exceptionRingBufferSize(int exceptionRingBufferSize) {
if (exceptionRingBufferSize < 0) {
throw new IllegalArgumentException("exceptionRingBufferSize must be greater than or equal to 0");
}
context.exceptionRingBufferSize = exceptionRingBufferSize;
return this;
}

Expand All @@ -182,7 +194,7 @@ public Builder onCircuitBreakerEvent(CircuitBreakerEventListener circuitBreakerE
if (circuitBreakerEventListener == null) {
throw new IllegalArgumentException("circuitBreakerEventListener must not be null");
}
this.circuitBreakerEventListener = circuitBreakerEventListener;
context.circuitBreakerEventListener = circuitBreakerEventListener;
return this;
}

Expand All @@ -194,7 +206,7 @@ public Builder onCircuitBreakerEvent(CircuitBreakerEventListener circuitBreakerE
* @return the CircuitBreakerConfig.Builder
*/
public Builder recordFailure(Predicate<Throwable> predicate) {
this.exceptionPredicate = predicate;
context.exceptionPredicate = predicate;
return this;
}

Expand All @@ -204,13 +216,18 @@ public Builder recordFailure(Predicate<Throwable> predicate) {
* @return the CircuitBreakerConfig
*/
public CircuitBreakerConfig build() {
return new CircuitBreakerConfig(
failureRateThreshold,
waitDurationInOpenState,
ringBufferSizeInHalfOpenState,
ringBufferSizeInClosedState,
exceptionPredicate,
circuitBreakerEventListener);
return new CircuitBreakerConfig(context);
}
}

private static class Context {
float failureRateThreshold = DEFAULT_MAX_FAILURE_THRESHOLD;
int ringBufferSizeInHalfOpenState = DEFAULT_RING_BUFFER_SIZE_IN_HALF_OPEN_STATE;
int ringBufferSizeInClosedState = DEFAULT_RING_BUFFER_SIZE_IN_CLOSED_STATE;
int exceptionRingBufferSize = DEFAULT_EXCEPTION_RING_BUFFER_SIZE;
Duration waitDurationInOpenState = Duration.ofSeconds(DEFAULT_WAIT_DURATION_IN_OPEN_STATE);
CircuitBreakerEventListener circuitBreakerEventListener = new DefaultCircuitBreakerEventListener();
// The default exception predicate counts all exceptions as failures.
Predicate<Throwable> exceptionPredicate = (exception) -> true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,36 @@


import javaslang.circuitbreaker.CircuitBreaker;
import javaslang.circuitbreaker.CircuitBreakerConfig;
import javaslang.collection.List;

class CircuitBreakerMetrics implements CircuitBreaker.Metrics {

private final RingBitSet ringBitSet;
private final CircularFifoBuffer<Throwable> exceptionRingBuffer;

/**
* Maximum number of buffered calls
*/
private int maxNumberOfBufferedCalls;

private int maxNumberOfBufferedExceptions;

CircuitBreakerMetrics(int ringBufferSize) {
this.ringBitSet = new RingBitSet(ringBufferSize);
this.maxNumberOfBufferedExceptions = CircuitBreakerConfig.DEFAULT_EXCEPTION_RING_BUFFER_SIZE;
this.exceptionRingBuffer = new CircularFifoBuffer<>(this.maxNumberOfBufferedExceptions);
this.maxNumberOfBufferedCalls = ringBufferSize;
}

CircuitBreakerMetrics(int ringBufferSize, int exceptionRingBufferSize) {
this.ringBitSet = new RingBitSet(ringBufferSize);
this.maxNumberOfBufferedExceptions = exceptionRingBufferSize;
if(maxNumberOfBufferedExceptions > 0) {
this.exceptionRingBuffer = new CircularFifoBuffer<>(maxNumberOfBufferedExceptions);
}else{
this.exceptionRingBuffer = new CircularFifoBuffer<>(1);
}
this.maxNumberOfBufferedCalls = ringBufferSize;
}

Expand All @@ -40,8 +58,11 @@ class CircuitBreakerMetrics implements CircuitBreaker.Metrics {
*
* @return the current failure rate in percentage.
*/
public synchronized float recordFailure(){
public synchronized float recordFailure(Throwable throwable){
ringBitSet.setNextBit(true);
if(maxNumberOfBufferedExceptions > 0) {
exceptionRingBuffer.add(throwable);
}
return getFailureRate();
}

Expand Down Expand Up @@ -75,6 +96,15 @@ public long getMaxNumberOfBufferedCalls() {
return maxNumberOfBufferedCalls;
}

/**
* Returns the maximum number of buffered exceptions.
*
* @return the maximum number of buffered exceptions
*/
public long getMaxNumberOfBufferedExceptions() {
return maxNumberOfBufferedExceptions;
}

@Override
public synchronized int getNumberOfBufferedCalls() {
return this.ringBitSet.length();
Expand All @@ -84,4 +114,9 @@ public synchronized int getNumberOfBufferedCalls() {
public synchronized int getNumberOfFailedCalls() {
return this.ringBitSet.cardinality();
}

@Override
public List<Throwable> getBufferedExceptions() {
return exceptionRingBuffer.toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ abstract class CircuitBreakerState{

abstract boolean isCallPermitted();

abstract void recordFailure();
abstract void recordFailure(Throwable throwable);

abstract void recordSuccess();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public boolean isCallPermitted() {
@Override
public void recordFailure(Throwable throwable) {
if(circuitBreakerConfig.getExceptionPredicate().test(throwable)){
stateReference.get().recordFailure();
stateReference.get().recordFailure(throwable);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
*
* Copyright 2015 Robert Winkler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package javaslang.circuitbreaker.internal;

import javaslang.collection.List;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

/**
* A CircularFifoBuffer is a first in first out buffer with a fixed size that replaces its oldest element if full.
**/
public class CircularFifoBuffer<T> {

private final ArrayBlockingQueue<T> fifoQueue;
private final ReentrantLock lock = new ReentrantLock();

/**
* Creates an {@code CircularFifoBuffer} with the given (fixed)
* capacity
*
* @param capacity the capacity of this CircularFifoBuffer
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public CircularFifoBuffer(int capacity) {
if (capacity < 1) {
throw new IllegalArgumentException("CircularFifoBuffer capacity must be greater than 0");
}
fifoQueue = new ArrayBlockingQueue<>(capacity);
}

/**
* Returns the number of elements in this CircularFifoBuffer.
*
* @return the number of elements in this CircularFifoBuffer
*/
public int size() {
return fifoQueue.size();
}

/**
* Returns <tt>true</tt> if this CircularFifoBuffer contains no elements.
*
* @return <tt>true</tt> if this CircularFifoBuffer contains no elements
*/
public boolean isEmpty() {
return fifoQueue.isEmpty();
}

/**
* Returns <tt>true</tt> if this CircularFifoBuffer is full.
*
* @return <tt>true</tt> if this CircularFifoBuffer is full
*/
public boolean isFull() {
return fifoQueue.remainingCapacity() == 0;
}

/**
* Returns a list containing all of the elements in this CircularFifoBuffer.
* The elements are copied into an array.
*
* @return a list containing all of the elements in this CircularFifoBuffer
*/
public List<T> toList(){
return List.ofAll(fifoQueue);
}

/**
* Overwrites the oldest element when full.
*/
public void add(T element) {
if(!fifoQueue.offer(element)){
final ReentrantLock lock = this.lock;
lock.lock();
try {
fifoQueue.remove();
fifoQueue.add(element);
} finally {
lock.unlock();
}
}
}
}
10 changes: 7 additions & 3 deletions src/main/java/javaslang/circuitbreaker/internal/ClosedState.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package javaslang.circuitbreaker.internal;

import javaslang.circuitbreaker.CircuitBreaker;
import javaslang.circuitbreaker.CircuitBreakerConfig;

final class ClosedState extends CircuitBreakerState {

Expand All @@ -27,7 +28,10 @@ final class ClosedState extends CircuitBreakerState {

ClosedState(CircuitBreakerStateMachine stateMachine) {
super(stateMachine);
this.circuitBreakerMetrics = new CircuitBreakerMetrics(stateMachine.getCircuitBreakerConfig().getRingBufferSizeInClosedState());
CircuitBreakerConfig circuitBreakerConfig = stateMachine.getCircuitBreakerConfig();
this.circuitBreakerMetrics = new CircuitBreakerMetrics(
circuitBreakerConfig.getRingBufferSizeInClosedState(),
circuitBreakerConfig.getExceptionRingBufferSize());
this.failureRateThreshold = stateMachine.getCircuitBreakerConfig().getFailureRateThreshold();
}

Expand All @@ -42,8 +46,8 @@ boolean isCallPermitted() {
}

@Override
void recordFailure() {
checkFailureRate(circuitBreakerMetrics.recordFailure());
void recordFailure(Throwable throwable) {
checkFailureRate(circuitBreakerMetrics.recordFailure(throwable));
}

@Override
Expand Down
Loading

0 comments on commit cac5d4e

Please sign in to comment.