From 71c8121c94814b61747e0ddc28b421f2fed9e95f Mon Sep 17 00:00:00 2001 From: Hitesh Khamesra Date: Wed, 30 Oct 2024 11:55:59 -0700 Subject: [PATCH] Added Prioritybased rate limiter (#235) * Added initial implementation of priority based rate limiter * Updated timeout and test * tidy --- .../apache/solr/core/RateLimiterConfig.java | 15 +- .../servlet/PriorityBasedRateLimiter.java | 132 +++++++++++++ .../apache/solr/servlet/QueryRateLimiter.java | 60 +----- .../apache/solr/servlet/RateLimitManager.java | 85 +++++++- .../solr/servlet/RequestRateLimiter.java | 5 +- .../solr/servlet/TestRequestRateLimiter.java | 184 +++++++++++++++++- .../apache/solr/client/solrj/SolrRequest.java | 3 +- .../request/beans/RateLimiterPayload.java | 10 +- 8 files changed, 412 insertions(+), 82 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java diff --git a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java index d08958a0348..d90a754f426 100644 --- a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java +++ b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java @@ -32,6 +32,7 @@ public class RateLimiterConfig { public final int allowedRequests; public final boolean isSlotBorrowingEnabled; public final int guaranteedSlotsThreshold; + public final boolean priorityBasedEnabled; /** * We store the config definition in order to determine whether anything has changed that would @@ -49,7 +50,8 @@ public RateLimiterConfig( int guaranteedSlotsThreshold, long waitForSlotAcquisition, int allowedRequests, - boolean isSlotBorrowingEnabled) { + boolean isSlotBorrowingEnabled, + boolean priorityBasedEnabled) { this( requestType, makePayload( @@ -57,7 +59,8 @@ public RateLimiterConfig( guaranteedSlotsThreshold, waitForSlotAcquisition, allowedRequests, - isSlotBorrowingEnabled)); + isSlotBorrowingEnabled, + priorityBasedEnabled)); } private static RateLimiterPayload makePayload( @@ -65,13 +68,15 @@ private static RateLimiterPayload makePayload( int guaranteedSlotsThreshold, long waitForSlotAcquisition, int allowedRequests, - boolean isSlotBorrowingEnabled) { + boolean isSlotBorrowingEnabled, + boolean priorityBasedEnabled) { RateLimiterPayload ret = new RateLimiterPayload(); ret.enabled = isEnabled; ret.allowedRequests = allowedRequests; ret.guaranteedSlots = guaranteedSlotsThreshold; ret.slotBorrowingEnabled = isSlotBorrowingEnabled; ret.slotAcquisitionTimeoutInMS = Math.toIntExact(waitForSlotAcquisition); + ret.priorityBasedEnabled = priorityBasedEnabled; return ret; } @@ -98,6 +103,9 @@ public RateLimiterConfig(SolrRequest.SolrRequestType requestType, RateLimiterPay ? DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS : definition.slotAcquisitionTimeoutInMS.longValue(); + priorityBasedEnabled = + definition.priorityBasedEnabled == null ? false : definition.priorityBasedEnabled; + this.definition = definition; } @@ -125,6 +133,7 @@ public String toString() { sb.append(", guaranteedSlots=").append(guaranteedSlotsThreshold); sb.append(", borrowEnabled=").append(isSlotBorrowingEnabled); sb.append(", waitForSlotMillis=").append(waitForSlotAcquisition); + sb.append(", priorityBasedEnabled=").append(priorityBasedEnabled); return sb.append('}').toString(); } } diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java new file mode 100644 index 00000000000..b4d17afaae5 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -0,0 +1,132 @@ +package org.apache.solr.servlet; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.servlet.http.HttpServletRequest; +import org.apache.solr.common.SolrException; +import org.apache.solr.core.RateLimiterConfig; + +/** + * PriorityBasedRateLimiter allocates the slot based on their request priority Currently, it has two + * priorities FOREGROUND and BACKGROUND Requests. Client can pass the {@link + * org.apache.solr.common.params.CommonParams} SOLR_REQUEST_TYPE_PARAM request header to indicate + * the foreground and background request. Foreground requests has high priority than background + * requests + */ +public class PriorityBasedRateLimiter extends RequestRateLimiter { + public static final String SOLR_REQUEST_PRIORITY_PARAM = "Solr-Request-Priority"; + private final AtomicInteger activeRequests = new AtomicInteger(); + private final Semaphore numRequestsAllowed; + + private final int totalAllowedRequests; + + private final LinkedBlockingQueue waitingList = new LinkedBlockingQueue<>(); + + private final long waitTimeoutInNanos; + + public PriorityBasedRateLimiter(RateLimiterConfig rateLimiterConfig) { + super(rateLimiterConfig); + this.numRequestsAllowed = new Semaphore(rateLimiterConfig.allowedRequests, true); + this.totalAllowedRequests = rateLimiterConfig.allowedRequests; + this.waitTimeoutInNanos = rateLimiterConfig.waitForSlotAcquisition * 1000000l; + } + + @Override + public SlotReservation handleRequest(HttpServletRequest request) { + if (!rateLimiterConfig.isEnabled) { + return UNLIMITED; + } + RequestPriorities requestPriority = getRequestPriority(request); + if (requestPriority == null) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "Request priority header is not defined or not set properly"); + } + try { + if (!acquire(requestPriority)) { + return null; + } + } catch (InterruptedException ie) { + return null; + } + return () -> PriorityBasedRateLimiter.this.release(); + } + + private boolean acquire(RequestPriorities priority) throws InterruptedException { + if (priority.equals(RequestPriorities.FOREGROUND)) { + return nextInQueue(this.waitTimeoutInNanos); + } else if (priority.equals(RequestPriorities.BACKGROUND)) { + if (this.activeRequests.get() < this.totalAllowedRequests) { + return nextInQueue(this.waitTimeoutInNanos); + } else { + CountDownLatch wait = new CountDownLatch(1); + this.waitingList.put(wait); + long startTime = System.nanoTime(); + if (wait.await(this.waitTimeoutInNanos, TimeUnit.NANOSECONDS)) { + return nextInQueue(this.waitTimeoutInNanos - (System.nanoTime() - startTime)); + } else { + // remove from the queue; this/other requests already waited long enough; thus best effort + this.waitingList.poll(); + return false; + } + } + } + return true; + } + + private boolean nextInQueue(long waitTimeoutInNanos) throws InterruptedException { + this.activeRequests.addAndGet(1); + boolean acquired = + this.numRequestsAllowed.tryAcquire(1, waitTimeoutInNanos, TimeUnit.NANOSECONDS); + if (!acquired) { + this.activeRequests.addAndGet(-1); + return false; + } + return true; + } + + private void exitFromQueue() { + this.numRequestsAllowed.release(1); + this.activeRequests.addAndGet(-1); + } + + private void release() { + this.exitFromQueue(); + if (this.activeRequests.get() < this.totalAllowedRequests) { + // next priority + CountDownLatch waiter = this.waitingList.poll(); + if (waiter != null) { + waiter.countDown(); + } + } + } + + @Override + public SlotReservation allowSlotBorrowing() throws InterruptedException { + // if we reach here that means slot is not available + return null; + } + + public int getRequestsAllowed() { + return this.activeRequests.get(); + } + + private RequestPriorities getRequestPriority(HttpServletRequest request) { + String requestPriority = request.getHeader(SOLR_REQUEST_PRIORITY_PARAM); + try { + return RequestPriorities.valueOf(requestPriority); + } catch (IllegalArgumentException iae) { + } + return null; + } + + public enum RequestPriorities { + // this has high priority + FOREGROUND, + // this has low priority + BACKGROUND + } +} diff --git a/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java index dae744c0df4..11e5c10692f 100644 --- a/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java @@ -17,20 +17,12 @@ package org.apache.solr.servlet; -import static org.apache.solr.core.RateLimiterConfig.RL_CONFIG_KEY; - import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import java.util.Map; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.request.beans.RateLimiterPayload; -import org.apache.solr.common.cloud.SolrZkClient; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.util.Utils; import org.apache.solr.core.RateLimiterConfig; import org.apache.solr.util.SolrJacksonAnnotationInspector; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; /** * Implementation of RequestRateLimiter specific to query request types. Most of the actual work is @@ -39,26 +31,17 @@ public class QueryRateLimiter extends RequestRateLimiter { private static final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper(); - public QueryRateLimiter(SolrZkClient solrZkClient) { - super(constructQueryRateLimiterConfig(solrZkClient)); - } - public QueryRateLimiter(RateLimiterConfig config) { super(config); } public static RateLimiterConfig processConfigChange( - SolrRequest.SolrRequestType requestType, - RateLimiterConfig rateLimiterConfig, - Map properties) - throws IOException { - byte[] configInput = Utils.toJSON(properties.get(RL_CONFIG_KEY)); + RateLimiterConfig rateLimiterConfig, RateLimiterPayload rateLimiterMeta) throws IOException { - RateLimiterPayload rateLimiterMeta; - if (configInput == null || configInput.length == 0) { - rateLimiterMeta = null; - } else { - rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class); + // default rate limiter + SolrRequest.SolrRequestType requestType = SolrRequest.SolrRequestType.QUERY; + if (rateLimiterMeta.priorityBasedEnabled) { + requestType = SolrRequest.SolrRequestType.PRIORITY_BASED; } if (rateLimiterConfig == null || rateLimiterConfig.shouldUpdate(rateLimiterMeta)) { @@ -68,37 +51,4 @@ public static RateLimiterConfig processConfigChange( return null; } } - - // To be used in initialization - @SuppressWarnings({"unchecked"}) - private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zkClient) { - try { - - if (zkClient == null) { - return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); - } - - Map clusterPropsJson = - (Map) - Utils.fromJSON(zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)); - byte[] configInput = Utils.toJSON(clusterPropsJson.get(RL_CONFIG_KEY)); - - if (configInput.length == 0) { - // No Rate Limiter configuration defined in clusterprops.json. Return default configuration - // values - return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); - } - - RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class); - - return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY, rateLimiterMeta); - } catch (KeeperException.NoNodeException e) { - return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); - } catch (KeeperException | InterruptedException e) { - throw new RuntimeException( - "Error reading cluster property", SolrZkClient.checkInterrupted(e)); - } catch (IOException e) { - throw new RuntimeException("Encountered an IOException " + e.getMessage()); - } - } } diff --git a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java index 5a02553ceaf..d339cafb5c3 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java +++ b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java @@ -19,7 +19,9 @@ import static org.apache.solr.common.params.CommonParams.SOLR_REQUEST_CONTEXT_PARAM; import static org.apache.solr.common.params.CommonParams.SOLR_REQUEST_TYPE_PARAM; +import static org.apache.solr.core.RateLimiterConfig.RL_CONFIG_KEY; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.UncheckedIOException; import java.lang.invoke.MethodHandles; @@ -28,9 +30,15 @@ import javax.servlet.http.HttpServletRequest; import net.jcip.annotations.ThreadSafe; import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.beans.RateLimiterPayload; import org.apache.solr.common.cloud.ClusterPropertiesListener; import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.util.Utils; import org.apache.solr.core.RateLimiterConfig; +import org.apache.solr.util.SolrJacksonAnnotationInspector; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +54,7 @@ @ThreadSafe public class RateLimitManager implements ClusterPropertiesListener { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - + private static final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper(); public static final String ERROR_MESSAGE = "Too many requests for this request type. Please try after some time or increase the quota for this request type"; public static final int DEFAULT_CONCURRENT_REQUESTS = @@ -61,20 +69,36 @@ public RateLimitManager() { @Override public boolean onChange(Map properties) { + byte[] configInput = Utils.toJSON(properties.get(RL_CONFIG_KEY)); + + RateLimiterPayload rateLimiterMeta; + if (configInput == null || configInput.length == 0) { + return false; + } else { + try { + rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + // Hack: We only support query rate limiting for now requestRateLimiterMap.compute( - SolrRequest.SolrRequestType.QUERY.toString(), + rateLimiterMeta.priorityBasedEnabled + ? SolrRequest.SolrRequestType.PRIORITY_BASED.name() + : SolrRequest.SolrRequestType.QUERY.name(), (k, v) -> { try { RateLimiterConfig newConfig = QueryRateLimiter.processConfigChange( - SolrRequest.SolrRequestType.QUERY, - v == null ? null : v.getRateLimiterConfig(), - properties); + v == null ? null : v.getRateLimiterConfig(), rateLimiterMeta); if (newConfig == null) { return v; } else { log.info("updated config: {}", newConfig); + if (newConfig.priorityBasedEnabled) { + return new PriorityBasedRateLimiter(newConfig); + } return new QueryRateLimiter(newConfig); } } catch (IOException e) { @@ -115,7 +139,7 @@ public RequestRateLimiter.SlotReservation handleRequest(HttpServletRequest reque // slot borrowing should be fallback behavior, so if `slotAcquisitionTimeoutInMS` // is configured it will be applied here (blocking if necessary), to make a best // effort to draw from the request's own slot pool. - RequestRateLimiter.SlotReservation result = requestRateLimiter.handleRequest(); + RequestRateLimiter.SlotReservation result = requestRateLimiter.handleRequest(request); if (result != null) { return result; @@ -188,10 +212,55 @@ public Builder(SolrZkClient solrZkClient) { public RateLimitManager build() { RateLimitManager rateLimitManager = new RateLimitManager(); - rateLimitManager.registerRequestRateLimiter( - new QueryRateLimiter(solrZkClient), SolrRequest.SolrRequestType.QUERY); + RateLimiterConfig rateLimiterConfig = constructQueryRateLimiterConfig(solrZkClient); + + if (rateLimiterConfig.priorityBasedEnabled) { + rateLimitManager.registerRequestRateLimiter( + new PriorityBasedRateLimiter(rateLimiterConfig), + SolrRequest.SolrRequestType.PRIORITY_BASED); + } else { + rateLimitManager.registerRequestRateLimiter( + new QueryRateLimiter(rateLimiterConfig), SolrRequest.SolrRequestType.QUERY); + } return rateLimitManager; } + + // To be used in initialization + @SuppressWarnings({"unchecked"}) + private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zkClient) { + try { + + if (zkClient == null) { + return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); + } + + Map clusterPropsJson = + (Map) + Utils.fromJSON( + zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)); + byte[] configInput = Utils.toJSON(clusterPropsJson.get(RL_CONFIG_KEY)); + + if (configInput.length == 0) { + // No Rate Limiter configuration defined in clusterprops.json. Return default + // configuration + // values + return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); + } + + RateLimiterPayload rateLimiterMeta = + mapper.readValue(configInput, RateLimiterPayload.class); + return rateLimiterMeta.priorityBasedEnabled + ? new RateLimiterConfig(SolrRequest.SolrRequestType.PRIORITY_BASED, rateLimiterMeta) + : new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY, rateLimiterMeta); + } catch (KeeperException.NoNodeException e) { + return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException( + "Error reading cluster property", SolrZkClient.checkInterrupted(e)); + } catch (IOException e) { + throw new RuntimeException("Encountered an IOException " + e.getMessage()); + } + } } } diff --git a/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java index 04ef0900ac5..c8a7d587882 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java @@ -22,6 +22,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.servlet.http.HttpServletRequest; import net.jcip.annotations.ThreadSafe; import org.apache.solr.core.RateLimiterConfig; @@ -46,7 +47,7 @@ public class RequestRateLimiter { private final AtomicInteger nativeReservations; - private final RateLimiterConfig rateLimiterConfig; + protected final RateLimiterConfig rateLimiterConfig; public static final SlotReservation UNLIMITED = () -> { // no-op @@ -91,7 +92,7 @@ boolean isEmpty() { * Handles an incoming request. returns a metadata object representing the metadata for the * acquired slot, if acquired. If a slot is not acquired, returns a null metadata object. */ - public SlotReservation handleRequest() throws InterruptedException { + public SlotReservation handleRequest(HttpServletRequest request) throws InterruptedException { if (!rateLimiterConfig.isEnabled) { return UNLIMITED; diff --git a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java index 84c3a81d125..e4cc923787d 100644 --- a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java +++ b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java @@ -19,6 +19,7 @@ import static org.apache.solr.common.params.CommonParams.SOLR_REQUEST_CONTEXT_PARAM; import static org.apache.solr.common.params.CommonParams.SOLR_REQUEST_TYPE_PARAM; +import static org.apache.solr.servlet.PriorityBasedRateLimiter.SOLR_REQUEST_PRIORITY_PARAM; import static org.apache.solr.servlet.RateLimitManager.DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; @@ -26,7 +27,9 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -81,7 +84,8 @@ public void testConcurrentQueries() throws Exception { 1, DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, 5 /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false); // We are fine with a null FilterConfig here since we ensure that MockBuilder never invokes // its parent here RateLimitManager.Builder builder = @@ -129,7 +133,8 @@ public void testSlotBorrowingAcquisitionTimeout() guaranteed, slotAcqTimeMillis, slotLimit /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false); // set allowed/guaranteed to the same, and very low, to force it to mainly borrow. It would also // be theoretically possible to optimize a single-request-type config to bypass slot-borrowing // logic altogether, so configuring a second ratelimiter eliminates the possibility that at @@ -141,7 +146,8 @@ public void testSlotBorrowingAcquisitionTimeout() 1, slotAcqTimeMillis, 1 /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false); mgr.registerRequestRateLimiter( new RequestRateLimiter(queryConfig), SolrRequest.SolrRequestType.QUERY); mgr.registerRequestRateLimiter( @@ -283,10 +289,20 @@ private static class DummyRequest extends Request { private final String ctx; private final String type; + private final String priority; + public DummyRequest(String ctx, String type) { super(null, null); this.ctx = ctx; this.type = type; + this.priority = null; + } + + public DummyRequest(String ctx, String type, String priority) { + super(null, null); + this.ctx = ctx; + this.type = type; + this.priority = priority; } @Override @@ -296,6 +312,8 @@ public String getHeader(String name) { return ctx; case SOLR_REQUEST_TYPE_PARAM: return type; + case SOLR_REQUEST_PRIORITY_PARAM: + return priority; default: throw new IllegalArgumentException(); } @@ -319,7 +337,8 @@ public void testSlotBorrowing() throws Exception { 1, DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, 5 /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false); RateLimiterConfig indexRateLimiterConfig = new RateLimiterConfig( SolrRequest.SolrRequestType.UPDATE, @@ -327,7 +346,8 @@ public void testSlotBorrowing() throws Exception { 1, DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, 5 /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false); // We are fine with a null FilterConfig here since we ensure that MockBuilder never invokes // its parent RateLimitManager.Builder builder = @@ -420,10 +440,10 @@ public MockRequestRateLimiter(RateLimiterConfig config) { } @Override - public SlotReservation handleRequest() throws InterruptedException { + public SlotReservation handleRequest(HttpServletRequest request) throws InterruptedException { incomingRequestCount.getAndIncrement(); - SlotReservation response = super.handleRequest(); + SlotReservation response = super.handleRequest(request); if (response != null) { acceptedNewRequestCount.getAndIncrement(); @@ -486,6 +506,7 @@ public RateLimitManager build() { @Test @SuppressWarnings("try") public void testAdjustingConfig() throws IOException, InterruptedException { + DummyRequest dr = new DummyRequest(null, SolrRequest.SolrRequestType.QUERY.toString()); Random r = random(); int maxAllowed = 32; int allowed = r.nextInt(maxAllowed) + 1; @@ -498,7 +519,8 @@ public void testAdjustingConfig() throws IOException, InterruptedException { guaranteed, 20, allowed /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false); RequestRateLimiter limiter = new RequestRateLimiter(config); ExecutorService exec = ExecutorUtil.newMDCAwareCachedThreadPool("tests"); try (Closeable c = () -> ExecutorUtil.shutdownAndAwaitTermination(exec)) { @@ -522,7 +544,7 @@ public void testAdjustingConfig() throws IOException, InterruptedException { () -> { while (!finish.get()) { try (RequestRateLimiter.SlotReservation slotReservation = - limiterF.handleRequest()) { + limiterF.handleRequest(dr)) { if (slotReservation != null) { executed.increment(); int ct = outstanding.incrementAndGet(); @@ -601,9 +623,151 @@ public void testAdjustingConfig() throws IOException, InterruptedException { guaranteed, 20, allowed /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false); limiter = new RequestRateLimiter(config); } } } + + @Test + public void testPriorityBasedRateLimiter() throws Exception { + RateLimitManager rateLimitManager = new RateLimitManager(); + + // PriorityBasedRateLimiter + RateLimiterConfig rateLimiterConfig = + new RateLimiterConfig( + SolrRequest.SolrRequestType.QUERY, + true, + 1, + DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, + 5 /* allowedRequests */, + true /* isSlotBorrowing */, + true); + + PriorityBasedRateLimiter requestRateLimiter = new PriorityBasedRateLimiter(rateLimiterConfig); + + rateLimitManager.registerRequestRateLimiter( + requestRateLimiter, SolrRequest.SolrRequestType.PRIORITY_BASED); + + HttpServletRequest foreground = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "FOREGROUND"); + + try (final RequestRateLimiter.SlotReservation allowed = + rateLimitManager.handleRequest(foreground)) { + assertNotNull(allowed); + assertEquals(1, requestRateLimiter.getRequestsAllowed()); + } + + HttpServletRequest background = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "BACKGROUND"); + + try (final RequestRateLimiter.SlotReservation allowed = + rateLimitManager.handleRequest(background)) { + assertNotNull(allowed); + assertEquals(1, requestRateLimiter.getRequestsAllowed()); + } + + HttpServletRequest unknown = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "unknown"); + + boolean gotException = false; + try (final RequestRateLimiter.SlotReservation allowed = + rateLimitManager.handleRequest(unknown)) { + assertNull(allowed); + } catch (SolrException se) { + gotException = true; + } + assertTrue(gotException); + } + + @Test + public void testPriorityBasedRateLimiterTimeout() throws Exception { + RateLimitManager rateLimitManager = new RateLimitManager(); + + // PriorityBasedRateLimiter + RateLimiterConfig rateLimiterConfig = + new RateLimiterConfig( + SolrRequest.SolrRequestType.QUERY, + true, + 1, + 10, + 1 /* allowedRequests */, + true /* isSlotBorrowing */, + true); + + PriorityBasedRateLimiter requestRateLimiter = new PriorityBasedRateLimiter(rateLimiterConfig); + + rateLimitManager.registerRequestRateLimiter( + requestRateLimiter, SolrRequest.SolrRequestType.PRIORITY_BASED); + + HttpServletRequest firstRequest = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "FOREGROUND"); + + RequestRateLimiter.SlotReservation firstRequestAllowed = + rateLimitManager.handleRequest(firstRequest); + assertNotNull(firstRequestAllowed); + assertEquals(1, requestRateLimiter.getRequestsAllowed()); + + HttpServletRequest secondRequest = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "FOREGROUND"); + + RequestRateLimiter.SlotReservation secondRequestNotAllowed = + rateLimitManager.handleRequest(secondRequest); + assertNull(secondRequestNotAllowed); + assertEquals(1, requestRateLimiter.getRequestsAllowed()); + + HttpServletRequest thirdRequest = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "BACKGROUND"); + + RequestRateLimiter.SlotReservation thirdRequestNotAllowed = + rateLimitManager.handleRequest(thirdRequest); + assertNull(thirdRequestNotAllowed); + assertEquals(1, requestRateLimiter.getRequestsAllowed()); + + firstRequestAllowed.close(); + } + + @Test + public void testPriorityBasedRateLimiterDynamicChange() throws Exception { + RateLimitManager rateLimitManager = new RateLimitManager(); + + // PriorityBasedRateLimiter + RateLimiterConfig rateLimiterConfig = + new RateLimiterConfig( + SolrRequest.SolrRequestType.QUERY, + true, + 1, + 10, + 1 /* allowedRequests */, + true /* isSlotBorrowing */, + false); + + QueryRateLimiter requestRateLimiter = new QueryRateLimiter(rateLimiterConfig); + + rateLimitManager.registerRequestRateLimiter( + requestRateLimiter, SolrRequest.SolrRequestType.QUERY); + + RequestRateLimiter rateLimiter = + rateLimitManager.getRequestRateLimiter(SolrRequest.SolrRequestType.PRIORITY_BASED); + assertNull(rateLimiter); + + Map properties = new HashMap<>(); + Map rateLimiterProps = new HashMap<>(); + rateLimiterProps.put("enabled", true); + rateLimiterProps.put("guaranteedSlots", 1); + rateLimiterProps.put("allowedRequests", 1); + rateLimiterProps.put("slotBorrowingEnabled", false); + rateLimiterProps.put("slotAcquisitionTimeoutInMS", 100); + rateLimiterProps.put("priorityBasedEnabled", true); + properties.put("rate-limiters", rateLimiterProps); + + // updating rate limiter + rateLimitManager.onChange(properties); + + rateLimiter = + rateLimitManager.getRequestRateLimiter(SolrRequest.SolrRequestType.PRIORITY_BASED); + + assertEquals(true, rateLimiter.getRateLimiterConfig().priorityBasedEnabled); + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java index e00ad8376dd..cfd32af318c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java @@ -58,7 +58,8 @@ public enum SolrRequestType { SECURITY, ADMIN, STREAMING, - UNSPECIFIED + UNSPECIFIED, + PRIORITY_BASED, }; public enum SolrClientContext { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java index 07bae33de6b..6088cd00cee 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java @@ -33,6 +33,8 @@ public class RateLimiterPayload implements ReflectMapWriter { @JsonProperty public Integer slotAcquisitionTimeoutInMS; + @JsonProperty public Boolean priorityBasedEnabled; + public RateLimiterPayload copy() { RateLimiterPayload result = new RateLimiterPayload(); @@ -41,7 +43,7 @@ public RateLimiterPayload copy() { result.allowedRequests = allowedRequests; result.slotBorrowingEnabled = slotBorrowingEnabled; result.slotAcquisitionTimeoutInMS = slotAcquisitionTimeoutInMS; - + result.priorityBasedEnabled = priorityBasedEnabled; return result; } @@ -53,7 +55,8 @@ public boolean equals(Object obj) { && Objects.equals(this.guaranteedSlots, that.guaranteedSlots) && Objects.equals(this.allowedRequests, that.allowedRequests) && Objects.equals(this.slotBorrowingEnabled, that.slotBorrowingEnabled) - && Objects.equals(this.slotAcquisitionTimeoutInMS, that.slotAcquisitionTimeoutInMS); + && Objects.equals(this.slotAcquisitionTimeoutInMS, that.slotAcquisitionTimeoutInMS) + && Objects.equals(this.priorityBasedEnabled, that.priorityBasedEnabled); } return false; } @@ -65,6 +68,7 @@ public int hashCode() { guaranteedSlots, allowedRequests, slotBorrowingEnabled, - slotAcquisitionTimeoutInMS); + slotAcquisitionTimeoutInMS, + priorityBasedEnabled); } }