Skip to content

Commit

Permalink
Add RateLimiter config property to enable specific nodes (#236)
Browse files Browse the repository at this point in the history
* Added suppost to enable rate limiter in nodes

* minor change

* minor fix

* changed the func name
  • Loading branch information
hiteshk25 authored Nov 4, 2024
1 parent 71c8121 commit b3d5916
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ private void init(ServletContext servletContext) {

Builder builder = new Builder(zkClient);

this.rateLimitManager = builder.build();
String hostname = zkController != null ? zkController.getHostName() : "";

this.rateLimitManager = builder.build(hostname);

if (zkController != null) {
zkController.zkStateReader.registerClusterPropertiesListener(this.rateLimitManager);
Expand Down
17 changes: 11 additions & 6 deletions solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ public class RateLimitManager implements ClusterPropertiesListener {
public static final long DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS = -1;
private final ConcurrentHashMap<String, RequestRateLimiter> requestRateLimiterMap;

public RateLimitManager() {
private final String hostname;

public RateLimitManager(String hostname) {
this.hostname = hostname;
this.requestRateLimiterMap = new ConcurrentHashMap<>();
}

Expand All @@ -81,7 +84,7 @@ public boolean onChange(Map<String, Object> properties) {
throw new UncheckedIOException(e);
}
}

rateLimiterMeta.maybeEnableForHost(hostname);
// Hack: We only support query rate limiting for now
requestRateLimiterMap.compute(
rateLimiterMeta.priorityBasedEnabled
Expand Down Expand Up @@ -209,10 +212,10 @@ public Builder(SolrZkClient solrZkClient) {
this.solrZkClient = solrZkClient;
}

public RateLimitManager build() {
RateLimitManager rateLimitManager = new RateLimitManager();
public RateLimitManager build(String hostname) {
RateLimitManager rateLimitManager = new RateLimitManager(hostname);

RateLimiterConfig rateLimiterConfig = constructQueryRateLimiterConfig(solrZkClient);
RateLimiterConfig rateLimiterConfig = constructQueryRateLimiterConfig(solrZkClient, hostname);

if (rateLimiterConfig.priorityBasedEnabled) {
rateLimitManager.registerRequestRateLimiter(
Expand All @@ -228,7 +231,8 @@ public RateLimitManager build() {

// To be used in initialization
@SuppressWarnings({"unchecked"})
private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zkClient) {
private static RateLimiterConfig constructQueryRateLimiterConfig(
SolrZkClient zkClient, String hostname) {
try {

if (zkClient == null) {
Expand All @@ -250,6 +254,7 @@ private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zk

RateLimiterPayload rateLimiterMeta =
mapper.readValue(configInput, RateLimiterPayload.class);
rateLimiterMeta.maybeEnableForHost(hostname);
return rateLimiterMeta.priorityBasedEnabled
? new RateLimiterConfig(SolrRequest.SolrRequestType.PRIORITY_BASED, rateLimiterMeta)
: new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY, rateLimiterMeta);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testConcurrentQueries() throws Exception {
RateLimitManager.Builder builder =
new MockBuilder(
null /* dummy SolrZkClient */, new MockRequestRateLimiter(rateLimiterConfig));
RateLimitManager rateLimitManager = builder.build();
RateLimitManager rateLimitManager = builder.build("localhost");

solrDispatchFilter.replaceRateLimitManager(rateLimitManager);

Expand Down Expand Up @@ -121,7 +121,7 @@ public void testConcurrentQueries() throws Exception {
@SuppressWarnings("try")
public void testSlotBorrowingAcquisitionTimeout()
throws InterruptedException, IOException, ExecutionException {
RateLimitManager mgr = new RateLimitManager();
RateLimitManager mgr = new RateLimitManager("localhost");
Random r = random();
int slotLimit = r.nextInt(20) + 1;
int guaranteed = r.nextInt(slotLimit);
Expand Down Expand Up @@ -355,7 +355,7 @@ public void testSlotBorrowing() throws Exception {
null /*dummy SolrZkClient */,
new MockRequestRateLimiter(queryRateLimiterConfig),
new MockRequestRateLimiter(indexRateLimiterConfig));
RateLimitManager rateLimitManager = builder.build();
RateLimitManager rateLimitManager = builder.build("localhost");

solrDispatchFilter.replaceRateLimitManager(rateLimitManager);

Expand Down Expand Up @@ -488,8 +488,8 @@ public MockBuilder(
}

@Override
public RateLimitManager build() {
RateLimitManager rateLimitManager = new RateLimitManager();
public RateLimitManager build(String hostname) {
RateLimitManager rateLimitManager = new RateLimitManager("localhost");

rateLimitManager.registerRequestRateLimiter(
queryRequestRateLimiter, SolrRequest.SolrRequestType.QUERY);
Expand Down Expand Up @@ -632,7 +632,7 @@ public void testAdjustingConfig() throws IOException, InterruptedException {

@Test
public void testPriorityBasedRateLimiter() throws Exception {
RateLimitManager rateLimitManager = new RateLimitManager();
RateLimitManager rateLimitManager = new RateLimitManager("localhost");

// PriorityBasedRateLimiter
RateLimiterConfig rateLimiterConfig =
Expand Down Expand Up @@ -683,7 +683,7 @@ public void testPriorityBasedRateLimiter() throws Exception {

@Test
public void testPriorityBasedRateLimiterTimeout() throws Exception {
RateLimitManager rateLimitManager = new RateLimitManager();
RateLimitManager rateLimitManager = new RateLimitManager("localhost");

// PriorityBasedRateLimiter
RateLimiterConfig rateLimiterConfig =
Expand Down Expand Up @@ -730,7 +730,7 @@ public void testPriorityBasedRateLimiterTimeout() throws Exception {

@Test
public void testPriorityBasedRateLimiterDynamicChange() throws Exception {
RateLimitManager rateLimitManager = new RateLimitManager();
RateLimitManager rateLimitManager = new RateLimitManager("localhost");

// PriorityBasedRateLimiter
RateLimiterConfig rateLimiterConfig =
Expand Down Expand Up @@ -770,4 +770,47 @@ public void testPriorityBasedRateLimiterDynamicChange() throws Exception {

assertEquals(true, rateLimiter.getRateLimiterConfig().priorityBasedEnabled);
}

@Test
public void testEnableRateLimiterOnNode() throws Exception {
RateLimitManager rateLimitManager = new RateLimitManager("localhost");

// PriorityBasedRateLimiter
RateLimiterConfig rateLimiterConfig =
new RateLimiterConfig(
SolrRequest.SolrRequestType.QUERY,
false,
1,
10,
1 /* allowedRequests */,
true /* isSlotBorrowing */,
false);

QueryRateLimiter requestRateLimiter = new QueryRateLimiter(rateLimiterConfig);

rateLimitManager.registerRequestRateLimiter(
requestRateLimiter, SolrRequest.SolrRequestType.QUERY);

RequestRateLimiter rateLimiter =
rateLimitManager.getRequestRateLimiter(SolrRequest.SolrRequestType.QUERY);
assertFalse(rateLimiter.rateLimiterConfig.isEnabled);

Map<String, Object> properties = new HashMap<>();
Map<String, Object> rateLimiterProps = new HashMap<>();
rateLimiterProps.put("enabled", false);
rateLimiterProps.put("guaranteedSlots", 1);
rateLimiterProps.put("allowedRequests", 1);
rateLimiterProps.put("slotBorrowingEnabled", false);
rateLimiterProps.put("slotAcquisitionTimeoutInMS", 100);
rateLimiterProps.put("priorityBasedEnabled", false);
rateLimiterProps.put("nodesEnabled", "localhost");
properties.put("rate-limiters", rateLimiterProps);

// updating rate limiter
rateLimitManager.onChange(properties);

rateLimiter = rateLimitManager.getRequestRateLimiter(SolrRequest.SolrRequestType.QUERY);

assertTrue(rateLimiter.getRateLimiterConfig().isEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class RateLimiterPayload implements ReflectMapWriter {

@JsonProperty public Boolean priorityBasedEnabled;

@JsonProperty public String nodesEnabled;

public RateLimiterPayload copy() {
RateLimiterPayload result = new RateLimiterPayload();

Expand All @@ -44,6 +46,7 @@ public RateLimiterPayload copy() {
result.slotBorrowingEnabled = slotBorrowingEnabled;
result.slotAcquisitionTimeoutInMS = slotAcquisitionTimeoutInMS;
result.priorityBasedEnabled = priorityBasedEnabled;
result.nodesEnabled = nodesEnabled;
return result;
}

Expand Down Expand Up @@ -71,4 +74,18 @@ public int hashCode() {
slotAcquisitionTimeoutInMS,
priorityBasedEnabled);
}

public void maybeEnableForHost(String hostname) {
if (!this.enabled && !hostname.isEmpty()) {
if (this.nodesEnabled != null && !this.nodesEnabled.isEmpty()) {
String[] hosts = this.nodesEnabled.split(",");
for (String host : hosts) {
if (host.trim().equals(hostname)) {
this.enabled = true;
break;
}
}
}
}
}
}

0 comments on commit b3d5916

Please sign in to comment.