From d9d07eb734088937699df9c02af685fe7f81f870 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Sun, 22 Oct 2023 10:02:10 +0530 Subject: [PATCH] javadoc changes and more Signed-off-by: Bharathwaj G --- CHANGELOG.md | 4 +- .../admin/cluster/node/stats/NodeStats.java | 7 +- .../stats/TransportClusterStatsAction.java | 1 + .../TransportReplicationAction.java | 3 +- .../common/network/NetworkModule.java | 7 +- .../java/org/opensearch/node/NodeService.java | 2 +- .../AdmissionControlService.java | 51 +++++++------ .../controllers/AdmissionController.java | 24 ++++-- .../CPUBasedAdmissionController.java | 76 +++++++++++++------ .../CPUBasedAdmissionControllerSettings.java | 17 +---- .../stats/AdmissionControlStats.java | 18 ++--- ...ats.java => AdmissionControllerStats.java} | 32 ++++---- .../stats/BaseAdmissionControllerStats.java | 15 ---- .../AdmissionControlTransportHandler.java | 20 ++--- .../AdmissionControlTransportInterceptor.java | 10 ++- .../transport/TransportInterceptor.java | 11 +-- .../transport/TransportService.java | 11 +-- .../AdmissionControlServiceTests.java | 6 +- .../CPUBasedAdmissionControllerTests.java | 16 ++-- .../enums/TransportActionTypeTests.java | 2 +- ...CPUBasedAdmissionControlSettingsTests.java | 1 - 21 files changed, 182 insertions(+), 152 deletions(-) rename server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/{CPUBasedAdmissionControllerStats.java => AdmissionControllerStats.java} (71%) delete mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 76bf757083d15..fd3acf115dd23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,8 +18,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) - [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535)) - [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) -- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) -- [AdmissionControl] Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats +- [Admission Control] Add changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) +- [Admission Control] Add changes to integrate CPU AC and ResourceUsageCollector with Stats ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 1598fbaf3711a..330b57cf32c70 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -230,7 +230,8 @@ public NodeStats(StreamInput in) throws IOException { } else { repositoriesStats = null; } - if(in.getVersion().onOrAfter(Version.V_3_0_0)) { + // TODO: change to V_2_12_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new); } else { admissionControlStats = null; @@ -504,6 +505,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalWriteable(repositoriesStats); } + // TODO: change to V_2_12_0 on main after backport to 2.x + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalWriteable(admissionControlStats); + } } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 5efec8b876435..9c5dcc9e9de3f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -171,6 +171,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 7dd34fff1b159..11046e44b61e0 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -221,7 +221,8 @@ protected TransportReplicationAction( transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); - if(transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)){ + // Register only TransportShardBulkAction for admission control ( primary indexing action ) + if (transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)) { transportService.registerRequestHandler( transportPrimaryAction, executor, diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 7fa8ec771b488..5687b2f0a253a 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -300,16 +300,19 @@ public TransportRequestHandler interceptHandler( return actualHandler; } + /** + * Intercept the transport action and perform admission control if applicable + */ @Override public TransportRequestHandler interceptHandler( String action, String executor, boolean forceExecution, TransportRequestHandler actualHandler, - AdmissionControlActionType transportActionType + AdmissionControlActionType admissionControlActionType ) { for (TransportInterceptor interceptor : this.transportInterceptors) { - actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, transportActionType); + actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, admissionControlActionType); } return actualHandler; } diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 3c6dd15834f57..224061d09b2c6 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -269,7 +269,7 @@ public NodeStats stats( searchPipelineStats ? this.searchPipelineService.stats() : null, segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null, repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null, - admissionControl ? this.admissionControlService.stats(): null + admissionControl ? this.admissionControlService.stats() : null ); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java index b71b062dc788d..1683f8e381c58 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java @@ -11,15 +11,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; -import org.opensearch.ratelimitting.admissioncontrol.stats.BaseAdmissionControllerStats; -import org.opensearch.ratelimitting.admissioncontrol.stats.CPUBasedAdmissionControllerStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -47,8 +45,14 @@ public class AdmissionControlService { * @param settings Immutable settings instance * @param clusterService ClusterService Instance * @param threadPool ThreadPool Instance + * @param resourceUsageCollectorService Instance used to get node resource usage stats */ - public AdmissionControlService(Settings settings, ClusterService clusterService, ThreadPool threadPool, ResourceUsageCollectorService resourceUsageCollectorService) { + public AdmissionControlService( + Settings settings, + ClusterService clusterService, + ThreadPool threadPool, + ResourceUsageCollectorService resourceUsageCollectorService + ) { this.threadPool = threadPool; this.admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings); this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>(); @@ -68,11 +72,13 @@ private void initialise() { /** * - * @param action transport action that is being executed. we are using it for logging while request is rejected - * @param admissionControlActionType type of the admissionControllerActionType + * @param action Transport action name + * @param admissionControlActionType admissionControllerActionType value */ public void applyTransportAdmissionControl(String action, AdmissionControlActionType admissionControlActionType) { - this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action, admissionControlActionType); }); + this.ADMISSION_CONTROLLERS.forEach( + (name, admissionController) -> { admissionController.apply(action, admissionControlActionType); } + ); } /** @@ -90,7 +96,12 @@ public void registerAdmissionController(String admissionControllerName) { private AdmissionController controllerFactory(String admissionControllerName) { switch (admissionControllerName) { case CPU_BASED_ADMISSION_CONTROLLER: - return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterService, this.resourceUsageCollectorService); + return new CPUBasedAdmissionController( + admissionControllerName, + this.resourceUsageCollectorService, + this.clusterService, + this.settings + ); default: throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName); } @@ -113,26 +124,18 @@ public AdmissionController getAdmissionController(String controllerName) { return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null); } - public AdmissionControlStats stats(){ - List statsList = new ArrayList<>(); - if(this.ADMISSION_CONTROLLERS.size() > 0){ + /** + * Return admission control stats + */ + public AdmissionControlStats stats() { + List statsList = new ArrayList<>(); + if (this.ADMISSION_CONTROLLERS.size() > 0) { this.ADMISSION_CONTROLLERS.forEach((controllerName, admissionController) -> { - BaseAdmissionControllerStats admissionControllerStats = controllerStatsFactory(admissionController); - if(admissionControllerStats != null) { - statsList.add(admissionControllerStats); - } + AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController, controllerName); + statsList.add(admissionControllerStats); }); return new AdmissionControlStats(statsList); } return null; } - - private BaseAdmissionControllerStats controllerStatsFactory(AdmissionController admissionController) { - switch (admissionController.getName()) { - case CPU_BASED_ADMISSION_CONTROLLER: - return new CPUBasedAdmissionControllerStats(admissionController); - default: - return null; - } - } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java index 794a70f7a7483..040ddc4a6baaa 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java @@ -11,7 +11,6 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.node.ResourceUsageCollectorService; -import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; @@ -33,11 +32,17 @@ public abstract class AdmissionController { public final ClusterService clusterService; /** - * @param rejectionCount initialised rejectionCount value for AdmissionController - * @param admissionControllerName name of the admissionController + * @param admissionControllerName name of the admissionController + * @param resourceUsageCollectorService instance used to get resource usage stats of the node + * @param rejectionCount initialised rejectionCount value for AdmissionController * @param clusterService */ - public AdmissionController(AtomicLong rejectionCount, String admissionControllerName, ResourceUsageCollectorService resourceUsageCollectorService, ClusterService clusterService) { + public AdmissionController( + String admissionControllerName, + ResourceUsageCollectorService resourceUsageCollectorService, + AtomicLong rejectionCount, + ClusterService clusterService + ) { this.rejectionCount = rejectionCount; this.admissionControllerName = admissionControllerName; this.resourceUsageCollectorService = resourceUsageCollectorService; @@ -62,8 +67,7 @@ public Boolean isAdmissionControllerEnforced(AdmissionControlMode admissionContr } /** - * Increment the tracking-objects and apply the admission control if threshold is breached. - * Mostly applicable while applying admission controller + * Apply admission control based on the resource usage for an action */ public abstract void apply(String action, AdmissionControlActionType admissionControlActionType); @@ -74,9 +78,12 @@ public String getName() { return this.admissionControllerName; } + /** + * Add rejection count to the rejection count metric tracked by the admission controller + */ public void addRejectionCount(String admissionControlActionType, long count) { AtomicLong updatedCount = new AtomicLong(0); - if(this.rejectionCountMap.containsKey(admissionControlActionType)){ + if (this.rejectionCountMap.containsKey(admissionControlActionType)) { updatedCount.addAndGet(this.rejectionCountMap.get(admissionControlActionType).get()); } updatedCount.addAndGet(count); @@ -91,6 +98,9 @@ public long getRejectionCount(String admissionControlActionType) { return rejectionCount.get(); } + /** + * Get rejection stats of the admission controller + */ public Map getRejectionStats() { Map rejectionStats = new HashMap<>(); rejectionCountMap.forEach((actionType, count) -> rejectionStats.put(actionType, count.get())); diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java index 2514b1e83fd04..dd9c56a46b892 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java @@ -16,11 +16,8 @@ import org.opensearch.node.NodeResourceUsageStats; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -33,56 +30,89 @@ public class CPUBasedAdmissionController extends AdmissionController { public CPUBasedAdmissionControllerSettings settings; /** - * - * @param admissionControllerName State of the admission controller + * @param admissionControllerName Name of the admission controller + * @param resourceUsageCollectorService Instance used to get node resource usage stats + * @param clusterService ClusterService Instance + * @param settings Immutable settings instance */ - public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterService clusterService, ResourceUsageCollectorService resourceUsageCollectorService) { - super(new AtomicLong(0), admissionControllerName, resourceUsageCollectorService, clusterService); + public CPUBasedAdmissionController( + String admissionControllerName, + ResourceUsageCollectorService resourceUsageCollectorService, + ClusterService clusterService, + Settings settings + ) { + super(admissionControllerName, resourceUsageCollectorService, new AtomicLong(0), clusterService); this.settings = new CPUBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings); } /** - * This function will take of applying admission controller based on CPU usage + * Apply admission control based on process CPU usage * @param action is the transport action */ @Override public void apply(String action, AdmissionControlActionType admissionControlActionType) { - // TODO Will extend this logic further currently just incrementing rejectionCount if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) { this.applyForTransportLayer(action, admissionControlActionType); } } + /** + * Apply transport layer admission control if configured limit has been reached + */ private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) { - if (isLimitsBreached(admissionControlActionType)) { + if (isLimitsBreached(actionName, admissionControlActionType)) { this.addRejectionCount(admissionControlActionType.getType(), 1); if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) { - throw new OpenSearchRejectedExecutionException("Action ["+ actionName +"] was rejected due to CPU usage admission controller limit breached"); + throw new OpenSearchRejectedExecutionException( + String.format("CPU usage admission controller limit reached for action [%s]", admissionControlActionType.name()) + ); } } } - private boolean isLimitsBreached(AdmissionControlActionType transportActionType) { - long maxCpuLimit = this.getCpuRejectionThreshold(transportActionType); - Optional nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(this.clusterService.state().nodes().getLocalNodeId()); - if(nodePerformanceStatistics.isPresent()) { - double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent(); - if (cpuUsage >= maxCpuLimit){ - LOGGER.warn("CpuBasedAdmissionController rejected the request as the current CPU usage [" + - cpuUsage + "%] exceeds the allowed limit [" + maxCpuLimit + "%]"); - return true; + /** + * Check if the configured resource usage limits are breached for the action + */ + private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) { + // check if cluster state is ready + if (clusterService.state() != null && clusterService.state().nodes() != null) { + long maxCpuLimit = this.getCpuRejectionThreshold(admissionControlActionType); + Optional nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics( + this.clusterService.state().nodes().getLocalNodeId() + ); + if (nodePerformanceStatistics.isPresent()) { + double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent(); + if (cpuUsage >= maxCpuLimit) { + LOGGER.warn( + "CpuBasedAdmissionController rejected the request as the current CPU " + + "usage [{}] exceeds the allowed limit [{}] for transport action [{}]", + cpuUsage, + maxCpuLimit, + actionName + ); + return true; + } } } return false; } - private long getCpuRejectionThreshold(AdmissionControlActionType transportActionType) { - switch (transportActionType) { + + /** + * Get CPU rejection threshold based on action type + */ + private long getCpuRejectionThreshold(AdmissionControlActionType admissionControlActionType) { + switch (admissionControlActionType) { case SEARCH: return this.settings.getSearchCPULimit(); case INDEXING: return this.settings.getIndexingCPULimit(); default: - throw new IllegalArgumentException("Not Supported TransportAction Type: " + transportActionType.getType()); + throw new IllegalArgumentException( + String.format( + "Admission control not Supported for AdmissionControlActionType: %s", + admissionControlActionType.getType() + ) + ); } } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java index 141e9b68db145..397fd485c6da3 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java @@ -14,9 +14,6 @@ import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; -import java.util.Arrays; -import java.util.List; - /** * Settings related to cpu based admission controller. * @opensearch.internal @@ -28,15 +25,12 @@ public class CPUBasedAdmissionControllerSettings { * Default parameters for the CPUBasedAdmissionControllerSettings */ public static class Defaults { - public static final long CPU_USAGE = 95; - public static List TRANSPORT_LAYER_DEFAULT_URI_TYPE = Arrays.asList("indexing", "search"); + public static final long CPU_USAGE_LIMIT = 95; } private AdmissionControlMode transportLayerMode; private Long searchCPULimit; private Long indexingCPULimit; - - private final List transportActionsList; /** * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set * rejection will be performed, otherwise only rejection metrics will be populated. @@ -54,7 +48,7 @@ public static class Defaults { */ public static final Setting SEARCH_CPU_USAGE_LIMIT = Setting.longSetting( "admission_control.search.cpu_usage.limit", - Defaults.CPU_USAGE, + Defaults.CPU_USAGE_LIMIT, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -64,7 +58,7 @@ public static class Defaults { */ public static final Setting INDEXING_CPU_USAGE_LIMIT = Setting.longSetting( "admission_control.indexing.cpu_usage.limit", - Defaults.CPU_USAGE, + Defaults.CPU_USAGE_LIMIT, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -75,7 +69,6 @@ public CPUBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Sett clusterSettings.addSettingsUpdateConsumer(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); this.searchCPULimit = SEARCH_CPU_USAGE_LIMIT.get(settings); this.indexingCPULimit = INDEXING_CPU_USAGE_LIMIT.get(settings); - this.transportActionsList = Defaults.TRANSPORT_LAYER_DEFAULT_URI_TYPE; clusterSettings.addSettingsUpdateConsumer(INDEXING_CPU_USAGE_LIMIT, this::setIndexingCPULimit); clusterSettings.addSettingsUpdateConsumer(SEARCH_CPU_USAGE_LIMIT, this::setSearchCPULimit); } @@ -103,8 +96,4 @@ public void setIndexingCPULimit(Long indexingCPULimit) { public void setSearchCPULimit(Long searchCPULimit) { this.searchCPULimit = searchCPULimit; } - - public List getTransportActionsList() { - return transportActionsList; - } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java index 188feb77318e4..eab86c1fb3f2c 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java @@ -8,7 +8,6 @@ package org.opensearch.ratelimitting.admissioncontrol.stats; -import org.opensearch.Version; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -18,15 +17,18 @@ import java.io.IOException; import java.util.List; +/** + * Class for admission control stats used as part of node stats + */ public class AdmissionControlStats implements ToXContentFragment, Writeable { - List admissionControllerStatsList; + List admissionControllerStatsList; /** * * @param admissionControllerStatsList list of admissionControllerStats */ - public AdmissionControlStats(List admissionControllerStatsList){ + public AdmissionControlStats(List admissionControllerStatsList) { this.admissionControllerStatsList = admissionControllerStatsList; } @@ -36,11 +38,7 @@ public AdmissionControlStats(List admissionControl * @throws IOException if an I/O error occurs */ public AdmissionControlStats(StreamInput in) throws IOException { - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - this.admissionControllerStatsList = in.readNamedWriteableList(BaseAdmissionControllerStats.class); - } else { - this.admissionControllerStatsList = null; - } + this.admissionControllerStatsList = in.readNamedWriteableList(AdmissionControllerStats.class); } /** @@ -50,9 +48,7 @@ public AdmissionControlStats(StreamInput in) throws IOException { */ @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { - out.writeList(this.admissionControllerStatsList); - } + out.writeList(this.admissionControllerStatsList); } /** diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java similarity index 71% rename from server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java rename to server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java index 7b4e4a9695509..2763b366f4b75 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java @@ -8,35 +8,38 @@ package org.opensearch.ratelimitting.admissioncontrol.stats; +import org.opensearch.core.common.io.stream.NamedWriteable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; -import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; import java.io.IOException; import java.util.Map; -import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER; -public class CPUBasedAdmissionControllerStats extends BaseAdmissionControllerStats { - - /** - * Returns the name of the writeable object - */ - @Override - public String getWriteableName() { - return CPU_BASED_ADMISSION_CONTROLLER; - } - +/** + * Class for admission controller ( such as CPU ) stats which includes rejection count for each action type + */ +public class AdmissionControllerStats implements NamedWriteable, ToXContentFragment { public Map rejectionCount; + public String admissionControllerName; - public CPUBasedAdmissionControllerStats(AdmissionController admissionController){ + public AdmissionControllerStats(AdmissionController admissionController, String admissionControllerName) { this.rejectionCount = admissionController.getRejectionStats(); + this.admissionControllerName = admissionControllerName; } - public CPUBasedAdmissionControllerStats(StreamInput in) throws IOException { + public AdmissionControllerStats(StreamInput in) throws IOException { this.rejectionCount = in.readMap(StreamInput::readString, StreamInput::readLong); + this.admissionControllerName = in.readString(); + } + + @Override + public String getWriteableName() { + return admissionControllerName; } + /** * Write this into the {@linkplain StreamOutput}. * @@ -45,6 +48,7 @@ public CPUBasedAdmissionControllerStats(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeMap(this.rejectionCount, StreamOutput::writeString, StreamOutput::writeLong); + out.writeString(this.admissionControllerName); } /** diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java deleted file mode 100644 index 0ee1807bf80da..0000000000000 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ratelimitting.admissioncontrol.stats; - -import org.opensearch.core.common.io.stream.NamedWriteable; -import org.opensearch.core.xcontent.ToXContentFragment; - -public abstract class BaseAdmissionControllerStats implements NamedWriteable, ToXContentFragment { -} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java index dfe286d9b9537..6561a670f0794 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java @@ -54,15 +54,17 @@ public AdmissionControlTransportHandler( */ @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { - // intercept all the transport requests here and apply admission control - try { - // TODO Need to evaluate if we need to apply admission control or not if force Execution is true will update in next PR. - this.admissionControlService.applyTransportAdmissionControl(this.action, this.admissionControlActionType); - } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { - log.warn(openSearchRejectedExecutionException.getMessage()); - channel.sendResponse(openSearchRejectedExecutionException); - } catch (final Exception e) { - throw e; + // skip admission control if force execution is true + if (!this.forceExecution) { + // intercept the transport requests here and apply admission control + try { + this.admissionControlService.applyTransportAdmissionControl(this.action, this.admissionControlActionType); + } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { + log.warn(openSearchRejectedExecutionException.getMessage()); + channel.sendResponse(openSearchRejectedExecutionException); + } catch (final Exception e) { + throw e; + } } actualHandler.messageReceived(request, channel, task); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java index c725af821ac8f..ae1520bca769d 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java @@ -15,7 +15,7 @@ import org.opensearch.transport.TransportRequestHandler; /** - * This class allows throttling to intercept requests on both the sender and the receiver side. + * This class allows throttling by intercepting requests on both the sender and the receiver side. */ public class AdmissionControlTransportInterceptor implements TransportInterceptor { @@ -37,6 +37,12 @@ public TransportRequestHandler interceptHandler( TransportRequestHandler actualHandler, AdmissionControlActionType admissionControlActionType ) { - return new AdmissionControlTransportHandler<>(action, actualHandler, this.admissionControlService, forceExecution, admissionControlActionType); + return new AdmissionControlTransportHandler<>( + action, + actualHandler, + this.admissionControlService, + forceExecution, + admissionControlActionType + ); } } diff --git a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java index 12b0990a5d692..e8efbeb7de3f9 100644 --- a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java +++ b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java @@ -59,21 +59,14 @@ default TransportRequestHandler interceptHandler } /** - * - * @param action - * @param executor - * @param forceExecution - * @param actualHandler - * @param transportActionType - * @return - * @param + * This is called for handlers that needs admission control support */ default TransportRequestHandler interceptHandler( String action, String executor, boolean forceExecution, TransportRequestHandler actualHandler, - AdmissionControlActionType transportActionType + AdmissionControlActionType admissionControlActionType ) { return interceptHandler(action, executor, forceExecution, actualHandler); } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index a55a20478aa3d..211564c6bb4ac 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -1243,13 +1243,14 @@ public void registerRequestHandler( } /** - * Registers a new request handler + * Registers a new request handler with admission control support * * @param action The action the request handler is associated with - * @param requestReader The request class that will be used to construct new instances for streaming * @param executor The executor the request handling will be executed on * @param forceExecution Force execution on the executor queue and never reject it - * @param transportActionType Check the request size and raise an exception in case the limit is breached. + * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached. + * @param admissionControlActionType Admission control based on resource usage limits of provided action type + * @param requestReader The request class that will be used to construct new instances for streaming * @param handler The handler itself that implements the request handling */ public void registerRequestHandler( @@ -1257,12 +1258,12 @@ public void registerRequestHandler( String executor, boolean forceExecution, boolean canTripCircuitBreaker, - AdmissionControlActionType transportActionType, + AdmissionControlActionType admissionControlActionType, Writeable.Reader requestReader, TransportRequestHandler handler ) { validateActionName(action); - handler = interceptor.interceptHandler(action, executor, forceExecution, handler, transportActionType); + handler = interceptor.interceptHandler(action, executor, forceExecution, handler, admissionControlActionType); RequestHandlerRegistry reg = new RequestHandlerRegistry<>( action, requestReader, diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java index abd38a3cbf1fb..95caa8b1a6a22 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java @@ -109,12 +109,14 @@ public void testApplyAdmissionControllerDisabled() { admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); admissionControlService.applyTransportAdmissionControl(this.action, null); List admissionControllerList = admissionControlService.getAdmissionControllers(); - admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); }); + admissionControllerList.forEach(admissionController -> { + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + }); } public void testApplyAdmissionControllerEnabled() { this.action = "indices:data/write/bulk[s][p]"; - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool,null); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); admissionControlService.applyTransportAdmissionControl(this.action, null); assertEquals( admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java index 2473b242f71b5..9d8fc967c5a82 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java @@ -45,9 +45,9 @@ public void tearDown() throws Exception { public void testCheckDefaultParameters() { admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - Settings.EMPTY, + null, clusterService, - null + Settings.EMPTY ); assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); @@ -60,9 +60,9 @@ public void testCheckDefaultParameters() { public void testCheckUpdateSettings() { admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - Settings.EMPTY, + null, clusterService, - null + Settings.EMPTY ); Settings settings = Settings.builder() .put( @@ -81,9 +81,9 @@ public void testCheckUpdateSettings() { public void testApplyControllerWithDefaultSettings() { admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - Settings.EMPTY, + null, clusterService, - null + Settings.EMPTY ); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); @@ -101,9 +101,9 @@ public void testApplyControllerWhenSettingsEnabled() { .build(); admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - settings, + null, clusterService, - null + settings ); assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java index 419e9ea8d4827..3923048376d69 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java @@ -10,7 +10,7 @@ import org.opensearch.test.OpenSearchTestCase; -public class TransportActionTypeTests extends OpenSearchTestCase { +public class admissionControlActionTypeTests extends OpenSearchTestCase { public void testValidActionType() { assertEquals(AdmissionControlActionType.SEARCH.getType(), "search"); diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java index 43103926a69a2..04777f3f6173f 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java @@ -64,7 +64,6 @@ public void testDefaultSettings() { assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), percent); assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), percent); - assertEquals(cpuBasedAdmissionControllerSettings.getTransportActionsList(), Arrays.asList("indexing", "search")); } public void testGetConfiguredSettings() {