Load Shedding: make it configurable

Ladicek committed Apr 18, 2024
1 parent ce126cd commit 6706485
Showing 4 changed files with 93 additions and 12 deletions.
@@ -14,7 +14,11 @@
 @Singleton
 public class HttpLoadShedding {
     public void init(@Observes @Priority(-1_000_000_000) Router router, OverloadDetector detector,
-            PriorityLoadShedding priorityLoadShedding) {
+            PriorityLoadShedding priorityLoadShedding, LoadSheddingRuntimeConfig config) {
+
+        if (!config.enabled()) {
+            return;
+        }
 
         router.route().order(-1_000_000_000).handler(ctx -> {
             if (detector.isOverloaded() && priorityLoadShedding.shedLoad(ctx.request())) {
@@ -0,0 +1,62 @@
package io.quarkus.load.shedding.runtime;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;

@ConfigMapping(prefix = "quarkus.load-shedding")
@ConfigRoot(phase = ConfigPhase.RUN_TIME)
public interface LoadSheddingRuntimeConfig {
/**
* Whether load shedding should be enabled.
* Currently, this only applies to incoming HTTP requests.
*/
@WithDefault("true")
boolean enabled();

/**
* The maximum number of concurrent requests allowed.
*/
@WithDefault("1000")
int maxLimit();

/**
* The {@code alpha} factor of the Vegas overload detection algorithm.
*/
@WithDefault("3")
int alphaFactor();

/**
* The {@code beta} factor of the Vegas overload detection algorithm.
*/
@WithDefault("6")
int betaFactor();

/**
* The probe factor of the Vegas overload detection algorithm.
*/
@WithDefault("30.0")
double probeFactor();

/**
* The initial limit of concurrent requests allowed.
*/
@WithDefault("100")
int initialLimit();

/**
* Configuration of priority load shedding.
*/
PriorityLoadShedding priority();

@ConfigGroup
interface PriorityLoadShedding {
/**
* Whether priority load shedding should be enabled.
*/
@WithDefault("true")
boolean enabled();
}
}
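
The new interface is a SmallRye `@ConfigMapping`, so each accessor is exposed as a `quarkus.load-shedding.*` property; with the default kebab-case mapping, `maxLimit()` should surface as `quarkus.load-shedding.max-limit`, and the nested group as `quarkus.load-shedding.priority.enabled`. As a rough sketch of how these knobs could be exercised, the test profile below overrides a few of them; the class name and the override values are hypothetical, not part of this commit.

```java
import java.util.Map;

import io.quarkus.test.junit.QuarkusTestProfile;

// Hypothetical test profile (not part of this commit) that overrides the new
// runtime properties; property names assume SmallRye Config's default
// kebab-case mapping of the accessor names above.
public class LowLimitLoadSheddingProfile implements QuarkusTestProfile {
    @Override
    public Map<String, String> getConfigOverrides() {
        return Map.of(
                "quarkus.load-shedding.enabled", "true",
                "quarkus.load-shedding.max-limit", "50",
                "quarkus.load-shedding.initial-limit", "10",
                "quarkus.load-shedding.alpha-factor", "3",
                "quarkus.load-shedding.beta-factor", "6",
                "quarkus.load-shedding.priority.enabled", "false");
    }
}
```

Such a profile would typically be attached to a test class with `@TestProfile(LowLimitLoadSheddingProfile.class)`.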
@@ -3,6 +3,7 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 
 /**
@@ -11,19 +12,25 @@
  */
 @Singleton
 public class OverloadDetector {
-    private static final int MAX_LIMIT = 1000;
-    private static final int ALPHA_FACTOR = 3;
-    private static final int BETA_FACTOR = 6;
-    private static final double PROBE_FACTOR = 30.0;
+    private final int maxLimit;
+    private final int alphaFactor;
+    private final int betaFactor;
+    private final double probeFactor;
 
     private final AtomicInteger currentRequests = new AtomicInteger();
-    private volatile long currentLimit = 100;
+    private volatile long currentLimit;
 
     private long lowestRequestTime = Long.MAX_VALUE;
     private double probeCount = 0.0;
     private double probeJitter;
 
-    public OverloadDetector() {
+    @Inject
+    public OverloadDetector(LoadSheddingRuntimeConfig config) {
+        maxLimit = config.maxLimit();
+        alphaFactor = config.alphaFactor();
+        betaFactor = config.betaFactor();
+        probeFactor = config.probeFactor();
+        currentLimit = config.initialLimit();
         resetProbeJitter();
     }
 
@@ -43,7 +50,7 @@ public void requestEnd(long timeInMicros) {
 
     private synchronized void update(long requestTime, int currentRequests) {
         probeCount++;
-        if (PROBE_FACTOR * probeJitter * currentLimit <= probeCount) {
+        if (probeFactor * probeJitter * currentLimit <= probeCount) {
             resetProbeJitter();
             probeCount = 0.0;
             lowestRequestTime = requestTime;
@@ -64,8 +71,8 @@ private synchronized void update(long requestTime, int currentRequests) {
         int queueSize = (int) Math.ceil(currentLimit * (1.0 - (double) lowestRequestTime / (double) requestTime));
 
         int currentLimitLog10 = 1 + (int) Math.log10(currentLimit);
-        int alpha = ALPHA_FACTOR * currentLimitLog10;
-        int beta = BETA_FACTOR * currentLimitLog10;
+        int alpha = alphaFactor * currentLimitLog10;
+        int beta = betaFactor * currentLimitLog10;
 
         long newLimit;
         if (queueSize <= currentLimitLog10) {
@@ -78,7 +85,7 @@ private synchronized void update(long requestTime, int currentRequests) {
             return;
         }
 
-        newLimit = Math.max(1, Math.min(MAX_LIMIT, newLimit));
+        newLimit = Math.max(1, Math.min(maxLimit, newLimit));
         this.currentLimit = newLimit;
     }
 
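
With the Vegas factors now injected from config, a quick worked example may help show how they shape the thresholds in `update()` above. The snippet below replays the arithmetic with the default `alpha-factor`/`beta-factor` values and made-up request timings; the timings are assumptions, and the grow/shrink reading of `alpha`/`beta` is the usual Vegas interpretation rather than the branch bodies elided from this diff.

```java
// Replaying OverloadDetector.update() arithmetic with default factors and made-up
// timings (microseconds). Not part of the commit; illustrative only.
public class VegasThresholds {
    public static void main(String[] args) {
        long currentLimit = 100;          // the default initial-limit
        long lowestRequestTime = 1_000;   // fastest request seen in the current probe window
        long requestTime = 1_250;         // the request that just completed

        // Estimated queue size: ceil(100 * (1 - 1000/1250)) = ceil(20.0) = 20
        int queueSize = (int) Math.ceil(
                currentLimit * (1.0 - (double) lowestRequestTime / (double) requestTime));

        int currentLimitLog10 = 1 + (int) Math.log10(currentLimit); // 1 + 2 = 3
        int alpha = 3 * currentLimitLog10;                          // alpha-factor * 3 = 9
        int beta = 6 * currentLimitLog10;                           // beta-factor * 3 = 18

        // In the Vegas scheme, a queue estimate below alpha lets the limit grow and one
        // above beta shrinks it; here 20 > 18, so the detector would lower the limit.
        System.out.printf("queueSize=%d alpha=%d beta=%d%n", queueSize, alpha, beta);
    }
}
```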
@@ -23,6 +23,8 @@ public class PriorityLoadShedding {
     @All
     List<RequestClassifier<Object>> requestClassifiers;
 
+    private final boolean enabled;
+
     private final int max;
 
     private final OperatingSystemMXBean os;
@@ -31,13 +33,19 @@ public class PriorityLoadShedding {
 
     private long lastCpuLoadTime;
 
-    PriorityLoadShedding() {
+    @Inject
+    PriorityLoadShedding(LoadSheddingRuntimeConfig config) {
+        enabled = config.priority().enabled();
         max = RequestPriority.values().length * RequestClassifier.MAX_VALUE;
         os = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
     }
 
     // when this is called, we know we're overloaded
     public boolean shedLoad(Object request) {
+        if (!enabled) {
+            return true;
+        }
+
         long now = System.currentTimeMillis();
         if (now - lastCpuLoadTime > 5_000) {
             lastCpuLoad = os.getCpuLoad();
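
Two consequences of the changes to `PriorityLoadShedding` are worth noting: when `priority.enabled` is false, `shedLoad()` returns `true` immediately, so every request is rejected for as long as the detector reports overload; when it is enabled, the CPU load that feeds the decision is refreshed at most once every five seconds. The sketch below isolates that throttled sampling pattern; the class and method names are mine, and it assumes the `com.sun.management` variant of `OperatingSystemMXBean`, whose `getCpuLoad()` requires JDK 14 or newer.

```java
import java.lang.management.ManagementFactory;

import com.sun.management.OperatingSystemMXBean;

// Minimal sketch of the throttled CPU sampling used by PriorityLoadShedding:
// getCpuLoad() is consulted at most once per 5 seconds so the request path stays cheap.
public class ThrottledCpuLoad {
    private final OperatingSystemMXBean os =
            (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

    private volatile double lastCpuLoad;
    private volatile long lastCpuLoadTime;

    public double cpuLoad() {
        long now = System.currentTimeMillis();
        if (now - lastCpuLoadTime > 5_000) {
            lastCpuLoad = os.getCpuLoad(); // 0.0-1.0, or negative when unavailable
            lastCpuLoadTime = now;
        }
        return lastCpuLoad;
    }
}
```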
