Skip to content

Commit

Permalink
Added changes to integrade cpu AC to ResourceUsageCollector and Emit …
Browse files Browse the repository at this point in the history
…Stats

Signed-off-by: Ajay Kumar Movva <[email protected]>
  • Loading branch information
Ajay Kumar Movva authored and bharath-techie committed Oct 22, 2023
1 parent 14d4a63 commit 01e320f
Show file tree
Hide file tree
Showing 25 changed files with 505 additions and 100 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
- [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
- [AdmissionControl] Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.NodesResourceUsageStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
import org.opensearch.repositories.RepositoriesStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
Expand Down Expand Up @@ -154,6 +155,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private RepositoriesStats repositoriesStats;

@Nullable
private AdmissionControlStats admissionControlStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -226,6 +230,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
repositoriesStats = null;
}
if(in.getVersion().onOrAfter(Version.V_3_0_0)) {
admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new);
} else {
admissionControlStats = null;
}
}

public NodeStats(
Expand Down Expand Up @@ -255,7 +264,8 @@ public NodeStats(
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats,
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@Nullable RepositoriesStats repositoriesStats
@Nullable RepositoriesStats repositoriesStats,
@Nullable AdmissionControlStats admissionControlStats
) {
super(node);
this.timestamp = timestamp;
Expand Down Expand Up @@ -284,6 +294,7 @@ public NodeStats(
this.searchPipelineStats = searchPipelineStats;
this.segmentReplicationRejectionStats = segmentReplicationRejectionStats;
this.repositoriesStats = repositoriesStats;
this.admissionControlStats = admissionControlStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -435,6 +446,11 @@ public RepositoriesStats getRepositoriesStats() {
return repositoriesStats;
}

@Nullable
public AdmissionControlStats getAdmissionControlStats() {
return admissionControlStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -588,6 +604,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getRepositoriesStats() != null) {
getRepositoriesStats().toXContent(builder, params);
}
if (getAdmissionControlStats() != null) {
getAdmissionControlStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ public enum Metric {
SEARCH_PIPELINE("search_pipeline"),
RESOURCE_USAGE_STATS("resource_usage_stats"),
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
REPOSITORIES("repositories");
REPOSITORIES("repositories"),
ADMISSION_CONTROL("admission_control");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics)
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics),
NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchService;
import org.opensearch.search.dfs.DfsSearchResult;
Expand Down Expand Up @@ -542,6 +543,9 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
DFS_ACTION_NAME,
ThreadPool.Names.SAME,
false,
true,
AdmissionControlActionType.SEARCH,
ShardSearchRequest::new,
(request, channel, task) -> searchService.executeDfsPhase(
request,
Expand All @@ -556,6 +560,9 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
QUERY_ACTION_NAME,
ThreadPool.Names.SAME,
false,
true,
AdmissionControlActionType.SEARCH,
ShardSearchRequest::new,
(request, channel, task) -> {
searchService.executeQueryPhase(
Expand All @@ -575,6 +582,9 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
QUERY_ID_ACTION_NAME,
ThreadPool.Names.SAME,
false,
true,
AdmissionControlActionType.SEARCH,
QuerySearchRequest::new,
(request, channel, task) -> {
searchService.executeQueryPhase(
Expand Down Expand Up @@ -633,6 +643,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
ThreadPool.Names.SAME,
true,
true,
AdmissionControlActionType.SEARCH,
ShardFetchSearchRequest::new,
(request, channel, task) -> {
searchService.executeFetchPhase(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.action.bulk.TransportShardBulkAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.ChannelActionListener;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.opensearch.indices.IndexClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
Expand Down Expand Up @@ -219,14 +221,26 @@ protected TransportReplicationAction(

transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);

transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
if(transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)){
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
AdmissionControlActionType.INDEXING,
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
} else {
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
}

// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.http.HttpServerTransport;
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.RawTaskStatus;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
Expand Down Expand Up @@ -299,6 +300,20 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
return actualHandler;
}

@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
String action,
String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler,
AdmissionControlActionType transportActionType
) {
for (TransportInterceptor interceptor : this.transportInterceptors) {
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, transportActionType);
}
return actualHandler;
}

@Override
public AsyncSender interceptSender(AsyncSender sender) {
for (TransportInterceptor interceptor : this.transportInterceptors) {
Expand Down
29 changes: 16 additions & 13 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -894,12 +894,24 @@ protected Node(

final RestController restController = actionModule.getRestController();

final AdmissionControlService admissionControlService = new AdmissionControlService(
final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker(
threadPool,
settings,
clusterService.getClusterSettings(),
clusterService.getClusterSettings()
);
final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService(
nodeResourceUsageTracker,
clusterService,
threadPool
);

final AdmissionControlService admissionControlService = new AdmissionControlService(
settings,
clusterService,
threadPool,
resourceUsageCollectorService
);

AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor(
admissionControlService
);
Expand Down Expand Up @@ -1101,16 +1113,6 @@ protected Node(
transportService.getTaskManager(),
taskCancellationMonitoringSettings
);
final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker(
threadPool,
settings,
clusterService.getClusterSettings()
);
final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService(
nodeResourceUsageTracker,
clusterService,
threadPool
);
this.nodeService = new NodeService(
settings,
threadPool,
Expand All @@ -1135,7 +1137,8 @@ protected Node(
taskCancellationMonitoringService,
resourceUsageCollectorService,
segmentReplicationStatsTracker,
repositoryService
repositoryService,
admissionControlService
);

final SearchService searchService = newSearchService(
Expand Down
12 changes: 9 additions & 3 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.MonitorService;
import org.opensearch.plugins.PluginsService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.AggregationUsageService;
Expand Down Expand Up @@ -96,6 +97,7 @@ public class NodeService implements Closeable {
private final FileCache fileCache;
private final TaskCancellationMonitoringService taskCancellationMonitoringService;
private final RepositoriesService repositoriesService;
AdmissionControlService admissionControlService;

private final SegmentReplicationStatsTracker segmentReplicationStatsTracker;

Expand Down Expand Up @@ -123,7 +125,8 @@ public class NodeService implements Closeable {
TaskCancellationMonitoringService taskCancellationMonitoringService,
ResourceUsageCollectorService resourceUsageCollectorService,
SegmentReplicationStatsTracker segmentReplicationStatsTracker,
RepositoriesService repositoriesService
RepositoriesService repositoriesService,
AdmissionControlService admissionControlService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -148,6 +151,7 @@ public class NodeService implements Closeable {
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
this.resourceUsageCollectorService = resourceUsageCollectorService;
this.repositoriesService = repositoriesService;
this.admissionControlService = admissionControlService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
this.segmentReplicationStatsTracker = segmentReplicationStatsTracker;
Expand Down Expand Up @@ -232,7 +236,8 @@ public NodeStats stats(
boolean searchPipelineStats,
boolean resourceUsageStats,
boolean segmentReplicationTrackerStats,
boolean repositoriesStats
boolean repositoriesStats,
boolean admissionControl
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand Down Expand Up @@ -263,7 +268,8 @@ public NodeStats stats(
taskCancellation ? this.taskCancellationMonitoringService.stats() : null,
searchPipelineStats ? this.searchPipelineService.stats() : null,
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
admissionControl ? this.admissionControlService.stats(): null
);
}

Expand Down
Loading

0 comments on commit 01e320f

Please sign in to comment.