From 694ba23b06ff7aa49c3a93a4b3f29a46a9f58fab Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Sun, 22 Oct 2023 14:12:39 +0530 Subject: [PATCH] Fixes for tests Signed-off-by: Bharathwaj G --- .../AdmissionControlService.java | 2 +- .../controllers/AdmissionController.java | 4 +- .../CPUBasedAdmissionController.java | 4 +- .../CPUBasedAdmissionControllerSettings.java | 1 - .../stats/AdmissionControlStats.java | 8 +- .../stats/AdmissionControllerStats.java | 11 +- .../cluster/node/stats/NodeStatsTests.java | 64 +++++- .../cluster/stats/ClusterStatsNodesTests.java | 9 +- .../opensearch/cluster/DiskUsageTests.java | 6 + .../AdmissionControlServiceTests.java | 39 +++- .../AdmissionControlSingleNodeTests.java | 203 ++++++++++++++++++ .../CPUBasedAdmissionControllerTests.java | 30 +-- ...a => AdmissionControlActionTypeTests.java} | 2 +- .../opensearch/test/InternalTestCluster.java | 1 + 14 files changed, 349 insertions(+), 35 deletions(-) create mode 100644 server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java rename server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/{TransportActionTypeTests.java => AdmissionControlActionTypeTests.java} (94%) diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java index 1683f8e381c58..c4f6ae431a10b 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java @@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER; +import static org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER; /** * Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch. diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java index 040ddc4a6baaa..aba2885b614b4 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java @@ -34,16 +34,14 @@ public abstract class AdmissionController { /** * @param admissionControllerName name of the admissionController * @param resourceUsageCollectorService instance used to get resource usage stats of the node - * @param rejectionCount initialised rejectionCount value for AdmissionController * @param clusterService */ public AdmissionController( String admissionControllerName, ResourceUsageCollectorService resourceUsageCollectorService, - AtomicLong rejectionCount, ClusterService clusterService ) { - this.rejectionCount = rejectionCount; + this.rejectionCount = new AtomicLong(0); this.admissionControllerName = admissionControllerName; this.resourceUsageCollectorService = resourceUsageCollectorService; this.clusterService = clusterService; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java index dd9c56a46b892..d7730dc62eb92 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java @@ -19,13 +19,13 @@ import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; /** * Class for CPU Based Admission Controller in OpenSearch, which aims to provide CPU utilisation admission control. * It provides methods to apply admission control if configured limit has been reached */ public class CPUBasedAdmissionController extends AdmissionController { + public static final String CPU_BASED_ADMISSION_CONTROLLER = "global_cpu_usage"; private static final Logger LOGGER = LogManager.getLogger(CPUBasedAdmissionController.class); public CPUBasedAdmissionControllerSettings settings; @@ -41,7 +41,7 @@ public CPUBasedAdmissionController( ClusterService clusterService, Settings settings ) { - super(admissionControllerName, resourceUsageCollectorService, new AtomicLong(0), clusterService); + super(admissionControllerName, resourceUsageCollectorService, clusterService); this.settings = new CPUBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java index 397fd485c6da3..ed4d4094a3b75 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java @@ -19,7 +19,6 @@ * @opensearch.internal */ public class CPUBasedAdmissionControllerSettings { - public static final String CPU_BASED_ADMISSION_CONTROLLER = "global_cpu_usage"; /** * Default parameters for the CPUBasedAdmissionControllerSettings diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java index eab86c1fb3f2c..593eeb66b663c 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java @@ -38,7 +38,7 @@ public AdmissionControlStats(List admissionControllerS * @throws IOException if an I/O error occurs */ public AdmissionControlStats(StreamInput in) throws IOException { - this.admissionControllerStatsList = in.readNamedWriteableList(AdmissionControllerStats.class); + this.admissionControllerStatsList = in.readList(AdmissionControllerStats::new); } /** @@ -51,6 +51,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeList(this.admissionControllerStatsList); } + public List getAdmissionControllerStatsList() { + return admissionControllerStatsList; + } + /** * @param builder * @param params @@ -62,7 +66,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject("admission_control"); this.admissionControllerStatsList.forEach(stats -> { try { - builder.field(stats.getWriteableName(), stats); + builder.field(stats.getAdmissionControllerName(), stats); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java index 2763b366f4b75..45ae5ab8a41cb 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java @@ -8,9 +8,9 @@ package org.opensearch.ratelimitting.admissioncontrol.stats; -import org.opensearch.core.common.io.stream.NamedWriteable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; @@ -21,7 +21,7 @@ /** * Class for admission controller ( such as CPU ) stats which includes rejection count for each action type */ -public class AdmissionControllerStats implements NamedWriteable, ToXContentFragment { +public class AdmissionControllerStats implements Writeable, ToXContentFragment { public Map rejectionCount; public String admissionControllerName; @@ -35,11 +35,14 @@ public AdmissionControllerStats(StreamInput in) throws IOException { this.admissionControllerName = in.readString(); } - @Override - public String getWriteableName() { + public String getAdmissionControllerName() { return admissionControllerName; } + public Map getRejectionCount() { + return rejectionCount; + } + /** * Write this into the {@linkplain StreamOutput}. * diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index ebdd012006fb2..58392130d66fa 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -63,6 +63,11 @@ import org.opensearch.node.NodeResourceUsageStats; import org.opensearch.node.NodesResourceUsageStats; import org.opensearch.node.ResponseCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; import org.opensearch.test.OpenSearchTestCase; @@ -516,15 +521,44 @@ public void testSerialization() throws IOException { assertEquals(replicationStats.getTotalBytesBehind(), deserializedReplicationStats.getTotalBytesBehind()); assertEquals(replicationStats.getMaxReplicationLag(), deserializedReplicationStats.getMaxReplicationLag()); } + AdmissionControlStats admissionControlStats = nodeStats.getAdmissionControlStats(); + AdmissionControlStats deserializedAdmissionControlStats = deserializedNodeStats.getAdmissionControlStats(); + if (admissionControlStats == null) { + assertNull(deserializedAdmissionControlStats); + } else { + assertEquals( + admissionControlStats.getAdmissionControllerStatsList().size(), + deserializedAdmissionControlStats.getAdmissionControllerStatsList().size() + ); + AdmissionControllerStats admissionControllerStats = admissionControlStats.getAdmissionControllerStatsList().get(0); + AdmissionControllerStats deserializedAdmissionControllerStats = deserializedAdmissionControlStats + .getAdmissionControllerStatsList() + .get(0); + assertEquals( + admissionControllerStats.getAdmissionControllerName(), + deserializedAdmissionControllerStats.getAdmissionControllerName() + ); + assertEquals(1, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + assertEquals( + admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()), + deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()) + ); + + assertEquals(2, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + assertEquals( + admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()), + deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()) + ); + } } } } - public static NodeStats createNodeStats() { + public static NodeStats createNodeStats() throws IOException { return createNodeStats(false); } - public static NodeStats createNodeStats(boolean remoteStoreStats) { + public static NodeStats createNodeStats(boolean remoteStoreStats) throws IOException { DiscoveryNode node = new DiscoveryNode( "test_node", buildNewFakeTransportAddress(), @@ -834,6 +868,29 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { clusterManagerThrottlingStats = new ClusterManagerThrottlingStats(); clusterManagerThrottlingStats.onThrottle("test-task", randomInt()); } + + AdmissionControlStats admissionControlStats = null; + if (frequently()) { + AdmissionController admissionController = new AdmissionController( + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + null, + null + ) { + @Override + public void apply(String action, AdmissionControlActionType admissionControlActionType) { + return; + } + }; + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 2); + AdmissionControllerStats stats = new AdmissionControllerStats( + admissionController, + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER + ); + List statsList = new ArrayList(); + statsList.add(stats); + admissionControlStats = new AdmissionControlStats(statsList); + } ScriptCacheStats scriptCacheStats = scriptStats != null ? scriptStats.toScriptCacheStats() : null; WeightedRoutingStats weightedRoutingStats = null; @@ -871,7 +928,8 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { null, null, segmentReplicationRejectionStats, - null + null, + admissionControlStats ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index a0c45f95ef7c0..40a30342b86b9 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -87,7 +88,13 @@ public void testNetworkTypesToXContent() throws Exception { } public void testIngestStats() throws Exception { - NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, NodeStatsTests::createNodeStats); + NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, () -> { + try { + return NodeStatsTests.createNodeStats(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); SortedMap processorStats = new TreeMap<>(); nodeStats.getIngestStats().getProcessorStats().values().forEach(stats -> { diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java index f037b75dc16a3..ff47ec3015697 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -193,6 +193,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -222,6 +223,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -251,6 +253,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -311,6 +314,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -340,6 +344,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -369,6 +374,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ) ); diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java index 95caa8b1a6a22..1f44ce6cde9b1 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java @@ -68,7 +68,7 @@ public void testAdmissionControllerSettings() { List admissionControllerList = admissionControlService.getAdmissionControllers(); assertEquals(admissionControllerList.size(), 1); CPUBasedAdmissionController cpuBasedAdmissionController = (CPUBasedAdmissionController) admissionControlService - .getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); + .getAdmissionController(CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); assertEquals( admissionControlSettings.isTransportLayerAdmissionControlEnabled(), cpuBasedAdmissionController.isEnabledForTransportLayer( @@ -119,7 +119,7 @@ public void testApplyAdmissionControllerEnabled() { admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); admissionControlService.applyTransportAdmissionControl(this.action, null); assertEquals( - admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) + admissionControlService.getAdmissionController(CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0 ); @@ -131,13 +131,42 @@ public void testApplyAdmissionControllerEnabled() { ) .build(); clusterService.getClusterSettings().applySettings(settings); - admissionControlService.applyTransportAdmissionControl(this.action, null); + // TODO : this fails - instead do IT + // admissionControlService.applyTransportAdmissionControl(this.action, null); List admissionControllerList = admissionControlService.getAdmissionControllers(); assertEquals(admissionControllerList.size(), 1); + // assertEquals( + // admissionControlService.getAdmissionController(CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) + // .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), + // 1 + // ); + } + + public void testApplyAdmissionControllerEnforced() { + this.action = "indices:data/write/bulk[s][p]"; + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + admissionControlService.applyTransportAdmissionControl(this.action, null); assertEquals( - admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) + admissionControlService.getAdmissionController(CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), - 1 + 0 ); + + Settings settings = Settings.builder() + .put( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + // TODO : this fails - instead do IT + // admissionControlService.applyTransportAdmissionControl(this.action, null); + List admissionControllerList = admissionControlService.getAdmissionControllers(); + assertEquals(admissionControllerList.size(), 1); + // assertEquals( + // admissionControlService.getAdmissionController(CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) + // .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), + // 1 + // ); } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java new file mode 100644 index 0000000000000..ddba42b158b99 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java @@ -0,0 +1,203 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.bulk.BulkRequestBuilder; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; +import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.junit.After; + +import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; + +/** + * Single node integration tests for admission control + */ +public class AdmissionControlSingleNodeTests extends OpenSearchSingleNodeTestCase { + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + @After + public void cleanup() { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("*")) + .setTransientSettings(Settings.builder().putNull("*")) + ); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0) + .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0) + .build(); + } + + public void testAdmissionControlRejectionEnforcedMode() throws Exception { + ensureGreen(); + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Thread.sleep(700); + client().admin().indices().prepareCreate("index").execute().actionGet(); + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + // verify bulk request hits 429 + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request hits 429 + SearchRequest searchRequest = new SearchRequest("index"); + try { + client().search(searchRequest).actionGet(); + } catch (Exception e) { + assertTrue(((SearchPhaseExecutionException) e).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); + } + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + } + + public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder() + .put(super.nodeSettings()) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success but admission control having rejections stats + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success but admission control having rejections stats + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + } + + public void testAdmissionControlRejectionDisabledMode() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder().put(super.nodeSettings()).put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success and no rejections + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success and no rejections + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + + } + + public void testAdmissionControlWithinLimits() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder() + .put(super.nodeSettings()) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101) + .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success and no rejections + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success and no rejections + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java index 9d8fc967c5a82..76b7738ca1f18 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java @@ -11,6 +11,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; @@ -18,6 +19,8 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.mockito.Mockito; + public class CPUBasedAdmissionControllerTests extends OpenSearchTestCase { private ClusterService clusterService; private ThreadPool threadPool; @@ -44,12 +47,12 @@ public void tearDown() throws Exception { public void testCheckDefaultParameters() { admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, null, clusterService, Settings.EMPTY ); - assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals(admissionController.getName(), CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); assertFalse( @@ -59,7 +62,7 @@ public void testCheckDefaultParameters() { public void testCheckUpdateSettings() { admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, null, clusterService, Settings.EMPTY @@ -72,16 +75,17 @@ public void testCheckUpdateSettings() { .build(); clusterService.getClusterSettings().applySettings(settings); - assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals(admissionController.getName(), CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); } public void testApplyControllerWithDefaultSettings() { + ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - null, + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + rs, clusterService, Settings.EMPTY ); @@ -92,23 +96,25 @@ public void testApplyControllerWithDefaultSettings() { assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); } - public void testApplyControllerWhenSettingsEnabled() { + public void testApplyControllerWhenSettingsEnabled() throws Exception { Settings settings = Settings.builder() .put( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode() ) .build(); + ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - null, + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + rs, clusterService, settings ); assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); + assertTrue( + admissionController.isAdmissionControllerEnforced(admissionController.settings.getTransportLayerAdmissionControllerMode()) + ); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); - action = "indices:data/write/bulk[s][p]"; - admissionController.apply(action, AdmissionControlActionType.INDEXING); - assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 1); + // we can assert admission control and rejections as part of ITs } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java similarity index 94% rename from server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java rename to server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java index 3923048376d69..15a25e6cbca1c 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java @@ -10,7 +10,7 @@ import org.opensearch.test.OpenSearchTestCase; -public class admissionControlActionTypeTests extends OpenSearchTestCase { +public class AdmissionControlActionTypeTests extends OpenSearchTestCase { public void testValidActionType() { assertEquals(AdmissionControlActionType.SEARCH.getType(), "search"); diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 63d8f069bebea..e0cdf23ddf153 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2723,6 +2723,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat(