Skip to content

Commit

Permalink
Add changes to block calls in cat shards, indices and segments based …
Browse files Browse the repository at this point in the history
…on dynamic limit settings (opensearch-project#15986)

* Add changes to block calls in cat shards, indices and segments based on dynamic limit settings

Signed-off-by: Sumit Bansal <[email protected]>
  • Loading branch information
sumitasr authored and Harsh Garg committed Oct 8, 2024
1 parent ad97686 commit 5730ea5
Show file tree
Hide file tree
Showing 17 changed files with 636 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))
- New `phone` & `phone-search` analyzer + tokenizer ([#15915](https://github.com/opensearch-project/OpenSearch/pull/15915))
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.opensearch.OpenSearchException.UNKNOWN_VERSION_ADDED;
import static org.opensearch.Version.V_2_10_0;
import static org.opensearch.Version.V_2_17_0;
import static org.opensearch.Version.V_2_18_0;
import static org.opensearch.Version.V_2_1_0;
import static org.opensearch.Version.V_2_3_0;
import static org.opensearch.Version.V_2_4_0;
Expand Down Expand Up @@ -1188,6 +1189,14 @@ public static void registerExceptions() {
V_2_17_0
)
);
registerExceptionHandle(
new OpenSearchExceptionHandle(
org.opensearch.common.breaker.ResponseLimitBreachedException.class,
org.opensearch.common.breaker.ResponseLimitBreachedException::new,
175,
V_2_18_0
)
);
registerExceptionHandle(
new OpenSearchExceptionHandle(
org.opensearch.cluster.block.IndexCreateBlockException.class,
Expand Down
11 changes: 8 additions & 3 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.NamedRegistry;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.inject.TypeLiteral;
import org.opensearch.common.inject.multibindings.MapBinder;
Expand Down Expand Up @@ -525,6 +526,7 @@ public class ActionModule extends AbstractModule {
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
private final ThreadPool threadPool;
private final ExtensionsManager extensionsManager;
private final ResponseLimitSettings responseLimitSettings;

public ActionModule(
Settings settings,
Expand Down Expand Up @@ -580,6 +582,7 @@ public ActionModule(
);

restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
responseLimitSettings = new ResponseLimitSettings(clusterSettings, settings);
}

public Map<String, ActionHandler<?, ?>> getActions() {
Expand Down Expand Up @@ -949,8 +952,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestClusterManagerAction());
registerHandler.accept(new RestNodesAction());
registerHandler.accept(new RestTasksAction(nodesInCluster));
registerHandler.accept(new RestIndicesAction());
registerHandler.accept(new RestSegmentsAction());
registerHandler.accept(new RestIndicesAction(responseLimitSettings));
registerHandler.accept(new RestSegmentsAction(responseLimitSettings));
// Fully qualified to prevent interference with rest.action.count.RestCountAction
registerHandler.accept(new org.opensearch.rest.action.cat.RestCountAction());
// Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
Expand All @@ -970,7 +973,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestTemplatesAction());

// LIST API
registerHandler.accept(new RestIndicesListAction());
registerHandler.accept(new RestIndicesListAction(responseLimitSettings));

// Point in time API
registerHandler.accept(new RestCreatePitAction());
Expand Down Expand Up @@ -1041,6 +1044,8 @@ protected void configure() {

// register dynamic ActionType -> transportAction Map used by NodeClient
bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry);

bind(ResponseLimitSettings.class).toInstance(responseLimitSettings);
}

public ActionFilters getActionFilters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ public class CatShardsRequest extends ClusterManagerNodeReadRequest<CatShardsReq

private String[] indices;
private TimeValue cancelAfterTimeInterval;
private boolean requestLimitCheckSupported;

public CatShardsRequest() {}

public CatShardsRequest(StreamInput in) throws IOException {
super(in);
this.requestLimitCheckSupported = false;
}

@Override
Expand All @@ -55,6 +57,14 @@ public TimeValue getCancelAfterTimeInterval() {
return this.cancelAfterTimeInterval;
}

public void setRequestLimitCheckSupported(final boolean requestLimitCheckSupported) {
this.requestLimitCheckSupported = requestLimitCheckSupported;
}

public boolean isRequestLimitCheckSupported() {
return this.requestLimitCheckSupported;
}

@Override
public ClusterAdminTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ClusterAdminTask(id, type, action, parentTaskId, headers, this.cancelAfterTimeInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.breaker.ResponseLimitBreachedException;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.util.Objects;

import static org.opensearch.common.breaker.ResponseLimitSettings.LimitEntity.SHARDS;

/**
* Perform cat shards action
*
Expand All @@ -31,11 +37,18 @@
public class TransportCatShardsAction extends HandledTransportAction<CatShardsRequest, CatShardsResponse> {

private final NodeClient client;
private final ResponseLimitSettings responseLimitSettings;

@Inject
public TransportCatShardsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) {
public TransportCatShardsAction(
NodeClient client,
TransportService transportService,
ActionFilters actionFilters,
ResponseLimitSettings responseLimitSettings
) {
super(CatShardsAction.NAME, transportService, actionFilters, CatShardsRequest::new);
this.client = client;
this.responseLimitSettings = responseLimitSettings;
}

@Override
Expand Down Expand Up @@ -73,6 +86,7 @@ protected void innerOnFailure(Exception e) {
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
validateRequestLimit(shardsRequest, clusterStateResponse, cancellableListener);
catShardsResponse.setClusterStateResponse(clusterStateResponse);
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
Expand Down Expand Up @@ -107,4 +121,19 @@ public void onFailure(Exception e) {
}

}

private void validateRequestLimit(
final CatShardsRequest shardsRequest,
final ClusterStateResponse clusterStateResponse,
final ActionListener<CatShardsResponse> listener
) {
if (shardsRequest.isRequestLimitCheckSupported()
&& Objects.nonNull(clusterStateResponse)
&& Objects.nonNull(clusterStateResponse.getState())) {
int limit = responseLimitSettings.getCatShardsResponseLimit();
if (ResponseLimitSettings.isResponseLimitBreached(clusterStateResponse.getState().getRoutingTable(), SHARDS, limit)) {
listener.onFailure(new ResponseLimitBreachedException("Too many shards requested.", limit, SHARDS));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.breaker;

import org.opensearch.OpenSearchException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Thrown when api response breaches threshold limit.
*
* @opensearch.internal
*/
public class ResponseLimitBreachedException extends OpenSearchException {

private final int responseLimit;
private final ResponseLimitSettings.LimitEntity limitEntity;

public ResponseLimitBreachedException(StreamInput in) throws IOException {
super(in);
responseLimit = in.readVInt();
limitEntity = in.readEnum(ResponseLimitSettings.LimitEntity.class);
}

public ResponseLimitBreachedException(String msg, int responseLimit, ResponseLimitSettings.LimitEntity limitEntity) {
super(msg);
this.responseLimit = responseLimit;
this.limitEntity = limitEntity;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(responseLimit);
out.writeEnum(limitEntity);
}

public int getResponseLimit() {
return responseLimit;
}

public ResponseLimitSettings.LimitEntity getLimitEntity() {
return limitEntity;
}

@Override
public RestStatus status() {
return RestStatus.TOO_MANY_REQUESTS;
}

@Override
protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("response_limit", responseLimit);
builder.field("limit_entity", limitEntity);
}
}
Loading

0 comments on commit 5730ea5

Please sign in to comment.