Skip to content

Commit

Permalink
Implement time window for FailureRateCircuitBreaker (#24)
Browse files Browse the repository at this point in the history
Compute the failure rate over a fixed time window, if provided.
  • Loading branch information
AlessandroPatti authored and Richard Howell committed Jun 30, 2020
1 parent 7c28ef5 commit 3dcf1c9
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 11 deletions.
34 changes: 23 additions & 11 deletions src/main/java/com/google/devtools/build/lib/remote/Retrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import com.google.devtools.build.lib.remote.Retrier.CircuitBreaker.State;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -141,35 +144,33 @@ public void recordSuccess() {}

public static class FailureRateCircuitBreaker implements CircuitBreaker {

// 100 arbitrarily chosen as a fair balance between giving up
// too quickly vs trying too much and giving up too late.
public static final int DEFAULT_MIN_EXECUTIONS_TO_COMPUTE_FAILURE_RATE = 100;
// Shared logger for this circuit breaker; declared final so it cannot be reassigned.
private static final Logger logger =
    Logger.getLogger(FailureRateCircuitBreaker.class.getName());

public static final int DEFAULT_MIN_EXECUTIONS_TO_COMPUTE_FAILURE_RATE = 10;

private State state;
private final AtomicInteger failures;
private final AtomicInteger successes;
private final double maxFailureRate;
private final int minExecutionsToComputeFailureRate;
private final ImmutableSet<Class> ignoredErrors;
private final int timeWindow;

public FailureRateCircuitBreaker(double maxFailureRate) {
this(maxFailureRate, DEFAULT_MIN_EXECUTIONS_TO_COMPUTE_FAILURE_RATE, ImmutableSet.of());
}

public FailureRateCircuitBreaker(double maxFailureRate, Iterable<Class> ignoredErrors) {
this(maxFailureRate, DEFAULT_MIN_EXECUTIONS_TO_COMPUTE_FAILURE_RATE, ignoredErrors);
}
private final ScheduledExecutorService scheduler;

/**
 * Creates a circuit breaker that starts in {@code State.ACCEPT_CALLS} with zeroed failure and
 * success counters.
 *
 * @param maxFailureRate failure rate (failures divided by total recorded executions) above
 *     which the breaker switches to rejecting calls
 * @param minExecutionsToComputeFailureRate minimum number of recorded executions required
 *     before the failure rate is evaluated
 * @param failureRateTimeWindowSize length of the time window, in seconds. When positive, a
 *     scheduler is created so that each recorded execution can be removed from its counter
 *     after this delay; when zero or negative, no scheduler is created and counters only grow
 * @param ignoredErrors error classes, copied into an immutable set. NOTE(review): presumably
 *     failures of these types are not counted; the check lives outside this constructor
 */
public FailureRateCircuitBreaker(
    double maxFailureRate,
    int minExecutionsToComputeFailureRate,
    int failureRateTimeWindowSize,
    Iterable<Class> ignoredErrors) {
  this.maxFailureRate = maxFailureRate;
  this.state = State.ACCEPT_CALLS;
  this.failures = new AtomicInteger(0);
  this.successes = new AtomicInteger(0);
  this.minExecutionsToComputeFailureRate = minExecutionsToComputeFailureRate;
  this.ignoredErrors = ImmutableSet.copyOf(ignoredErrors);
  this.timeWindow = failureRateTimeWindowSize;
  // The scheduler is never shut down, so its worker must be a daemon thread: the default
  // thread factory creates non-daemon threads, which would keep the JVM alive after the
  // breaker is no longer in use.
  this.scheduler =
      timeWindow > 0
          ? Executors.newScheduledThreadPool(
              1,
              runnable -> {
                Thread worker = new Thread(runnable, "failure-rate-circuit-breaker-window");
                worker.setDaemon(true);
                return worker;
              })
          : null;
}

@Override
Expand All @@ -183,19 +184,30 @@ public void recordFailure(@Nullable Throwable t) {
return;
}
double currentFailures = failures.incrementAndGet();
if (timeWindow > 0) {
scheduler.schedule(failures::decrementAndGet, timeWindow, TimeUnit.SECONDS);
}
double currentSuccesses = successes.get();
if (currentFailures + currentSuccesses < minExecutionsToComputeFailureRate) {
// Execute at least minExecutionsToComputeFailureRate before computing the rate
return;
}
if (currentFailures / (currentFailures + currentSuccesses) > maxFailureRate) {
double failureRate = currentFailures / (currentFailures + currentSuccesses);
if (failureRate > maxFailureRate && state != State.REJECT_CALLS) {
logger.warning(
String.format(
"Failure rate %.2f too high: %.0f fail, %.0f success. Rejecting calls.",
failureRate, currentFailures, currentSuccesses));
state = State.REJECT_CALLS;
}
}

/**
 * Counts one successful execution. When a time window is configured, the count is scheduled
 * to be taken back once the window (in seconds) has elapsed.
 */
@Override
public void recordSuccess() {
  successes.incrementAndGet();
  if (timeWindow <= 0) {
    // No window configured: the success stays counted.
    return;
  }
  scheduler.schedule(successes::decrementAndGet, timeWindow, TimeUnit.SECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ public static RemoteRetrier newRetrier(RemoteOptions options) {
options != null && options.remoteMaxFailureRate > 0
? new FailureRateCircuitBreaker(
options.remoteMaxFailureRate,
FailureRateCircuitBreaker.DEFAULT_MIN_EXECUTIONS_TO_COMPUTE_FAILURE_RATE,
options.remoteFailureRateTimeWindowSize,
ImmutableList.of(CacheNotFoundException.class)
)
: Retrier.ALLOW_ALL_CALLS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,19 @@ public final class RemoteOptions extends OptionsBase {
+ " This only works with HTTP cache at the moment.")
public double remoteMaxFailureRate;

// Size of the moving time window, in seconds per the help text, over which the remote
// failure rate is computed. The default of 0 means the rate is computed over the whole
// execution. newRetrier passes this value to FailureRateCircuitBreaker as
// failureRateTimeWindowSize. Note the flag name omits "time" while the field name has it.
@Option(
name = "remote_failure_rate_window_size",
defaultValue = "0",
documentationCategory = OptionDocumentationCategory.REMOTE,
effectTags = {OptionEffectTag.UNKNOWN},
help =
"The moving time window on which the failure rate of the remote requests is computed."
+ " That is, only requests occurred in the last <n> seconds will contribute to the"
+ " calculation of the failure rate. "
+ " If n=0, the failure rate is computed on the whole duration of the execution."
+ " This only works with HTTP cache at the moment.")
public int remoteFailureRateTimeWindowSize;

@Option(
name = "disk_cache",
defaultValue = "null",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
FailureRateCircuitBreaker circuitBreaker= new FailureRateCircuitBreaker(
/* maxFailureRate */ options.remoteMaxFailureRate,
/* minExecutionsToComputeFailureRate */ 5,
/* failureRateTimeWindowSize */ 0,
ImmutableList.of(CacheNotFoundException.class)
);
HttpCacheClient blobStore =
Expand Down

0 comments on commit 3dcf1c9

Please sign in to comment.