-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add changes to block non-paginated calls in cat shards, indices and s…
…egments Signed-off-by: Sumit Bansal <[email protected]>
- Loading branch information
Showing
8 changed files
with
238 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
137 changes: 137 additions & 0 deletions
137
server/src/main/java/org/opensearch/rest/action/cat/RequestLimitSettings.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.rest.action.cat; | ||
|
||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.cluster.routing.IndexRoutingTable; | ||
import org.opensearch.cluster.routing.IndexShardRoutingTable; | ||
import org.opensearch.cluster.routing.RoutingTable; | ||
import org.opensearch.common.settings.ClusterSettings; | ||
import org.opensearch.common.settings.Setting; | ||
import org.opensearch.common.settings.Settings; | ||
|
||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* Class to define dynamic settings for putting circuit breakers on the actions and functions to evaluate if block is required. | ||
*/ | ||
public class RequestLimitSettings { | ||
|
||
/** | ||
* Enum to represent action names against whom we need to perform limit checks. | ||
*/ | ||
public enum BlockAction { | ||
CAT_INDICES, | ||
CAT_SHARDS, | ||
CAT_SEGMENTS | ||
} | ||
|
||
private volatile int catIndicesLimit; | ||
private volatile int catShardsLimit; | ||
private volatile int catSegmentsLimit; | ||
|
||
public static final Setting<Integer> CAT_INDICES_LIMIT_SETTING = Setting.intSetting( | ||
"cat.indices.limit", | ||
-1, | ||
Setting.Property.NodeScope, | ||
Setting.Property.Dynamic | ||
); | ||
|
||
public static final Setting<Integer> CAT_SHARDS_LIMIT_SETTING = Setting.intSetting( | ||
"cat.shards.limit", | ||
-1, | ||
Setting.Property.NodeScope, | ||
Setting.Property.Dynamic | ||
); | ||
|
||
public static final Setting<Integer> CAT_SEGMENTS_LIMIT_SETTING = Setting.intSetting( | ||
"cat.segments.limit", | ||
-1, | ||
Setting.Property.NodeScope, | ||
Setting.Property.Dynamic | ||
); | ||
|
||
public RequestLimitSettings(ClusterSettings clusterSettings, Settings settings) { | ||
setCatShardsLimitSetting(CAT_SHARDS_LIMIT_SETTING.get(settings)); | ||
setCatIndicesLimitSetting(CAT_INDICES_LIMIT_SETTING.get(settings)); | ||
setCatSegmentsLimitSetting(CAT_SEGMENTS_LIMIT_SETTING.get(settings)); | ||
|
||
clusterSettings.addSettingsUpdateConsumer(CAT_SHARDS_LIMIT_SETTING, this::setCatShardsLimitSetting); | ||
clusterSettings.addSettingsUpdateConsumer(CAT_INDICES_LIMIT_SETTING, this::setCatIndicesLimitSetting); | ||
clusterSettings.addSettingsUpdateConsumer(CAT_SEGMENTS_LIMIT_SETTING, this::setCatSegmentsLimitSetting); | ||
} | ||
|
||
/** | ||
* Method to check if the circuit breaker limit has reached for an action. | ||
* The limits are controlled via dynamic settings. | ||
* | ||
* @param clusterState {@link ClusterState} | ||
* @param actionToCheck {@link BlockAction} | ||
* @return True/False | ||
*/ | ||
public boolean isCircuitBreakerLimitBreached(final ClusterState clusterState, BlockAction actionToCheck) { | ||
if (Objects.isNull(clusterState)) return false; | ||
switch (actionToCheck) { | ||
case CAT_INDICES: | ||
if (catIndicesLimit <= 0) return false; | ||
int indicesCount = getTotalIndices(clusterState); | ||
if (indicesCount > catIndicesLimit) return true; | ||
break; | ||
case CAT_SHARDS: | ||
if (catShardsLimit <= 0) return false; | ||
int totalShards = getTotalShards(clusterState); | ||
if (totalShards > catShardsLimit) return true; | ||
break; | ||
case CAT_SEGMENTS: | ||
if (catSegmentsLimit <= 0) return false; | ||
if (getTotalIndices(clusterState) > catSegmentsLimit) return true; | ||
break; | ||
} | ||
return false; | ||
} | ||
|
||
private void setCatShardsLimitSetting(final int catShardsLimit) { | ||
this.catShardsLimit = catShardsLimit; | ||
} | ||
|
||
private void setCatIndicesLimitSetting(final int catIndicesLimit) { | ||
this.catIndicesLimit = catIndicesLimit; | ||
} | ||
|
||
private void setCatSegmentsLimitSetting(final int catSegmentsLimit) { | ||
this.catSegmentsLimit = catSegmentsLimit; | ||
} | ||
|
||
private static int getTotalIndices(final ClusterState clusterState) { | ||
return chainWalk(() -> clusterState.getMetadata().getIndices().size(), 0); | ||
} | ||
|
||
private static int getTotalShards(final ClusterState clusterState) { | ||
final RoutingTable routingTable = clusterState.getRoutingTable(); | ||
final Map<String, IndexRoutingTable> indexRoutingTableMap = routingTable.getIndicesRouting(); | ||
int totalShards = 0; | ||
for (final Map.Entry<String, IndexRoutingTable> entry : indexRoutingTableMap.entrySet()) { | ||
for (final Map.Entry<Integer, IndexShardRoutingTable> indexShardRoutingTableEntry : entry.getValue().getShards().entrySet()) { | ||
totalShards += indexShardRoutingTableEntry.getValue().getShards().size(); | ||
} | ||
} | ||
return totalShards; | ||
} | ||
|
||
// TODO: Evaluate if we can move this to common util. | ||
private static <T> T chainWalk(Supplier<T> supplier, T defaultValue) { | ||
try { | ||
return supplier.get(); | ||
} catch (NullPointerException e) { | ||
return defaultValue; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.