diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 81afa4ffbfee5..b6d2876e98212 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -1887,6 +1887,16 @@ quarkus-smallrye-openapi-common-deployment ${project.version} + + io.quarkus + quarkus-load-shedding + ${project.version} + + + io.quarkus + quarkus-load-shedding-deployment + ${project.version} + io.quarkus quarkus-vertx diff --git a/devtools/bom-descriptor-json/pom.xml b/devtools/bom-descriptor-json/pom.xml index ffc75fd522023..260b88d194a5f 100644 --- a/devtools/bom-descriptor-json/pom.xml +++ b/devtools/bom-descriptor-json/pom.xml @@ -1344,6 +1344,19 @@ + + io.quarkus + quarkus-load-shedding + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-logging-gelf diff --git a/docs/pom.xml b/docs/pom.xml index 6ee804140ed8e..9f7fbb3430f5c 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -1356,6 +1356,19 @@ + + io.quarkus + quarkus-load-shedding-deployment + ${project.version} + pom + test + + + * + * + + + io.quarkus quarkus-logging-gelf-deployment diff --git a/docs/src/main/asciidoc/load-shedding-reference.adoc b/docs/src/main/asciidoc/load-shedding-reference.adoc new file mode 100644 index 0000000000000..f5ac732d11690 --- /dev/null +++ b/docs/src/main/asciidoc/load-shedding-reference.adoc @@ -0,0 +1,150 @@ +//// +This guide is maintained in the main Quarkus repository +and pull requests should be submitted there: +https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc +//// += Load Shedding reference guide +include::_attributes.adoc[] +:numbered: +:sectnums: +:categories: web +:topics: web,load-shedding +:extensions: io.quarkus:quarkus-load-shedding +:extension-status: experimental + +include::{includes}/extension-status.adoc[] + +Load shedding is the practice of detecting service overload and rejecting requests. + +In Quarkus, the `quarkus-load-shedding` extension provides a load shedding mechanism. + +== Use the Load Shedding extension + +To use the load shedding extension, you need to add the `io.quarkus:quarkus-load-shedding` extension to your project: + +[source,xml,role="primary asciidoc-tabs-target-sync-cli asciidoc-tabs-target-sync-maven"] +.pom.xml +---- + + io.quarkus + quarkus-load-shedding + +---- + +[source,gradle,role="secondary asciidoc-tabs-target-sync-gradle"] +.build.gradle +---- +implementation("io.quarkus:quarkus-load-shedding") +---- + +No configuration is required, though the possible configuration options are described below. + +== The load shedding algorithm + +The load shedding algorithm has 2 parts: + +* overload detection +* priority load shedding (optional) + +=== Overload detection + +To detect whether the current service is overloaded, an adaptation of TCP Vegas is used. + +The algorithm starts with 100 allowed concurrent requests. +For each request, it compares the number of current requests with the allowed limit and if the limit is exceeded, an overload situation is signalled. + +If the limit is not exceeded, or if priority load shedding determines that the request should not be rejected (see below), the request is allowed. +When it finishes, its duration is compared with the lowest duration seen so far to estimate a queue size. +If the queue size is lower than _alpha_, the current limit is increased, but only up to a given maximum, by default 1000. +If the queue size is greater than _beta_, the current limit is decreased. +Otherwise, the current limit is kept intact. + +Alpha and beta are computed by multiplying the configurable constants with a base 10 logarithm of the current limit. + +After some number of requests, which can be modified by configuring the _probe_ factor, the lowest duration seen is reset to the last seen duration of a request. + +=== Priority load shedding + +If an overload situation is signalled, priority load shedding is invoked. + +By default, priority load shedding is enabled, which means a request is only rejected if the current CPU load is high enough. +To determine whether a request should be rejected, 2 attributes are considered: + +* request priority +* request cohort + +There are 5 statically defined priorities and 128 cohorts, which amounts to 640 request groups in total. + +After both priority and cohort are assigned to a request, a request group number is computed: `group = priority * num_cohorts + cohort`. +Then, the group number is compared to a simple cubic function of current CPU load, where `load` is a number between 0 and 1: `num_groups * (1 - load^3)`. +If the group number is higher, the request is rejected, otherwise it is allowed even in an overload situation. + +If priority load shedding is disabled, all requests are rejected in an overload situation. + +==== Customizing request priority + +Priority is assigned by a `io.quarkus.load.shedding.RequestPrioritizer`. +There is 5 statically defined priorities in the `io.quarkus.load.shedding.RequestPriority` enum: `CRITICAL`, `IMPORTANT`, `NORMAL`, `BACKGROUND` and `DEGRADED`. +By default, if no request prioritizer applies, the priority is assumed to be `NORMAL`. + +There is one default prioritizer which assigns the priority of `CRITICAL` to requests to the non-application endpoints. +It declares no `@Priority`. + +It is possible to define custom implementations of the `RequestPrioritizer` interface. +The implementations must be CDI beans, otherwise they are ignored. +The CDI rules of typesafe resolution must be followed. +That is, if multiple implementations exist with a different `@Priority` value and some of them are ``@Alternative``s, only the alternatives with the highest priority value are retained. +If no implementation is an alternative, all implementations are retained and are sorted in descending `@Priority` order (highest priority value comes first). + +==== Customizing request cohort + +Cohort is assigned by a `io.quarkus.load.shedding.RequestClassifier`. +There is 128 statically defined cohorts, with the lowest number being 1 and highest number being 128. +The classifier should return a number in this interval; if it does not, the number is adjusted automatically. + +There is one default classifier which assigns a cohort based on a hash of the remote IP address and current time, such that an IP address changes its cohort roughly every hour. +It declares no `@Priority`. + +It is possible to define custom implementations of the `RequestClassifier` interface. +The implementations must be CDI beans, otherwise they are ignored. +The CDI rules of typesafe resolution must be followed. +That is, if multiple implementations exist with a different `@Priority` value and some of them are ``@Alternative``s, only the alternatives with the highest priority value are retained. +If no implementation is an alternative, all implementations are retained and are sorted in descending `@Priority` order (highest priority value comes first). + +== Limitations + +The load shedding extension currently only applies to HTTP requests, and is heavily skewed towards request/response network interactions. +This means that gRPC, WebSocket and other kinds of streaming over HTTP are not supported. +Other "entrypoints" to Quarkus applications, such as messaging, are not supported either. + +Further, the load shedding implementation is currently rather basic and not heavily tested in production. +Improvements may be necessary. + +== Configuration reference + +include::{generated-dir}/config/quarkus-load-shedding.adoc[opts=optional, leveloffset=+1] + +== Further reading + +Netflix Technology Blog: + +* https://netflixtechblog.medium.com/performance-under-load-3e6fa9a60581[Performance Under Load] +* https://netflixtechblog.com/keeping-netflix-reliable-using-prioritized-load-shedding-6cc827b02f94[Keeping Netflix Reliable Using Prioritized Load Shedding] + +Uber Engineering Blog: + +* https://www.uber.com/blog/cinnamon-using-century-old-tech-to-build-a-mean-load-shedder/[Cinnamon: Using Century Old Tech to Build a Mean Load Shedder] +* https://www.uber.com/blog/pid-controller-for-cinnamon/[PID Controller for Cinnamon] +* https://www.uber.com/blog/cinnamon-auto-tuner-adaptive-concurrency-in-the-wild/[Cinnamon Auto-Tuner: Adaptive Concurrency in the Wild] + +Amazon Builders' Library: + +* https://aws.amazon.com/builders-library/using-load-shedding-to-avoid-overload/[Using load shedding to avoid overload] + +Google Cloud Blog: + +* https://cloud.google.com/blog/products/gcp/using-load-shedding-to-survive-a-success-disaster-cre-life-lessons[Using load shedding to survive a success disaster] + +CodeReliant Blog: + +* https://www.codereliant.io/load-shedding/[Load Shedding for High Traffic Systems] diff --git a/extensions/load-shedding/deployment/pom.xml b/extensions/load-shedding/deployment/pom.xml new file mode 100644 index 0000000000000..782ca14cbcfab --- /dev/null +++ b/extensions/load-shedding/deployment/pom.xml @@ -0,0 +1,66 @@ + + + 4.0.0 + + + io.quarkus + quarkus-load-shedding-parent + 999-SNAPSHOT + + + quarkus-load-shedding-deployment + + Quarkus - Load Shedding - Deployment + + + + io.quarkus + quarkus-load-shedding + + + + io.quarkus + quarkus-vertx-http-deployment + + + io.quarkus + quarkus-rest-deployment + test + + + + io.quarkus + quarkus-junit5-internal + test + + + io.rest-assured + rest-assured + test + + + org.assertj + assertj-core + test + + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + diff --git a/extensions/load-shedding/deployment/src/main/java/io/quarkus/load/shedding/deployment/LoadSheddingProcessor.java b/extensions/load-shedding/deployment/src/main/java/io/quarkus/load/shedding/deployment/LoadSheddingProcessor.java new file mode 100644 index 0000000000000..669bf6c4c252f --- /dev/null +++ b/extensions/load-shedding/deployment/src/main/java/io/quarkus/load/shedding/deployment/LoadSheddingProcessor.java @@ -0,0 +1,34 @@ +package io.quarkus.load.shedding.deployment; + +import java.util.ArrayList; +import java.util.List; + +import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.load.shedding.runtime.HttpLoadShedding; +import io.quarkus.load.shedding.runtime.HttpRequestClassifier; +import io.quarkus.load.shedding.runtime.ManagementRequestPrioritizer; +import io.quarkus.load.shedding.runtime.OverloadDetector; +import io.quarkus.load.shedding.runtime.PriorityLoadShedding; + +public class LoadSheddingProcessor { + private static final String FEATURE = "load-shedding"; + + @BuildStep + FeatureBuildItem feature() { + return new FeatureBuildItem(FEATURE); + } + + @BuildStep + AdditionalBeanBuildItem beans() { + List beans = new ArrayList<>(); + beans.add(OverloadDetector.class.getName()); + beans.add(HttpLoadShedding.class.getName()); + beans.add(PriorityLoadShedding.class.getName()); + beans.add(ManagementRequestPrioritizer.class.getName()); + beans.add(HttpRequestClassifier.class.getName()); + + return AdditionalBeanBuildItem.builder().addBeanClasses(beans).build(); + } +} diff --git a/extensions/load-shedding/deployment/src/test/java/io/quarkus/load/shedding/NaiveLoadSheddingTest.java b/extensions/load-shedding/deployment/src/test/java/io/quarkus/load/shedding/NaiveLoadSheddingTest.java new file mode 100644 index 0000000000000..4ac7ae3c1cc17 --- /dev/null +++ b/extensions/load-shedding/deployment/src/test/java/io/quarkus/load/shedding/NaiveLoadSheddingTest.java @@ -0,0 +1,65 @@ +package io.quarkus.load.shedding; + +import static io.restassured.RestAssured.when; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; + +public class NaiveLoadSheddingTest { + private static final int NUM_THREADS = 20; + private static final int NUM_REQUESTS = 10; + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .withApplicationRoot(jar -> jar.addClasses(MyResource.class)) + .overrideConfigKey("quarkus.load-shedding.initial-limit", "5") + .overrideConfigKey("quarkus.load-shedding.max-limit", "10") + .overrideConfigKey("quarkus.load-shedding.priority.enabled", "false"); + + @Test + public void test() throws InterruptedException { + AtomicInteger numErrors = new AtomicInteger(); + CountDownLatch begin = new CountDownLatch(1); + CountDownLatch end = new CountDownLatch(NUM_THREADS); + for (int i = 0; i < NUM_THREADS; i++) { + new Thread(() -> { + try { + begin.await(); + for (int j = 0; j < NUM_REQUESTS; j++) { + int statusCode = when().get("/").then().extract().statusCode(); + if (statusCode == 503) { + numErrors.incrementAndGet(); + } + } + end.countDown(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }).start(); + } + + begin.countDown(); + end.await(); + + // at least 1/2 of all requests failed + assertThat(numErrors).hasValueGreaterThanOrEqualTo(100); + } + + @Path("/") + public static class MyResource { + @GET + public String hello() throws InterruptedException { + Thread.sleep(100); + return "Hello, world!"; + } + } +} diff --git a/extensions/load-shedding/deployment/src/test/java/io/quarkus/load/shedding/TimeBasedRequestClassifierTest.java b/extensions/load-shedding/deployment/src/test/java/io/quarkus/load/shedding/TimeBasedRequestClassifierTest.java new file mode 100644 index 0000000000000..9fdac83523fc3 --- /dev/null +++ b/extensions/load-shedding/deployment/src/test/java/io/quarkus/load/shedding/TimeBasedRequestClassifierTest.java @@ -0,0 +1,41 @@ +package io.quarkus.load.shedding; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import org.junit.jupiter.api.Test; + +public class TimeBasedRequestClassifierTest { + @Test + public void fixedTime() { + fixedTime(0); + fixedTime(1_000); + fixedTime(500_000); + } + + private void fixedTime(long now) { + int hour = (int) (now >> 22); + + int nextHour = (int) ((now + 1_000) >> 22); + assertEquals(hour, nextHour); + + nextHour = (int) ((now + 1_000_000) >> 22); + assertEquals(hour, nextHour); + + nextHour = (int) ((now + 3_600_000) >> 22); + assertEquals(hour, nextHour); + + // 4_200_000 because 2^22 = 4_194_304 + nextHour = (int) ((now + 4_200_000) >> 22); + assertNotEquals(hour, nextHour); + } + + @Test + public void currentTime() { + long now = System.currentTimeMillis(); + int hour = (int) (now >> 22); + + int nextHour = (int) ((now + 4_200_000) >> 22); + assertNotEquals(hour, nextHour); + } +} diff --git a/extensions/load-shedding/pom.xml b/extensions/load-shedding/pom.xml new file mode 100644 index 0000000000000..1116e835d64b1 --- /dev/null +++ b/extensions/load-shedding/pom.xml @@ -0,0 +1,24 @@ + + + 4.0.0 + + + quarkus-extensions-parent + io.quarkus + 999-SNAPSHOT + ../pom.xml + + + quarkus-load-shedding-parent + pom + + Quarkus - Load Shedding + + + deployment + runtime + + + diff --git a/extensions/load-shedding/runtime/pom.xml b/extensions/load-shedding/runtime/pom.xml new file mode 100644 index 0000000000000..3636bd25a1b54 --- /dev/null +++ b/extensions/load-shedding/runtime/pom.xml @@ -0,0 +1,45 @@ + + + 4.0.0 + + + io.quarkus + quarkus-load-shedding-parent + 999-SNAPSHOT + + + quarkus-load-shedding + + Quarkus - Load Shedding - Runtime + Shed excess load and keep your service available + + + + io.quarkus + quarkus-vertx-http + + + + + + + io.quarkus + quarkus-extension-maven-plugin + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + diff --git a/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/RequestClassifier.java b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/RequestClassifier.java new file mode 100644 index 0000000000000..d62b537d3f942 --- /dev/null +++ b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/RequestClassifier.java @@ -0,0 +1,36 @@ +package io.quarkus.load.shedding; + +/** + * Assigns a cohort number to a request. There is 128 statically defined cohorts, + * where the minimum cohort number is 1 and maximum is 128, inclusive. All classifiers + * are inspected and the first one that returns {@code true} for {@link #appliesTo(Object)} + * is taken. + *

