Skip to content

Commit

Permalink
Added Prioritybased rate limiter (#235)
Browse files Browse the repository at this point in the history
* Added initial implementation of priority based rate limiter

* Updated timeout and test

* tidy
  • Loading branch information
hiteshk25 authored Oct 30, 2024
1 parent 10837a3 commit 71c8121
Show file tree
Hide file tree
Showing 8 changed files with 412 additions and 82 deletions.
15 changes: 12 additions & 3 deletions solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class RateLimiterConfig {
public final int allowedRequests;
public final boolean isSlotBorrowingEnabled;
public final int guaranteedSlotsThreshold;
public final boolean priorityBasedEnabled;

/**
* We store the config definition in order to determine whether anything has changed that would
Expand All @@ -49,29 +50,33 @@ public RateLimiterConfig(
int guaranteedSlotsThreshold,
long waitForSlotAcquisition,
int allowedRequests,
boolean isSlotBorrowingEnabled) {
boolean isSlotBorrowingEnabled,
boolean priorityBasedEnabled) {
this(
requestType,
makePayload(
isEnabled,
guaranteedSlotsThreshold,
waitForSlotAcquisition,
allowedRequests,
isSlotBorrowingEnabled));
isSlotBorrowingEnabled,
priorityBasedEnabled));
}

private static RateLimiterPayload makePayload(
boolean isEnabled,
int guaranteedSlotsThreshold,
long waitForSlotAcquisition,
int allowedRequests,
boolean isSlotBorrowingEnabled) {
boolean isSlotBorrowingEnabled,
boolean priorityBasedEnabled) {
RateLimiterPayload ret = new RateLimiterPayload();
ret.enabled = isEnabled;
ret.allowedRequests = allowedRequests;
ret.guaranteedSlots = guaranteedSlotsThreshold;
ret.slotBorrowingEnabled = isSlotBorrowingEnabled;
ret.slotAcquisitionTimeoutInMS = Math.toIntExact(waitForSlotAcquisition);
ret.priorityBasedEnabled = priorityBasedEnabled;
return ret;
}

Expand All @@ -98,6 +103,9 @@ public RateLimiterConfig(SolrRequest.SolrRequestType requestType, RateLimiterPay
? DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS
: definition.slotAcquisitionTimeoutInMS.longValue();

priorityBasedEnabled =
definition.priorityBasedEnabled == null ? false : definition.priorityBasedEnabled;

this.definition = definition;
}

Expand Down Expand Up @@ -125,6 +133,7 @@ public String toString() {
sb.append(", guaranteedSlots=").append(guaranteedSlotsThreshold);
sb.append(", borrowEnabled=").append(isSlotBorrowingEnabled);
sb.append(", waitForSlotMillis=").append(waitForSlotAcquisition);
sb.append(", priorityBasedEnabled=").append(priorityBasedEnabled);
return sb.append('}').toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package org.apache.solr.servlet;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.http.HttpServletRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.RateLimiterConfig;

/**
* PriorityBasedRateLimiter allocates the slot based on their request priority Currently, it has two
* priorities FOREGROUND and BACKGROUND Requests. Client can pass the {@link
* org.apache.solr.common.params.CommonParams} SOLR_REQUEST_TYPE_PARAM request header to indicate
* the foreground and background request. Foreground requests has high priority than background
* requests
*/
public class PriorityBasedRateLimiter extends RequestRateLimiter {
public static final String SOLR_REQUEST_PRIORITY_PARAM = "Solr-Request-Priority";
private final AtomicInteger activeRequests = new AtomicInteger();
private final Semaphore numRequestsAllowed;

private final int totalAllowedRequests;

private final LinkedBlockingQueue<CountDownLatch> waitingList = new LinkedBlockingQueue<>();

private final long waitTimeoutInNanos;

public PriorityBasedRateLimiter(RateLimiterConfig rateLimiterConfig) {
super(rateLimiterConfig);
this.numRequestsAllowed = new Semaphore(rateLimiterConfig.allowedRequests, true);
this.totalAllowedRequests = rateLimiterConfig.allowedRequests;
this.waitTimeoutInNanos = rateLimiterConfig.waitForSlotAcquisition * 1000000l;
}

@Override
public SlotReservation handleRequest(HttpServletRequest request) {
if (!rateLimiterConfig.isEnabled) {
return UNLIMITED;
}
RequestPriorities requestPriority = getRequestPriority(request);
if (requestPriority == null) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
"Request priority header is not defined or not set properly");
}
try {
if (!acquire(requestPriority)) {
return null;
}
} catch (InterruptedException ie) {
return null;
}
return () -> PriorityBasedRateLimiter.this.release();
}

private boolean acquire(RequestPriorities priority) throws InterruptedException {
if (priority.equals(RequestPriorities.FOREGROUND)) {
return nextInQueue(this.waitTimeoutInNanos);
} else if (priority.equals(RequestPriorities.BACKGROUND)) {
if (this.activeRequests.get() < this.totalAllowedRequests) {
return nextInQueue(this.waitTimeoutInNanos);
} else {
CountDownLatch wait = new CountDownLatch(1);
this.waitingList.put(wait);
long startTime = System.nanoTime();
if (wait.await(this.waitTimeoutInNanos, TimeUnit.NANOSECONDS)) {
return nextInQueue(this.waitTimeoutInNanos - (System.nanoTime() - startTime));
} else {
// remove from the queue; this/other requests already waited long enough; thus best effort
this.waitingList.poll();
return false;
}
}
}
return true;
}

private boolean nextInQueue(long waitTimeoutInNanos) throws InterruptedException {
this.activeRequests.addAndGet(1);
boolean acquired =
this.numRequestsAllowed.tryAcquire(1, waitTimeoutInNanos, TimeUnit.NANOSECONDS);
if (!acquired) {
this.activeRequests.addAndGet(-1);
return false;
}
return true;
}

private void exitFromQueue() {
this.numRequestsAllowed.release(1);
this.activeRequests.addAndGet(-1);
}

private void release() {
this.exitFromQueue();
if (this.activeRequests.get() < this.totalAllowedRequests) {
// next priority
CountDownLatch waiter = this.waitingList.poll();
if (waiter != null) {
waiter.countDown();
}
}
}

@Override
public SlotReservation allowSlotBorrowing() throws InterruptedException {
// if we reach here that means slot is not available
return null;
}

public int getRequestsAllowed() {
return this.activeRequests.get();
}

private RequestPriorities getRequestPriority(HttpServletRequest request) {
String requestPriority = request.getHeader(SOLR_REQUEST_PRIORITY_PARAM);
try {
return RequestPriorities.valueOf(requestPriority);
} catch (IllegalArgumentException iae) {
}
return null;
}

public enum RequestPriorities {
// this has high priority
FOREGROUND,
// this has low priority
BACKGROUND
}
}
60 changes: 5 additions & 55 deletions solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,12 @@