+ * An implementation must be a CDI bean, otherwise it is ignored. CDI typesafe resolution + * rules must be followed. That is, if multiple implementations are provided with different + * {@link jakarta.annotation.Priority} values, only the implementations with the highest + * priority are retained. + * + * @param type of the request + */ +public interface RequestClassifier { + int MIN_COHORT = 1; + + int MAX_COHORT = 128; + + /** + * Returns whether this request classifier applies to given {@code request}. + * + * @param request the request, never {@code null} + * @return whether this request classifier applies to given {@code request} + */ + boolean appliesTo(Object request); + + /** + * Returns the cohort to which the given {@code request} belongs. + * + * @param request the request, never {@code null} + * @return the cohort to which the given {@code request} belongs + */ + int cohort(R request); +} diff --git a/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/RequestPrioritizer.java b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/RequestPrioritizer.java new file mode 100644 index 0000000000000..606dd8ef52149 --- /dev/null +++ b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/RequestPrioritizer.java @@ -0,0 +1,23 @@ +package io.quarkus.load.shedding; + +/** + * Assigns a {@linkplain RequestPriority priority} to a request. All prioritizers + * are inspected and the first one that returns {@code true} for {@link #appliesTo(Object)} + * is taken. + *

+ * If no prioritizer applies to a given request, the priority of {@link RequestPriority#NORMAL} + * is assumed. By default, a prioritizer for non-application endpoints is present, which + * assigns them the {@link RequestPriority#CRITICAL} priority. + *

+ * An implementation must be a CDI bean, otherwise it is ignored. CDI typesafe resolution + * rules must be followed. That is, if multiple implementations are provided with different + * {@link jakarta.annotation.Priority} values, only the implementations with the highest + * priority are retained. + * + * @param type of the request + */ +public interface RequestPrioritizer { + boolean appliesTo(Object request); + + RequestPriority priority(R request); +} diff --git a/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/RequestPriority.java b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/RequestPriority.java new file mode 100644 index 0000000000000..959d0d025d28a --- /dev/null +++ b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/RequestPriority.java @@ -0,0 +1,33 @@ +package io.quarkus.load.shedding; + +/** + * A priority that can be assigned to a request by implementing the {@link RequestPrioritizer}. + * There is 5 statically defined priority levels: + *

+ * + * @see RequestPrioritizer + */ +public enum RequestPriority { + CRITICAL(0), + IMPORTANT(1), + NORMAL(2), + BACKGROUND(3), + DEGRADED(4), + ; + + private final int cohortBaseline; + + RequestPriority(int factor) { + this.cohortBaseline = factor * RequestClassifier.MAX_COHORT; + } + + public int cohortBaseline() { + return cohortBaseline; + } +} diff --git a/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/HttpLoadShedding.java b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/HttpLoadShedding.java new file mode 100644 index 0000000000000..6f17b3ef9ca85 --- /dev/null +++ b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/HttpLoadShedding.java @@ -0,0 +1,49 @@ +package io.quarkus.load.shedding.runtime; + +import jakarta.annotation.Priority; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Singleton; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.ext.web.Router; + +@Singleton +public class HttpLoadShedding { + public void init(@Observes @Priority(-1_000_000_000) Router router, OverloadDetector detector, + PriorityLoadShedding priority, LoadSheddingRuntimeConfig config) { + + if (!config.enabled()) { + return; + } + + router.route().order(-1_000_000_000).handler(ctx -> { + if (detector.isOverloaded() && priority.shedLoad(ctx.request())) { + HttpServerResponse response = ctx.response(); + response.setStatusCode(HttpResponseStatus.SERVICE_UNAVAILABLE.code()); + response.headers().add(HttpHeaderNames.CONNECTION, "close"); + response.endHandler(new Handler() { + @Override + public void handle(Void ignored) { + ctx.request().connection().close(); + } + }); + response.end(); + } else { + detector.requestBegin(); + long start = System.nanoTime(); + ctx.addEndHandler(new Handler>() { + @Override + public void handle(AsyncResult ignored) { + long end = System.nanoTime(); + detector.requestEnd((end - start) / 1_000); + } + }); + ctx.next(); + } + }); + } +} diff --git a/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/HttpRequestClassifier.java b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/HttpRequestClassifier.java new file mode 100644 index 0000000000000..bd8cfd08d494e --- /dev/null +++ b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/HttpRequestClassifier.java @@ -0,0 +1,24 @@ +package io.quarkus.load.shedding.runtime; + +import jakarta.inject.Singleton; + +import io.quarkus.load.shedding.RequestClassifier; +import io.vertx.core.http.HttpServerRequest; + +@Singleton +public class HttpRequestClassifier implements RequestClassifier { + @Override + public boolean appliesTo(Object request) { + return request instanceof HttpServerRequest; + } + + @Override + public int cohort(HttpServerRequest request) { + int hour = (int) (System.currentTimeMillis() >> 22); // roughly 1 hour + String host = request.remoteAddress().hostAddress(); + if (host == null) { + host = ""; + } + return hour + host.hashCode(); + } +} diff --git a/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/LoadSheddingRuntimeConfig.java b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/LoadSheddingRuntimeConfig.java new file mode 100644 index 0000000000000..69f28ffbcca61 --- /dev/null +++ b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/LoadSheddingRuntimeConfig.java @@ -0,0 +1,62 @@ +package io.quarkus.load.shedding.runtime; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; + +@ConfigMapping(prefix = "quarkus.load-shedding") +@ConfigRoot(phase = ConfigPhase.RUN_TIME) +public interface LoadSheddingRuntimeConfig { + /** + * Whether load shedding should be enabled. + * Currently, this only applies to incoming HTTP requests. + */ + @WithDefault("true") + boolean enabled(); + + /** + * The maximum number of concurrent requests allowed. + */ + @WithDefault("1000") + int maxLimit(); + + /** + * The {@code alpha} factor of the Vegas overload detection algorithm. + */ + @WithDefault("3") + int alphaFactor(); + + /** + * The {@code beta} factor of the Vegas overload detection algorithm. + */ + @WithDefault("6") + int betaFactor(); + + /** + * The probe factor of the Vegas overload detection algorithm. + */ + @WithDefault("30.0") + double probeFactor(); + + /** + * The initial limit of concurrent requests allowed. + */ + @WithDefault("100") + int initialLimit(); + + /** + * Configuration of priority load shedding. + */ + PriorityLoadShedding priority(); + + @ConfigGroup + interface PriorityLoadShedding { + /** + * Whether priority load shedding should be enabled. + */ + @WithDefault("true") + boolean enabled(); + } +} diff --git a/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/ManagementRequestPrioritizer.java b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/ManagementRequestPrioritizer.java new file mode 100644 index 0000000000000..ee011ccd35f31 --- /dev/null +++ b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/ManagementRequestPrioritizer.java @@ -0,0 +1,46 @@ +package io.quarkus.load.shedding.runtime; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import io.quarkus.load.shedding.RequestPrioritizer; +import io.quarkus.load.shedding.RequestPriority; +import io.quarkus.vertx.http.runtime.HttpBuildTimeConfig; +import io.quarkus.vertx.http.runtime.management.ManagementInterfaceBuildTimeConfig; +import io.vertx.core.http.HttpServerRequest; + +@Singleton +public class ManagementRequestPrioritizer implements RequestPrioritizer { + private final String managementPath; + + @Inject + public ManagementRequestPrioritizer(HttpBuildTimeConfig httpConfig, + ManagementInterfaceBuildTimeConfig managementInterfaceConfig) { + if (managementInterfaceConfig.enabled) { + managementPath = null; + return; + } + if (httpConfig.nonApplicationRootPath.startsWith("/")) { + if (httpConfig.nonApplicationRootPath.equals(httpConfig.rootPath)) { + managementPath = null; + return; + } + managementPath = httpConfig.nonApplicationRootPath; + return; + } + managementPath = httpConfig.rootPath + httpConfig.nonApplicationRootPath; + } + + @Override + public boolean appliesTo(Object request) { + if (managementPath != null && request instanceof HttpServerRequest httpRequest) { + return httpRequest.path().startsWith(managementPath); + } + return false; + } + + @Override + public RequestPriority priority(HttpServerRequest request) { + return RequestPriority.CRITICAL; + } +} diff --git a/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/OverloadDetector.java b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/OverloadDetector.java new file mode 100644 index 0000000000000..12224e47d24b1 --- /dev/null +++ b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/OverloadDetector.java @@ -0,0 +1,109 @@ +package io.quarkus.load.shedding.runtime; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +/** + * An overload detector based on TCP Vegas, as implemented by + * Netflix Concurrency Limits. + */ +@Singleton +public class OverloadDetector { + private static final int[] LOG10_PLUS_1_TABLE = new int[1_000]; + + static { + LOG10_PLUS_1_TABLE[0] = 1; + for (int i = 1; i < 1_000; i++) { + LOG10_PLUS_1_TABLE[i] = 1 + (int) Math.log10(i); + } + } + + private final int maxLimit; + private final int alphaFactor; + private final int betaFactor; + private final double probeFactor; + + private final AtomicInteger currentRequests = new AtomicInteger(); + private volatile long currentLimit; + + private long lowestRequestTime = Long.MAX_VALUE; + private double probeCount = 0.0; + private double probeJitter; + + @Inject + public OverloadDetector(LoadSheddingRuntimeConfig config) { + maxLimit = config.maxLimit(); + alphaFactor = config.alphaFactor(); + betaFactor = config.betaFactor(); + probeFactor = config.probeFactor(); + currentLimit = config.initialLimit(); + resetProbeJitter(); + } + + public boolean isOverloaded() { + return currentRequests.get() >= currentLimit; + } + + public void requestBegin() { + currentRequests.incrementAndGet(); + } + + public void requestEnd(long timeInMicros) { + int current = currentRequests.getAndDecrement(); + + update(timeInMicros, current); + } + + private synchronized void update(long requestTime, int currentRequests) { + probeCount++; + if (probeFactor * probeJitter * currentLimit <= probeCount) { + resetProbeJitter(); + probeCount = 0.0; + lowestRequestTime = requestTime; + return; + } + + if (requestTime < lowestRequestTime) { + lowestRequestTime = requestTime; + return; + } + + long currentLimit = this.currentLimit; + + if (2L * currentRequests < currentLimit) { + return; + } + + int queueSize = (int) Math.ceil(currentLimit * (1.0 - (double) lowestRequestTime / (double) requestTime)); + + int currentLimitLog10Plus1; + if (currentLimit >= 0 && currentLimit < 1_000) { + currentLimitLog10Plus1 = LOG10_PLUS_1_TABLE[(int) currentLimit]; + } else { + currentLimitLog10Plus1 = 1 + (int) Math.log10(currentLimit); + } + int alpha = alphaFactor * currentLimitLog10Plus1; + int beta = betaFactor * currentLimitLog10Plus1; + + long newLimit; + if (queueSize <= currentLimitLog10Plus1) { + newLimit = currentLimit + beta; + } else if (queueSize < alpha) { + newLimit = currentLimit + currentLimitLog10Plus1; + } else if (queueSize > beta) { + newLimit = currentLimit - currentLimitLog10Plus1; + } else { + return; + } + + newLimit = Math.max(1, Math.min(maxLimit, newLimit)); + this.currentLimit = newLimit; + } + + private void resetProbeJitter() { + probeJitter = ThreadLocalRandom.current().nextDouble(0.5, 1); + } +} diff --git a/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/PriorityLoadShedding.java b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/PriorityLoadShedding.java new file mode 100644 index 0000000000000..e5a26046f2d5f --- /dev/null +++ b/extensions/load-shedding/runtime/src/main/java/io/quarkus/load/shedding/runtime/PriorityLoadShedding.java @@ -0,0 +1,93 @@ +package io.quarkus.load.shedding.runtime; + +import java.lang.management.ManagementFactory; +import java.util.List; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import com.sun.management.OperatingSystemMXBean; + +import io.quarkus.arc.All; +import io.quarkus.load.shedding.RequestClassifier; +import io.quarkus.load.shedding.RequestPrioritizer; +import io.quarkus.load.shedding.RequestPriority; + +@Singleton +public class PriorityLoadShedding { + @Inject + @All + List> requestPrioritizers; + + @Inject + @All + List> requestClassifiers; + + private final boolean enabled; + + private final int max; + + private final OperatingSystemMXBean os; + + private double lastThreshold; + + private long lastThresholdTime; + + @Inject + PriorityLoadShedding(LoadSheddingRuntimeConfig config) { + enabled = config.priority().enabled(); + max = RequestPriority.values().length * RequestClassifier.MAX_COHORT; + os = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); + } + + // when this is called, we know we're overloaded + public boolean shedLoad(Object request) { + if (!enabled) { + return true; + } + + long now = System.currentTimeMillis(); + synchronized (this) { + if (now - lastThresholdTime > 1_000) { + double load = os.getCpuLoad(); + if (load < 0) { + lastThreshold = -1; + } else { + lastThreshold = max * (1.0 - load * load * load); + } + lastThresholdTime = now; + } + } + double threshold = lastThreshold; + if (threshold < 0) { + return true; + } + + RequestPriority priority = RequestPriority.NORMAL; + for (RequestPrioritizer requestPrioritizer : requestPrioritizers) { + if (requestPrioritizer.appliesTo(request)) { + priority = requestPrioritizer.priority(request); + break; + } + } + + int cohort = 64; // in the middle of the [1,128] interval + for (RequestClassifier requestClassifier : requestClassifiers) { + if (requestClassifier.appliesTo(request)) { + cohort = requestClassifier.cohort(request); + break; + } + } + if (cohort == Integer.MIN_VALUE) { + cohort = RequestClassifier.MAX_COHORT; + } else if (cohort < 0) { + cohort = (-cohort) % RequestClassifier.MAX_COHORT + 1; + } else if (cohort == 0) { + cohort = RequestClassifier.MIN_COHORT; + } else if (cohort > RequestClassifier.MAX_COHORT) { + cohort = cohort % RequestClassifier.MAX_COHORT + 1; + } + + return priority.cohortBaseline() + cohort > threshold; + } +} diff --git a/extensions/load-shedding/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/load-shedding/runtime/src/main/resources/META-INF/quarkus-extension.yaml new file mode 100644 index 0000000000000..c46bb4d74fca4 --- /dev/null +++ b/extensions/load-shedding/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -0,0 +1,13 @@ +--- +artifact: ${project.groupId}:${project.artifactId}:${project.version} +name: "Load Shedding" +metadata: + keywords: + - "fault-tolerance" + - "load-shedding" + guide: "https://quarkus.io/guides/load-shedding-reference" + categories: + - "cloud" + status: "experimental" + config: + - "quarkus.load-shedding." diff --git a/extensions/pom.xml b/extensions/pom.xml index e346c10ba5479..b918a94a3acab 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -55,6 +55,7 @@ info observability-devservices jfr + load-shedding resteasy-classic diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/RouteConstants.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/RouteConstants.java index 6d00a3afa9b07..e1030bacc24ac 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/RouteConstants.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/RouteConstants.java @@ -1,7 +1,7 @@ package io.quarkus.vertx.http.runtime; /** - * Route order value constants used in Quarkus, update {@code reactive-routes.adoc} when changing this class. + * Route order value constants used in Quarkus, update {@code http-reference.adoc} when changing this class. */ @SuppressWarnings("JavadocDeclaration") public final class RouteConstants {