package org.apache.solr.servlet;

import static org.apache.solr.core.RateLimiterConfig.RL_CONFIG_KEY;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.beans.RateLimiterPayload;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.RateLimiterConfig;
import org.apache.solr.util.SolrJacksonAnnotationInspector;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

/**
* Implementation of RequestRateLimiter specific to query request types. Most of the actual work is
Expand All @@ -39,26 +31,17 @@
public class QueryRateLimiter extends RequestRateLimiter {
private static final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();

public QueryRateLimiter(SolrZkClient solrZkClient) {
super(constructQueryRateLimiterConfig(solrZkClient));
}

public QueryRateLimiter(RateLimiterConfig config) {
super(config);
}

public static RateLimiterConfig processConfigChange(
SolrRequest.SolrRequestType requestType,
RateLimiterConfig rateLimiterConfig,
Map<String, Object> properties)
throws IOException {
byte[] configInput = Utils.toJSON(properties.get(RL_CONFIG_KEY));
RateLimiterConfig rateLimiterConfig, RateLimiterPayload rateLimiterMeta) throws IOException {

RateLimiterPayload rateLimiterMeta;
if (configInput == null || configInput.length == 0) {
rateLimiterMeta = null;
} else {
rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);
// default rate limiter
SolrRequest.SolrRequestType requestType = SolrRequest.SolrRequestType.QUERY;
if (rateLimiterMeta.priorityBasedEnabled) {
requestType = SolrRequest.SolrRequestType.PRIORITY_BASED;
}

if (rateLimiterConfig == null || rateLimiterConfig.shouldUpdate(rateLimiterMeta)) {
Expand All @@ -68,37 +51,4 @@ public static RateLimiterConfig processConfigChange(
return null;
}
}

// To be used in initialization
@SuppressWarnings({"unchecked"})
private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zkClient) {
try {

if (zkClient == null) {
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
}

Map<String, Object> clusterPropsJson =
(Map<String, Object>)
Utils.fromJSON(zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true));
byte[] configInput = Utils.toJSON(clusterPropsJson.get(RL_CONFIG_KEY));

if (configInput.length == 0) {
// No Rate Limiter configuration defined in clusterprops.json. Return default configuration
// values
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
}

RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);

return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY, rateLimiterMeta);
} catch (KeeperException.NoNodeException e) {
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(
"Error reading cluster property", SolrZkClient.checkInterrupted(e));
} catch (IOException e) {
throw new RuntimeException("Encountered an IOException " + e.getMessage());
}
}
}
Loading

0 comments on commit 71c8121

Please sign in to comment.