From 88fbbf7211b879a94da1f9fb52d1690aa7fa69ba Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Tue, 15 Oct 2024 15:11:25 -0700 Subject: [PATCH 1/5] add wlm stats IT Signed-off-by: Ruirui Zhang --- CHANGELOG.md | 1 + .../wlm/WorkloadManagementStatsIT.java | 325 ++++++++++++++++++ .../org/opensearch/wlm/QueryGroupService.java | 3 + 3 files changed, 329 insertions(+) create mode 100644 server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e682c1b226f4a..ad86014161153 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651)) - [Workload Management] Add orchestrator for wlm resiliency (QueryGroupService) ([#15925](https://github.com/opensearch-project/OpenSearch/pull/15925)) - [Workload Management] Add QueryGroup Stats API Logic ([15777](https://github.com/opensearch-project/OpenSearch/pull/15777)) +- [Workload Management] Add Workload Management Stats IT ([16341](https://github.com/opensearch-project/OpenSearch/pull/16341)) - Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424)) - Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916)) - Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967), [#16110](https://github.com/opensearch-project/OpenSearch/pull/16110)) diff --git a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java new file mode 100644 index 0000000000000..01573c312fa25 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java @@ -0,0 +1,325 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.ActionType; +import org.opensearch.action.admin.cluster.wlm.WlmStatsAction; +import org.opensearch.action.admin.cluster.wlm.WlmStatsRequest; +import org.opensearch.action.admin.cluster.wlm.WlmStatsResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; + +@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0) +public class WorkloadManagementStatsIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + final static String PUT = "PUT"; + final static String DELETE = "DELETE"; + final static String DEFAULT_QUERY_GROUP = "DEFAULT_QUERY_GROUP"; + final static String _ALL = "_all"; + private static final TimeValue TIMEOUT = new TimeValue(10, TimeUnit.SECONDS); + + public WorkloadManagementStatsIT(Settings nodeSettings) { + super(nodeSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + ); + } + + @Override + protected Collection> nodePlugins() { + final List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(TestClusterUpdatePlugin.class); + return plugins; + } + + public void testDefaultQueryGroup() throws ExecutionException, InterruptedException { + HashSet set = new HashSet<>(); + set.add(_ALL); + WlmStatsRequest wlmStatsRequest = new WlmStatsRequest(null, set, null); + WlmStatsResponse response = client().execute(WlmStatsAction.INSTANCE, wlmStatsRequest).get(); + validateResponse(response, new String[] { DEFAULT_QUERY_GROUP }, null); + } + + public void testBasicWlmStats() throws Exception { + QueryGroup queryGroup = new QueryGroup( + "name", + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) + ); + String id = queryGroup.get_id(); + updateQueryGroupInClusterState(PUT, queryGroup); + WlmStatsResponse response = getWlmStatsResponse(new String[] { _ALL }, null); + validateResponse(response, new String[] { DEFAULT_QUERY_GROUP, id }, null); + + updateQueryGroupInClusterState(DELETE, queryGroup); + WlmStatsResponse response2 = getWlmStatsResponse(new String[] { _ALL }, null); + validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); + } + + public void testWlmStatsWithId() throws Exception { + QueryGroup queryGroup = new QueryGroup( + "name", + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) + ); + String id = queryGroup.get_id(); + updateQueryGroupInClusterState(PUT, queryGroup); + WlmStatsResponse response = getWlmStatsResponse(new String[] { id }, null); + validateResponse(response, new String[] { id }, new String[] { DEFAULT_QUERY_GROUP }); + + WlmStatsResponse response2 = getWlmStatsResponse(new String[] { DEFAULT_QUERY_GROUP }, null); + validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); + + QueryGroup queryGroup2 = new QueryGroup( + "name2", + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.2)) + ); + String id2 = queryGroup2.get_id(); + updateQueryGroupInClusterState(PUT, queryGroup2); + WlmStatsResponse response3 = getWlmStatsResponse(new String[] { DEFAULT_QUERY_GROUP, id2 }, null); + validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id2 }, new String[] { id }); + + WlmStatsResponse response4 = getWlmStatsResponse(new String[] { "erfgrdgbhn" }, null); + validateResponse(response4, null, new String[] { DEFAULT_QUERY_GROUP, id, id2, "erfgrdgbhn" }); + + updateQueryGroupInClusterState(DELETE, queryGroup); + updateQueryGroupInClusterState(DELETE, queryGroup2); + } + + public void testWlmStatsWithBreach() throws Exception { + QueryGroup queryGroup = new QueryGroup( + "name", + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) + ); + String id = queryGroup.get_id(); + updateQueryGroupInClusterState(PUT, queryGroup); + WlmStatsResponse response = getWlmStatsResponse(new String[] { _ALL }, true); + validateResponse(response, null, new String[] { DEFAULT_QUERY_GROUP, id }); + + WlmStatsResponse response2 = getWlmStatsResponse(new String[] { _ALL }, false); + validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP, id }, null); + + WlmStatsResponse response3 = getWlmStatsResponse(new String[] { _ALL }, null); + validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id }, null); + + updateQueryGroupInClusterState(DELETE, queryGroup); + } + + public void testWlmStatsWithIdAndBreach() throws Exception { + QueryGroup queryGroup = new QueryGroup( + "name", + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) + ); + String id = queryGroup.get_id(); + updateQueryGroupInClusterState(PUT, queryGroup); + WlmStatsResponse response = getWlmStatsResponse(new String[] { DEFAULT_QUERY_GROUP }, true); + validateResponse(response, null, new String[] { DEFAULT_QUERY_GROUP, id }); + + WlmStatsResponse response2 = getWlmStatsResponse(new String[] { DEFAULT_QUERY_GROUP, id }, true); + validateResponse(response2, null, new String[] { DEFAULT_QUERY_GROUP, id }); + + WlmStatsResponse response3 = getWlmStatsResponse(new String[] { DEFAULT_QUERY_GROUP, id }, false); + validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id }, null); + + WlmStatsResponse response4 = getWlmStatsResponse(new String[] { id }, false); + validateResponse(response4, new String[] { id }, new String[] { DEFAULT_QUERY_GROUP }); + + updateQueryGroupInClusterState(DELETE, queryGroup); + } + + public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup) throws InterruptedException { + TestClusterUpdateListener listener = new TestClusterUpdateListener(); + client().execute(TestClusterUpdateTransportAction.ACTION, new TestClusterUpdateRequest(queryGroup, method), listener); + assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); + assertEquals(0, listener.latch.getCount()); + } + + public WlmStatsResponse getWlmStatsResponse(String[] queryGroupIds, Boolean breach) throws ExecutionException, InterruptedException { + WlmStatsRequest wlmStatsRequest = new WlmStatsRequest(null, new HashSet<>(Arrays.asList(queryGroupIds)), breach); + return client().execute(WlmStatsAction.INSTANCE, wlmStatsRequest).get(); + } + + public void validateResponse(WlmStatsResponse response, String[] validIds, String[] invalidIds) { + assertNotNull(response.toString()); + String res = response.toString(); + if (validIds != null) { + for (String validId : validIds) { + assertTrue(res.contains(validId)); + } + } + if (invalidIds != null) { + for (String invalidId : invalidIds) { + assertFalse(res.contains(invalidId)); + } + } + } + + private static class TestClusterUpdateListener implements ActionListener { + private final CountDownLatch latch; + private Exception exception = null; + + public TestClusterUpdateListener() { + this.latch = new CountDownLatch(1); + } + + @Override + public void onResponse(TestClusterUpdateResponse r) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + this.exception = e; + latch.countDown(); + } + } + + public static class TestClusterUpdateRequest extends ActionRequest { + final private String method; + final private QueryGroup queryGroup; + + public TestClusterUpdateRequest(QueryGroup queryGroup, String method) { + this.method = method; + this.queryGroup = queryGroup; + } + + public TestClusterUpdateRequest(StreamInput in) throws IOException { + super(in); + this.method = in.readString(); + this.queryGroup = new QueryGroup(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(method); + queryGroup.writeTo(out); + } + + public QueryGroup getQueryGroup() { + return queryGroup; + } + + public String getMethod() { + return method; + } + } + + public static class TestClusterUpdateResponse extends ActionResponse { + public TestClusterUpdateResponse() {} + + public TestClusterUpdateResponse(StreamInput in) {} + + @Override + public void writeTo(StreamOutput out) throws IOException {} + } + + public static class TestClusterUpdateTransportAction extends HandledTransportAction< + TestClusterUpdateRequest, + TestClusterUpdateResponse> { + public static final ActionType ACTION = new ActionType<>( + "internal::test_cluster_update_action", + TestClusterUpdateResponse::new + ); + private final ClusterService clusterService; + + @Inject + public TestClusterUpdateTransportAction( + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters + ) { + super(ACTION.name(), transportService, actionFilters, TestClusterUpdateRequest::new); + this.clusterService = clusterService; + } + + @Override + protected void doExecute(Task task, TestClusterUpdateRequest request, ActionListener listener) { + clusterService.submitStateUpdateTask("query-group-persistence-service", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + Map currentGroups = currentState.metadata().queryGroups(); + QueryGroup queryGroup = request.getQueryGroup(); + String id = queryGroup.get_id(); + String method = request.getMethod(); + Metadata metadata; + if (method.equals(PUT)) { // create + metadata = Metadata.builder(currentState.metadata()).put(queryGroup).build(); + } else { // delete + metadata = Metadata.builder(currentState.metadata()).remove(currentGroups.get(id)).build(); + } + return ClusterState.builder(currentState).metadata(metadata).build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new TestClusterUpdateResponse()); + } + }); + } + } + + public static class TestClusterUpdatePlugin extends Plugin implements ActionPlugin { + @Override + public List> getActions() { + return Arrays.asList(new ActionHandler<>(TestClusterUpdateTransportAction.ACTION, TestClusterUpdateTransportAction.class)); + } + + @Override + public List> getClientActions() { + return Arrays.asList(TestClusterUpdateTransportAction.ACTION); + } + } +} diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java index 14002a2b38134..39af136ceae1e 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -232,6 +232,9 @@ public QueryGroupStats nodeStats(Set queryGroupIds, Boolean requestedBre * @return if the QueryGroup breaches any resource limit based on the LastRecordedUsage */ public boolean resourceLimitBreached(String id, QueryGroupState currentState) { + if (id.equals("DEFAULT_QUERY_GROUP")) { + return false; + } QueryGroup queryGroup = clusterService.state().metadata().queryGroups().get(id); if (queryGroup == null) { throw new ResourceNotFoundException("QueryGroup with id " + id + " does not exist"); From f56fa0192a72cd6b906fc171ebfbacf1120808c5 Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Tue, 15 Oct 2024 16:02:50 -0700 Subject: [PATCH 2/5] spotlessapply Signed-off-by: Ruirui Zhang --- .../backpressure/SearchBackpressureIT.java | 6 ++- .../wlm/WorkloadManagementStatsIT.java | 52 ++++--------------- 2 files changed, 15 insertions(+), 43 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java index 40c9301ef4bce..9e6b4e5e2f21a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java @@ -314,7 +314,7 @@ public void testSearchCancellationWithBackpressureDisabled() throws InterruptedE assertNull("SearchShardTask shouldn't have cancelled for monitor_only mode", caughtException); } - private static class ExceptionCatchingListener implements ActionListener { + public static class ExceptionCatchingListener implements ActionListener { private final CountDownLatch latch; private Exception exception = null; @@ -336,6 +336,10 @@ public void onFailure(Exception e) { private Exception getException() { return exception; } + + public CountDownLatch getLatch() { + return latch; + } } enum RequestType { diff --git a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java index 01573c312fa25..bb33dc9198cd4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java @@ -9,6 +9,7 @@ package org.opensearch.wlm; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.ActionType; @@ -31,6 +32,8 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.search.backpressure.SearchBackpressureIT.ExceptionCatchingListener; +import org.opensearch.search.backpressure.SearchBackpressureIT.TestResponse; import org.opensearch.tasks.Task; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; @@ -43,7 +46,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -170,10 +172,10 @@ public void testWlmStatsWithIdAndBreach() throws Exception { } public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup) throws InterruptedException { - TestClusterUpdateListener listener = new TestClusterUpdateListener(); + ExceptionCatchingListener listener = new ExceptionCatchingListener(); client().execute(TestClusterUpdateTransportAction.ACTION, new TestClusterUpdateRequest(queryGroup, method), listener); - assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); - assertEquals(0, listener.latch.getCount()); + assertTrue(listener.getLatch().await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); + assertEquals(0, listener.getLatch().getCount()); } public WlmStatsResponse getWlmStatsResponse(String[] queryGroupIds, Boolean breach) throws ExecutionException, InterruptedException { @@ -196,26 +198,6 @@ public void validateResponse(WlmStatsResponse response, String[] validIds, Strin } } - private static class TestClusterUpdateListener implements ActionListener { - private final CountDownLatch latch; - private Exception exception = null; - - public TestClusterUpdateListener() { - this.latch = new CountDownLatch(1); - } - - @Override - public void onResponse(TestClusterUpdateResponse r) { - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - this.exception = e; - latch.countDown(); - } - } - public static class TestClusterUpdateRequest extends ActionRequest { final private String method; final private QueryGroup queryGroup; @@ -252,22 +234,8 @@ public String getMethod() { } } - public static class TestClusterUpdateResponse extends ActionResponse { - public TestClusterUpdateResponse() {} - - public TestClusterUpdateResponse(StreamInput in) {} - - @Override - public void writeTo(StreamOutput out) throws IOException {} - } - - public static class TestClusterUpdateTransportAction extends HandledTransportAction< - TestClusterUpdateRequest, - TestClusterUpdateResponse> { - public static final ActionType ACTION = new ActionType<>( - "internal::test_cluster_update_action", - TestClusterUpdateResponse::new - ); + public static class TestClusterUpdateTransportAction extends HandledTransportAction { + public static final ActionType ACTION = new ActionType<>("internal::test_cluster_update_action", TestResponse::new); private final ClusterService clusterService; @Inject @@ -281,7 +249,7 @@ public TestClusterUpdateTransportAction( } @Override - protected void doExecute(Task task, TestClusterUpdateRequest request, ActionListener listener) { + protected void doExecute(Task task, TestClusterUpdateRequest request, ActionListener listener) { clusterService.submitStateUpdateTask("query-group-persistence-service", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -305,7 +273,7 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new TestClusterUpdateResponse()); + listener.onResponse(new TestResponse()); } }); } From 0d635c7fae44a696bf9c51e809eaa1f3ccb70f02 Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Fri, 18 Oct 2024 16:26:40 -0700 Subject: [PATCH 3/5] add more tests Signed-off-by: Ruirui Zhang --- .../wlm/WorkloadManagementStatsIT.java | 84 ++++++++++++------- 1 file changed, 53 insertions(+), 31 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java index bb33dc9198cd4..576c36923ff0f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java @@ -57,6 +57,9 @@ public class WorkloadManagementStatsIT extends ParameterizedStaticSettingsOpenSe final static String DELETE = "DELETE"; final static String DEFAULT_QUERY_GROUP = "DEFAULT_QUERY_GROUP"; final static String _ALL = "_all"; + final static String NAME1 = "name1"; + final static String NAME2 = "name2"; + final static String INVALID_ID = "invalid_id"; private static final TimeValue TIMEOUT = new TimeValue(10, TimeUnit.SECONDS); public WorkloadManagementStatsIT(Settings nodeSettings) { @@ -79,52 +82,49 @@ protected Collection> nodePlugins() { } public void testDefaultQueryGroup() throws ExecutionException, InterruptedException { - HashSet set = new HashSet<>(); - set.add(_ALL); - WlmStatsRequest wlmStatsRequest = new WlmStatsRequest(null, set, null); - WlmStatsResponse response = client().execute(WlmStatsAction.INSTANCE, wlmStatsRequest).get(); + WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null); validateResponse(response, new String[] { DEFAULT_QUERY_GROUP }, null); } public void testBasicWlmStats() throws Exception { QueryGroup queryGroup = new QueryGroup( - "name", + NAME1, new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) ); String id = queryGroup.get_id(); updateQueryGroupInClusterState(PUT, queryGroup); - WlmStatsResponse response = getWlmStatsResponse(new String[] { _ALL }, null); + WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null); validateResponse(response, new String[] { DEFAULT_QUERY_GROUP, id }, null); updateQueryGroupInClusterState(DELETE, queryGroup); - WlmStatsResponse response2 = getWlmStatsResponse(new String[] { _ALL }, null); + WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { _ALL }, null); validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); } - public void testWlmStatsWithId() throws Exception { + public void testWlmStatsWithQueryGroupId() throws Exception { QueryGroup queryGroup = new QueryGroup( - "name", + NAME1, new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) ); String id = queryGroup.get_id(); updateQueryGroupInClusterState(PUT, queryGroup); - WlmStatsResponse response = getWlmStatsResponse(new String[] { id }, null); + WlmStatsResponse response = getWlmStatsResponse(null, new String[] { id }, null); validateResponse(response, new String[] { id }, new String[] { DEFAULT_QUERY_GROUP }); - WlmStatsResponse response2 = getWlmStatsResponse(new String[] { DEFAULT_QUERY_GROUP }, null); + WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP }, null); validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); QueryGroup queryGroup2 = new QueryGroup( - "name2", + NAME2, new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.2)) ); String id2 = queryGroup2.get_id(); updateQueryGroupInClusterState(PUT, queryGroup2); - WlmStatsResponse response3 = getWlmStatsResponse(new String[] { DEFAULT_QUERY_GROUP, id2 }, null); + WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, id2 }, null); validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id2 }, new String[] { id }); - WlmStatsResponse response4 = getWlmStatsResponse(new String[] { "erfgrdgbhn" }, null); - validateResponse(response4, null, new String[] { DEFAULT_QUERY_GROUP, id, id2, "erfgrdgbhn" }); + WlmStatsResponse response4 = getWlmStatsResponse(null, new String[] { INVALID_ID }, null); + validateResponse(response4, null, new String[] { DEFAULT_QUERY_GROUP, id, id2, INVALID_ID }); updateQueryGroupInClusterState(DELETE, queryGroup); updateQueryGroupInClusterState(DELETE, queryGroup2); @@ -132,41 +132,62 @@ public void testWlmStatsWithId() throws Exception { public void testWlmStatsWithBreach() throws Exception { QueryGroup queryGroup = new QueryGroup( - "name", + NAME1, new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) ); String id = queryGroup.get_id(); updateQueryGroupInClusterState(PUT, queryGroup); - WlmStatsResponse response = getWlmStatsResponse(new String[] { _ALL }, true); + WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, true); validateResponse(response, null, new String[] { DEFAULT_QUERY_GROUP, id }); - WlmStatsResponse response2 = getWlmStatsResponse(new String[] { _ALL }, false); + WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { _ALL }, false); validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP, id }, null); - WlmStatsResponse response3 = getWlmStatsResponse(new String[] { _ALL }, null); + WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { _ALL }, null); validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id }, null); updateQueryGroupInClusterState(DELETE, queryGroup); } + public void testWlmStatsWithNodesId() throws Exception { + QueryGroup queryGroup = new QueryGroup( + NAME1, + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) + ); + String queryGroupId = queryGroup.get_id(); + String nodeId = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes().getLocalNodeId(); + updateQueryGroupInClusterState(PUT, queryGroup); + WlmStatsResponse response = getWlmStatsResponse(new String[] { nodeId }, new String[] { _ALL }, true); + validateResponse(response, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); + + WlmStatsResponse response2 = getWlmStatsResponse(new String[] { nodeId, INVALID_ID }, new String[] { _ALL }, false); + validateResponse(response2, new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId }, new String[] { INVALID_ID }); + + WlmStatsResponse response3 = getWlmStatsResponse(new String[] { INVALID_ID }, new String[] { _ALL }, false); + validateResponse(response3, null, new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId }); + + updateQueryGroupInClusterState(DELETE, queryGroup); + } + public void testWlmStatsWithIdAndBreach() throws Exception { QueryGroup queryGroup = new QueryGroup( - "name", + NAME1, new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) ); - String id = queryGroup.get_id(); + String queryGroupId = queryGroup.get_id(); + String nodeId = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes().getLocalNodeId(); updateQueryGroupInClusterState(PUT, queryGroup); - WlmStatsResponse response = getWlmStatsResponse(new String[] { DEFAULT_QUERY_GROUP }, true); - validateResponse(response, null, new String[] { DEFAULT_QUERY_GROUP, id }); + WlmStatsResponse response = getWlmStatsResponse(new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP }, true); + validateResponse(response, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); - WlmStatsResponse response2 = getWlmStatsResponse(new String[] { DEFAULT_QUERY_GROUP, id }, true); - validateResponse(response2, null, new String[] { DEFAULT_QUERY_GROUP, id }); + WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, true); + validateResponse(response2, null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); - WlmStatsResponse response3 = getWlmStatsResponse(new String[] { DEFAULT_QUERY_GROUP, id }, false); - validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id }, null); + WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, false); + validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, null); - WlmStatsResponse response4 = getWlmStatsResponse(new String[] { id }, false); - validateResponse(response4, new String[] { id }, new String[] { DEFAULT_QUERY_GROUP }); + WlmStatsResponse response4 = getWlmStatsResponse(null, new String[] { queryGroupId }, false); + validateResponse(response4, new String[] { queryGroupId }, new String[] { DEFAULT_QUERY_GROUP }); updateQueryGroupInClusterState(DELETE, queryGroup); } @@ -178,8 +199,9 @@ public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup) assertEquals(0, listener.getLatch().getCount()); } - public WlmStatsResponse getWlmStatsResponse(String[] queryGroupIds, Boolean breach) throws ExecutionException, InterruptedException { - WlmStatsRequest wlmStatsRequest = new WlmStatsRequest(null, new HashSet<>(Arrays.asList(queryGroupIds)), breach); + public WlmStatsResponse getWlmStatsResponse(String[] nodesId, String[] queryGroupIds, Boolean breach) throws ExecutionException, + InterruptedException { + WlmStatsRequest wlmStatsRequest = new WlmStatsRequest(nodesId, new HashSet<>(Arrays.asList(queryGroupIds)), breach); return client().execute(WlmStatsAction.INSTANCE, wlmStatsRequest).get(); } From 71b6598211a2f8e9b63582ed937372f66c10f80b Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Wed, 23 Oct 2024 15:24:36 -0700 Subject: [PATCH 4/5] address comments Signed-off-by: Ruirui Zhang --- .../wlm/WorkloadManagementStatsIT.java | 239 ++++++++++-------- .../org/opensearch/wlm/QueryGroupService.java | 4 + 2 files changed, 135 insertions(+), 108 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java index 576c36923ff0f..d524dd6e13397 100644 --- a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java @@ -38,14 +38,10 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import org.opensearch.transport.TransportService; +import org.opensearch.wlm.stats.QueryGroupState; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -81,120 +77,132 @@ protected Collection> nodePlugins() { return plugins; } - public void testDefaultQueryGroup() throws ExecutionException, InterruptedException { - WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null); - validateResponse(response, new String[] { DEFAULT_QUERY_GROUP }, null); - } - - public void testBasicWlmStats() throws Exception { - QueryGroup queryGroup = new QueryGroup( - NAME1, - new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) - ); - String id = queryGroup.get_id(); - updateQueryGroupInClusterState(PUT, queryGroup); - WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null); - validateResponse(response, new String[] { DEFAULT_QUERY_GROUP, id }, null); - - updateQueryGroupInClusterState(DELETE, queryGroup); - WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { _ALL }, null); - validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); - } - - public void testWlmStatsWithQueryGroupId() throws Exception { - QueryGroup queryGroup = new QueryGroup( - NAME1, - new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) - ); - String id = queryGroup.get_id(); - updateQueryGroupInClusterState(PUT, queryGroup); - WlmStatsResponse response = getWlmStatsResponse(null, new String[] { id }, null); - validateResponse(response, new String[] { id }, new String[] { DEFAULT_QUERY_GROUP }); - - WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP }, null); - validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); - - QueryGroup queryGroup2 = new QueryGroup( - NAME2, - new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.2)) - ); - String id2 = queryGroup2.get_id(); - updateQueryGroupInClusterState(PUT, queryGroup2); - WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, id2 }, null); - validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id2 }, new String[] { id }); - - WlmStatsResponse response4 = getWlmStatsResponse(null, new String[] { INVALID_ID }, null); - validateResponse(response4, null, new String[] { DEFAULT_QUERY_GROUP, id, id2, INVALID_ID }); - - updateQueryGroupInClusterState(DELETE, queryGroup); - updateQueryGroupInClusterState(DELETE, queryGroup2); - } - - public void testWlmStatsWithBreach() throws Exception { - QueryGroup queryGroup = new QueryGroup( - NAME1, - new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) - ); - String id = queryGroup.get_id(); - updateQueryGroupInClusterState(PUT, queryGroup); - WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, true); - validateResponse(response, null, new String[] { DEFAULT_QUERY_GROUP, id }); - - WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { _ALL }, false); - validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP, id }, null); - - WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { _ALL }, null); - validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id }, null); +// public void testDefaultQueryGroup() throws ExecutionException, InterruptedException { +// WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null); +// validateResponse(response, new String[] { DEFAULT_QUERY_GROUP }, null); +// } +// +// public void testBasicWlmStats() throws Exception { +// QueryGroup queryGroup = new QueryGroup( +// NAME1, +// new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) +// ); +// String id = queryGroup.get_id(); +// updateQueryGroupInClusterState(PUT, queryGroup); +// WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null); +// validateResponse(response, new String[] { DEFAULT_QUERY_GROUP, id }, null); +// +// updateQueryGroupInClusterState(DELETE, queryGroup); +// WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { _ALL }, null); +// validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); +// } +// +// public void testWlmStatsWithQueryGroupId() throws Exception { +// QueryGroup queryGroup = new QueryGroup( +// NAME1, +// new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) +// ); +// String id = queryGroup.get_id(); +// updateQueryGroupInClusterState(PUT, queryGroup); +// WlmStatsResponse response = getWlmStatsResponse(null, new String[] { id }, null); +// validateResponse(response, new String[] { id }, new String[] { DEFAULT_QUERY_GROUP }); +// +// WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP }, null); +// validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); +// +// QueryGroup queryGroup2 = new QueryGroup( +// NAME2, +// new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.2)) +// ); +// String id2 = queryGroup2.get_id(); +// updateQueryGroupInClusterState(PUT, queryGroup2); +// WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, id2 }, null); +// validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id2 }, new String[] { id }); +// +// WlmStatsResponse response4 = getWlmStatsResponse(null, new String[] { INVALID_ID }, null); +// validateResponse(response4, null, new String[] { DEFAULT_QUERY_GROUP, id, id2, INVALID_ID }); +// +// updateQueryGroupInClusterState(DELETE, queryGroup); +// updateQueryGroupInClusterState(DELETE, queryGroup2); +// } +// +// public void testWlmStatsWithBreach() throws Exception { +// QueryGroup queryGroup = new QueryGroup( +// NAME1, +// new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) +// ); +// String id = queryGroup.get_id(); +// updateQueryGroupInClusterState(PUT, queryGroup); +// WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, true); +// validateResponse(response, null, new String[] { DEFAULT_QUERY_GROUP, id }); +// +// WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { _ALL }, false); +// validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP, id }, null); +// +// WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { _ALL }, null); +// validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id }, null); +// +// updateQueryGroupInClusterState(DELETE, queryGroup); +// } +// +// public void testWlmStatsWithNodesId() throws Exception { +// QueryGroup queryGroup = new QueryGroup( +// NAME1, +// new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) +// ); +// String queryGroupId = queryGroup.get_id(); +// String nodeId = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes().getLocalNodeId(); +// updateQueryGroupInClusterState(PUT, queryGroup); +// WlmStatsResponse response = getWlmStatsResponse(new String[] { nodeId }, new String[] { _ALL }, true); +// validateResponse(response, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); +// +// WlmStatsResponse response2 = getWlmStatsResponse(new String[] { nodeId, INVALID_ID }, new String[] { _ALL }, false); +// validateResponse(response2, new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId }, new String[] { INVALID_ID }); +// +// WlmStatsResponse response3 = getWlmStatsResponse(new String[] { INVALID_ID }, new String[] { _ALL }, false); +// validateResponse(response3, null, new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId }); +// +// updateQueryGroupInClusterState(DELETE, queryGroup); +// } - updateQueryGroupInClusterState(DELETE, queryGroup); - } - - public void testWlmStatsWithNodesId() throws Exception { + public void testWlmStatsWithIdAndBreach() throws Exception { QueryGroup queryGroup = new QueryGroup( NAME1, new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) ); String queryGroupId = queryGroup.get_id(); String nodeId = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes().getLocalNodeId(); - updateQueryGroupInClusterState(PUT, queryGroup); - WlmStatsResponse response = getWlmStatsResponse(new String[] { nodeId }, new String[] { _ALL }, true); - validateResponse(response, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); + updateQueryGroupInClusterState(PUT, queryGroup, new HashMap<>()); - WlmStatsResponse response2 = getWlmStatsResponse(new String[] { nodeId, INVALID_ID }, new String[] { _ALL }, false); - validateResponse(response2, new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId }, new String[] { INVALID_ID }); - - WlmStatsResponse response3 = getWlmStatsResponse(new String[] { INVALID_ID }, new String[] { _ALL }, false); - validateResponse(response3, null, new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId }); - - updateQueryGroupInClusterState(DELETE, queryGroup); - } - - public void testWlmStatsWithIdAndBreach() throws Exception { - QueryGroup queryGroup = new QueryGroup( - NAME1, + QueryGroup queryGroup2 = new QueryGroup( + NAME2, new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) ); - String queryGroupId = queryGroup.get_id(); - String nodeId = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes().getLocalNodeId(); - updateQueryGroupInClusterState(PUT, queryGroup); - WlmStatsResponse response = getWlmStatsResponse(new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP }, true); - validateResponse(response, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); - - WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, true); - validateResponse(response2, null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); - - WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, false); - validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, null); - - WlmStatsResponse response4 = getWlmStatsResponse(null, new String[] { queryGroupId }, false); - validateResponse(response4, new String[] { queryGroupId }, new String[] { DEFAULT_QUERY_GROUP }); - - updateQueryGroupInClusterState(DELETE, queryGroup); + String queryGroupId2 = queryGroup.get_id(); + updateQueryGroupInClusterState(PUT, queryGroup2, Map.of(ResourceType.CPU, 0.6)); + +// WlmStatsResponse response = getWlmStatsResponse(new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP }, true); +// validateResponse(response, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); +// +// WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, true); +// validateResponse(response2, null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); +// +// WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, false); +// validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, null); +// +// WlmStatsResponse response4 = getWlmStatsResponse(null, new String[] { queryGroupId }, false); +// validateResponse(response4, new String[] { queryGroupId }, new String[] { DEFAULT_QUERY_GROUP }); + + WlmStatsResponse response5 = getWlmStatsResponse(null, new String[] { queryGroupId, queryGroupId2 }, true); + validateResponse(response5, new String[] { queryGroupId2 }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); + + updateQueryGroupInClusterState(DELETE, queryGroup, new HashMap<>()); + updateQueryGroupInClusterState(DELETE, queryGroup2, new HashMap<>()); } - public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup) throws InterruptedException { + public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup, Map lastRecordedUsageMap) throws InterruptedException { ExceptionCatchingListener listener = new ExceptionCatchingListener(); - client().execute(TestClusterUpdateTransportAction.ACTION, new TestClusterUpdateRequest(queryGroup, method), listener); + client().execute(TestClusterUpdateTransportAction.ACTION, new TestClusterUpdateRequest(queryGroup, method, lastRecordedUsageMap), listener); assertTrue(listener.getLatch().await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); assertEquals(0, listener.getLatch().getCount()); } @@ -223,16 +231,19 @@ public void validateResponse(WlmStatsResponse response, String[] validIds, Strin public static class TestClusterUpdateRequest extends ActionRequest { final private String method; final private QueryGroup queryGroup; + final private Map lastRecordedUsageMap; - public TestClusterUpdateRequest(QueryGroup queryGroup, String method) { + public TestClusterUpdateRequest(QueryGroup queryGroup, String method, Map lastRecordedUsageMap) { this.method = method; this.queryGroup = queryGroup; + this.lastRecordedUsageMap = lastRecordedUsageMap; } public TestClusterUpdateRequest(StreamInput in) throws IOException { super(in); this.method = in.readString(); this.queryGroup = new QueryGroup(in); + this.lastRecordedUsageMap = in.readMap((i) -> ResourceType.fromName(i.readString()), StreamInput::readDouble); } @Override @@ -245,6 +256,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(method); queryGroup.writeTo(out); + out.writeMap(lastRecordedUsageMap, ResourceType::writeTo, StreamOutput::writeDouble); } public QueryGroup getQueryGroup() { @@ -254,20 +266,27 @@ public QueryGroup getQueryGroup() { public String getMethod() { return method; } + + public Map getLastRecordedUsageMap() { + return lastRecordedUsageMap; + } } public static class TestClusterUpdateTransportAction extends HandledTransportAction { public static final ActionType ACTION = new ActionType<>("internal::test_cluster_update_action", TestResponse::new); private final ClusterService clusterService; + private final QueryGroupService queryGroupService; @Inject public TestClusterUpdateTransportAction( TransportService transportService, ClusterService clusterService, + QueryGroupService service, ActionFilters actionFilters ) { super(ACTION.name(), transportService, actionFilters, TestClusterUpdateRequest::new); this.clusterService = clusterService; + this.queryGroupService = service; } @Override @@ -298,6 +317,10 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS listener.onResponse(new TestResponse()); } }); + QueryGroupState state = queryGroupService.getQueryGroupsStateAccessor().getQueryGroupState(request.getQueryGroup().get_id()); + for (Map.Entry entry: request.getLastRecordedUsageMap().entrySet()) { + state.getResourceState().get(entry.getKey()).setLastRecordedUsage(entry.getValue()); + } } } diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java index 39af136ceae1e..1fac8243e7865 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -326,6 +326,10 @@ public Set getDeletedQueryGroups() { return deletedQueryGroups; } + public QueryGroupsStateAccessor getQueryGroupsStateAccessor() { + return queryGroupsStateAccessor; + } + /** * This method determines whether the task should be accounted by SBP if both features co-exist * @param t QueryGroupTask From adb1c1a88bc2788381bf38387a97599d75fa766e Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Wed, 23 Oct 2024 17:54:03 -0700 Subject: [PATCH 5/5] address comments Signed-off-by: Ruirui Zhang --- .../wlm/action/UpdateQueryGroupRequest.java | 1 + .../backpressure/SearchBackpressureIT.java | 4 +- .../wlm/WorkloadManagementStatsIT.java | 432 ++++++++++++------ .../org/opensearch/wlm/QueryGroupService.java | 14 +- 4 files changed, 310 insertions(+), 141 deletions(-) diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java index 18af58289be13..367c27c7a5ea3 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/UpdateQueryGroupRequest.java @@ -8,6 +8,7 @@ package org.opensearch.plugin.wlm.action; +import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; import org.opensearch.cluster.metadata.QueryGroup; diff --git a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java index 9e6b4e5e2f21a..00deca1f97cff 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java @@ -333,7 +333,7 @@ public void onFailure(Exception e) { latch.countDown(); } - private Exception getException() { + public Exception getException() { return exception; } @@ -353,7 +353,7 @@ private Supplier descriptionSupplier(String description) { return () -> description; } - interface TaskFactory { + public interface TaskFactory { T createTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers); } diff --git a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java index d524dd6e13397..acb09a8826b6f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementStatsIT.java @@ -16,10 +16,16 @@ import org.opensearch.action.admin.cluster.wlm.WlmStatsAction; import org.opensearch.action.admin.cluster.wlm.WlmStatsRequest; import org.opensearch.action.admin.cluster.wlm.WlmStatsResponse; +import org.opensearch.action.search.SearchTask; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.cluster.service.ClusterService; @@ -30,33 +36,44 @@ import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.tasks.TaskCancelledException; +import org.opensearch.core.tasks.TaskId; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.search.backpressure.SearchBackpressureIT.ExceptionCatchingListener; +import org.opensearch.search.backpressure.SearchBackpressureIT.TaskFactory; import org.opensearch.search.backpressure.SearchBackpressureIT.TestResponse; +import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; -import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import org.opensearch.wlm.stats.QueryGroupState; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.threadpool.ThreadPool.Names.SAME; +import static org.opensearch.wlm.QueryGroupTask.QUERY_GROUP_ID_HEADER; -@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0) public class WorkloadManagementStatsIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { final static String PUT = "PUT"; final static String DELETE = "DELETE"; final static String DEFAULT_QUERY_GROUP = "DEFAULT_QUERY_GROUP"; final static String _ALL = "_all"; + final static String CPU = "CPU"; + final static String MEMORY = "MEMORY"; final static String NAME1 = "name1"; final static String NAME2 = "name2"; final static String INVALID_ID = "invalid_id"; - private static final TimeValue TIMEOUT = new TimeValue(10, TimeUnit.SECONDS); + private static final TimeValue TIMEOUT = new TimeValue(3, TimeUnit.SECONDS); public WorkloadManagementStatsIT(Settings nodeSettings) { super(nodeSettings); @@ -77,93 +94,98 @@ protected Collection> nodePlugins() { return plugins; } -// public void testDefaultQueryGroup() throws ExecutionException, InterruptedException { -// WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null); -// validateResponse(response, new String[] { DEFAULT_QUERY_GROUP }, null); -// } -// -// public void testBasicWlmStats() throws Exception { -// QueryGroup queryGroup = new QueryGroup( -// NAME1, -// new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) -// ); -// String id = queryGroup.get_id(); -// updateQueryGroupInClusterState(PUT, queryGroup); -// WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null); -// validateResponse(response, new String[] { DEFAULT_QUERY_GROUP, id }, null); -// -// updateQueryGroupInClusterState(DELETE, queryGroup); -// WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { _ALL }, null); -// validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); -// } -// -// public void testWlmStatsWithQueryGroupId() throws Exception { -// QueryGroup queryGroup = new QueryGroup( -// NAME1, -// new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) -// ); -// String id = queryGroup.get_id(); -// updateQueryGroupInClusterState(PUT, queryGroup); -// WlmStatsResponse response = getWlmStatsResponse(null, new String[] { id }, null); -// validateResponse(response, new String[] { id }, new String[] { DEFAULT_QUERY_GROUP }); -// -// WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP }, null); -// validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); -// -// QueryGroup queryGroup2 = new QueryGroup( -// NAME2, -// new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.2)) -// ); -// String id2 = queryGroup2.get_id(); -// updateQueryGroupInClusterState(PUT, queryGroup2); -// WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, id2 }, null); -// validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id2 }, new String[] { id }); -// -// WlmStatsResponse response4 = getWlmStatsResponse(null, new String[] { INVALID_ID }, null); -// validateResponse(response4, null, new String[] { DEFAULT_QUERY_GROUP, id, id2, INVALID_ID }); -// -// updateQueryGroupInClusterState(DELETE, queryGroup); -// updateQueryGroupInClusterState(DELETE, queryGroup2); -// } -// -// public void testWlmStatsWithBreach() throws Exception { -// QueryGroup queryGroup = new QueryGroup( -// NAME1, -// new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) -// ); -// String id = queryGroup.get_id(); -// updateQueryGroupInClusterState(PUT, queryGroup); -// WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, true); -// validateResponse(response, null, new String[] { DEFAULT_QUERY_GROUP, id }); -// -// WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { _ALL }, false); -// validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP, id }, null); -// -// WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { _ALL }, null); -// validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id }, null); -// -// updateQueryGroupInClusterState(DELETE, queryGroup); -// } -// -// public void testWlmStatsWithNodesId() throws Exception { -// QueryGroup queryGroup = new QueryGroup( -// NAME1, -// new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) -// ); -// String queryGroupId = queryGroup.get_id(); -// String nodeId = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes().getLocalNodeId(); -// updateQueryGroupInClusterState(PUT, queryGroup); -// WlmStatsResponse response = getWlmStatsResponse(new String[] { nodeId }, new String[] { _ALL }, true); -// validateResponse(response, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); -// -// WlmStatsResponse response2 = getWlmStatsResponse(new String[] { nodeId, INVALID_ID }, new String[] { _ALL }, false); -// validateResponse(response2, new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId }, new String[] { INVALID_ID }); -// -// WlmStatsResponse response3 = getWlmStatsResponse(new String[] { INVALID_ID }, new String[] { _ALL }, false); -// validateResponse(response3, null, new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId }); -// -// updateQueryGroupInClusterState(DELETE, queryGroup); -// } + public void testDefaultQueryGroup() throws ExecutionException, InterruptedException { + WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null); + validateResponse(response, new String[] { DEFAULT_QUERY_GROUP }, null); + } + + public void testBasicWlmStats() throws Exception { + QueryGroup queryGroup = new QueryGroup( + NAME1, + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) + ); + String id = queryGroup.get_id(); + updateQueryGroupInClusterState(PUT, queryGroup); + WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, null); + validateResponse(response, new String[] { DEFAULT_QUERY_GROUP, id }, null); + + updateQueryGroupInClusterState(DELETE, queryGroup); + WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { _ALL }, null); + validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); + } + + public void testWlmStatsWithQueryGroupId() throws Exception { + QueryGroup queryGroup = new QueryGroup( + NAME1, + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) + ); + String id = queryGroup.get_id(); + updateQueryGroupInClusterState(PUT, queryGroup); + WlmStatsResponse response = getWlmStatsResponse(null, new String[] { id }, null); + validateResponse(response, new String[] { id }, new String[] { DEFAULT_QUERY_GROUP }); + + WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP }, null); + validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP }, new String[] { id }); + + QueryGroup queryGroup2 = new QueryGroup( + NAME2, + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.2)) + ); + String id2 = queryGroup2.get_id(); + updateQueryGroupInClusterState(PUT, queryGroup2); + WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, id2 }, null); + validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id2 }, new String[] { id }); + + WlmStatsResponse response4 = getWlmStatsResponse(null, new String[] { INVALID_ID }, null); + validateResponse(response4, null, new String[] { DEFAULT_QUERY_GROUP, id, id2, INVALID_ID }); + + updateQueryGroupInClusterState(DELETE, queryGroup); + updateQueryGroupInClusterState(DELETE, queryGroup2); + } + + public void testWlmStatsWithBreach() throws Exception { + QueryGroup queryGroup = new QueryGroup( + NAME1, + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.0000001)) + ); + String id = queryGroup.get_id(); + updateQueryGroupInClusterState(PUT, queryGroup); + WlmStatsResponse response = getWlmStatsResponse(null, new String[] { _ALL }, true); + validateResponse(response, null, new String[] { DEFAULT_QUERY_GROUP, id }); + + WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { _ALL }, false); + validateResponse(response2, new String[] { DEFAULT_QUERY_GROUP, id }, null); + + WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { _ALL }, null); + validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, id }, null); + + executeQueryGroupTask(MEMORY, id); + Thread.sleep(1000); + WlmStatsResponse response4 = getWlmStatsResponse(null, new String[] { _ALL }, true); + validateResponse(response4, new String[] { id }, new String[] { DEFAULT_QUERY_GROUP }); + + updateQueryGroupInClusterState(DELETE, queryGroup); + } + + public void testWlmStatsWithNodesId() throws Exception { + QueryGroup queryGroup = new QueryGroup( + NAME1, + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) + ); + String queryGroupId = queryGroup.get_id(); + String nodeId = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes().getLocalNodeId(); + updateQueryGroupInClusterState(PUT, queryGroup); + WlmStatsResponse response = getWlmStatsResponse(new String[] { nodeId }, new String[] { _ALL }, true); + validateResponse(response, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); + + WlmStatsResponse response2 = getWlmStatsResponse(new String[] { nodeId, INVALID_ID }, new String[] { _ALL }, false); + validateResponse(response2, new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId }, new String[] { INVALID_ID }); + + WlmStatsResponse response3 = getWlmStatsResponse(new String[] { INVALID_ID }, new String[] { _ALL }, false); + validateResponse(response3, null, new String[] { nodeId, DEFAULT_QUERY_GROUP, queryGroupId }); + + updateQueryGroupInClusterState(DELETE, queryGroup); + } public void testWlmStatsWithIdAndBreach() throws Exception { QueryGroup queryGroup = new QueryGroup( @@ -172,37 +194,39 @@ public void testWlmStatsWithIdAndBreach() throws Exception { ); String queryGroupId = queryGroup.get_id(); String nodeId = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes().getLocalNodeId(); - updateQueryGroupInClusterState(PUT, queryGroup, new HashMap<>()); + updateQueryGroupInClusterState(PUT, queryGroup); QueryGroup queryGroup2 = new QueryGroup( NAME2, - new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.5)) + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.SOFT, Map.of(ResourceType.CPU, 0.000000001)) ); - String queryGroupId2 = queryGroup.get_id(); - updateQueryGroupInClusterState(PUT, queryGroup2, Map.of(ResourceType.CPU, 0.6)); - -// WlmStatsResponse response = getWlmStatsResponse(new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP }, true); -// validateResponse(response, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); -// -// WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, true); -// validateResponse(response2, null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); -// -// WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, false); -// validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, null); -// -// WlmStatsResponse response4 = getWlmStatsResponse(null, new String[] { queryGroupId }, false); -// validateResponse(response4, new String[] { queryGroupId }, new String[] { DEFAULT_QUERY_GROUP }); + String queryGroupId2 = queryGroup2.get_id(); + updateQueryGroupInClusterState(PUT, queryGroup2); + + WlmStatsResponse response = getWlmStatsResponse(new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP }, true); + validateResponse(response, new String[] { nodeId }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); + + WlmStatsResponse response2 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, true); + validateResponse(response2, null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); + + WlmStatsResponse response3 = getWlmStatsResponse(null, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, false); + validateResponse(response3, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }, null); + WlmStatsResponse response4 = getWlmStatsResponse(null, new String[] { queryGroupId }, false); + validateResponse(response4, new String[] { queryGroupId }, new String[] { DEFAULT_QUERY_GROUP }); + + executeQueryGroupTask(CPU, queryGroupId2); + Thread.sleep(1000); WlmStatsResponse response5 = getWlmStatsResponse(null, new String[] { queryGroupId, queryGroupId2 }, true); validateResponse(response5, new String[] { queryGroupId2 }, new String[] { DEFAULT_QUERY_GROUP, queryGroupId }); - updateQueryGroupInClusterState(DELETE, queryGroup, new HashMap<>()); - updateQueryGroupInClusterState(DELETE, queryGroup2, new HashMap<>()); + updateQueryGroupInClusterState(DELETE, queryGroup); + updateQueryGroupInClusterState(DELETE, queryGroup2); } - public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup, Map lastRecordedUsageMap) throws InterruptedException { + public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup) throws InterruptedException { ExceptionCatchingListener listener = new ExceptionCatchingListener(); - client().execute(TestClusterUpdateTransportAction.ACTION, new TestClusterUpdateRequest(queryGroup, method, lastRecordedUsageMap), listener); + client().execute(TestClusterUpdateTransportAction.ACTION, new TestClusterUpdateRequest(queryGroup, method), listener); assertTrue(listener.getLatch().await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); assertEquals(0, listener.getLatch().getCount()); } @@ -228,22 +252,19 @@ public void validateResponse(WlmStatsResponse response, String[] validIds, Strin } } - public static class TestClusterUpdateRequest extends ActionRequest { + public static class TestClusterUpdateRequest extends ClusterManagerNodeRequest { final private String method; final private QueryGroup queryGroup; - final private Map lastRecordedUsageMap; - public TestClusterUpdateRequest(QueryGroup queryGroup, String method, Map lastRecordedUsageMap) { + public TestClusterUpdateRequest(QueryGroup queryGroup, String method) { this.method = method; this.queryGroup = queryGroup; - this.lastRecordedUsageMap = lastRecordedUsageMap; } public TestClusterUpdateRequest(StreamInput in) throws IOException { super(in); this.method = in.readString(); this.queryGroup = new QueryGroup(in); - this.lastRecordedUsageMap = in.readMap((i) -> ResourceType.fromName(i.readString()), StreamInput::readDouble); } @Override @@ -256,7 +277,6 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(method); queryGroup.writeTo(out); - out.writeMap(lastRecordedUsageMap, ResourceType::writeTo, StreamOutput::writeDouble); } public QueryGroup getQueryGroup() { @@ -266,31 +286,51 @@ public QueryGroup getQueryGroup() { public String getMethod() { return method; } - - public Map getLastRecordedUsageMap() { - return lastRecordedUsageMap; - } } - public static class TestClusterUpdateTransportAction extends HandledTransportAction { + public static class TestClusterUpdateTransportAction extends TransportClusterManagerNodeAction { public static final ActionType ACTION = new ActionType<>("internal::test_cluster_update_action", TestResponse::new); - private final ClusterService clusterService; - private final QueryGroupService queryGroupService; @Inject public TestClusterUpdateTransportAction( + ThreadPool threadPool, TransportService transportService, - ClusterService clusterService, - QueryGroupService service, - ActionFilters actionFilters + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService ) { - super(ACTION.name(), transportService, actionFilters, TestClusterUpdateRequest::new); - this.clusterService = clusterService; - this.queryGroupService = service; + super( + ACTION.name(), + transportService, + clusterService, + threadPool, + actionFilters, + TestClusterUpdateRequest::new, + indexNameExpressionResolver + ); + } + + @Override + protected String executor() { + return SAME; } @Override - protected void doExecute(Task task, TestClusterUpdateRequest request, ActionListener listener) { + protected TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(TestClusterUpdateRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected void clusterManagerOperation( + TestClusterUpdateRequest request, + ClusterState clusterState, + ActionListener listener + ) { clusterService.submitStateUpdateTask("query-group-persistence-service", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -317,22 +357,140 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS listener.onResponse(new TestResponse()); } }); - QueryGroupState state = queryGroupService.getQueryGroupsStateAccessor().getQueryGroupState(request.getQueryGroup().get_id()); - for (Map.Entry entry: request.getLastRecordedUsageMap().entrySet()) { - state.getResourceState().get(entry.getKey()).setLastRecordedUsage(entry.getValue()); + } + } + + public static class TestQueryGroupTaskRequest extends ActionRequest { + private final String type; + private final String queryGroupId; + private TaskFactory taskFactory; + + public TestQueryGroupTaskRequest(String type, String queryGroupId, TaskFactory taskFactory) { + this.type = type; + this.queryGroupId = queryGroupId; + this.taskFactory = taskFactory; + } + + public TestQueryGroupTaskRequest(StreamInput in) throws IOException { + super(in); + this.type = in.readString(); + this.queryGroupId = in.readString(); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return taskFactory.createTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(type); + out.writeString(queryGroupId); + } + + public String getType() { + return type; + } + + public String getQueryGroupId() { + return queryGroupId; + } + } + + public static class TestQueryGroupTaskTransportAction extends HandledTransportAction { + public static final ActionType ACTION = new ActionType<>("internal::test_query_group_task_action", TestResponse::new); + private final ThreadPool threadPool; + + @Inject + public TestQueryGroupTaskTransportAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters) { + super(ACTION.name(), transportService, actionFilters, TestQueryGroupTaskRequest::new); + this.threadPool = threadPool; + } + + @Override + protected void doExecute(Task task, TestQueryGroupTaskRequest request, ActionListener listener) { + threadPool.getThreadContext().putHeader(QUERY_GROUP_ID_HEADER, request.getQueryGroupId()); + threadPool.executor(ThreadPool.Names.SEARCH).execute(() -> { + try { + CancellableTask cancellableTask = (CancellableTask) task; + ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); + assertEquals(request.getQueryGroupId(), ((QueryGroupTask) task).getQueryGroupId()); + long startTime = System.nanoTime(); + while (System.nanoTime() - startTime < TIMEOUT.getNanos()) { + doWork(request); + if (cancellableTask.isCancelled()) { + break; + } + } + if (cancellableTask.isCancelled()) { + throw new TaskCancelledException(cancellableTask.getReasonCancelled()); + } else { + listener.onResponse(new TestResponse()); + } + } catch (Exception e) { + listener.onFailure(e); + } + }); + } + + private void doWork(TestQueryGroupTaskRequest request) throws InterruptedException { + switch (request.getType()) { + case CPU: + long i = 0, j = 1, k = 1, iterations = 1000; + do { + j += i; + k *= j; + i++; + } while (i < iterations); + break; + case MEMORY: + int bytesToAllocate = (int) (Runtime.getRuntime().totalMemory() * 0.01); + Byte[] bytes = new Byte[bytesToAllocate]; + int[] ints = new int[bytesToAllocate]; + break; } } } + public Exception executeQueryGroupTask(String resourceType, String queryGroupId) throws InterruptedException { + ExceptionCatchingListener listener = new ExceptionCatchingListener(); + client().execute( + TestQueryGroupTaskTransportAction.ACTION, + new TestQueryGroupTaskRequest( + resourceType, + queryGroupId, + (TaskFactory) (id, type, action, description, parentTaskId, headers) -> new SearchTask( + id, + type, + action, + () -> description, + parentTaskId, + headers + ) + ), + listener + ); + return listener.getException(); + } + public static class TestClusterUpdatePlugin extends Plugin implements ActionPlugin { @Override public List> getActions() { - return Arrays.asList(new ActionHandler<>(TestClusterUpdateTransportAction.ACTION, TestClusterUpdateTransportAction.class)); + return Arrays.asList( + new ActionHandler<>(TestClusterUpdateTransportAction.ACTION, TestClusterUpdateTransportAction.class), + new ActionHandler<>(TestQueryGroupTaskTransportAction.ACTION, TestQueryGroupTaskTransportAction.class) + ); } @Override public List> getClientActions() { - return Arrays.asList(TestClusterUpdateTransportAction.ACTION); + return Arrays.asList(TestClusterUpdateTransportAction.ACTION, TestQueryGroupTaskTransportAction.ACTION); } } } diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java index 1fac8243e7865..ca039ebfc7883 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -216,14 +216,24 @@ public QueryGroupStats nodeStats(Set queryGroupIds, Boolean requestedBre } } if (existingStateMap != null) { - existingStateMap.forEach((queryGroupId, currentState) -> { + for (Map.Entry entry : existingStateMap.entrySet()) { + String queryGroupId = entry.getKey(); + QueryGroupState currentState = entry.getValue(); boolean shouldInclude = queryGroupIds.contains("_all") || queryGroupIds.contains(queryGroupId); if (shouldInclude) { if (requestedBreached == null || requestedBreached == resourceLimitBreached(queryGroupId, currentState)) { statsHolderMap.put(queryGroupId, QueryGroupStatsHolder.from(currentState)); } } - }); + } + // existingStateMap.forEach((queryGroupId, currentState) -> { + // boolean shouldInclude = queryGroupIds.contains("_all") || queryGroupIds.contains(queryGroupId); + // if (shouldInclude) { + // if (requestedBreached == null || requestedBreached == resourceLimitBreached(queryGroupId, currentState)) { + // statsHolderMap.put(queryGroupId, QueryGroupStatsHolder.from(currentState)); + // } + // } + // }); } return new QueryGroupStats(statsHolderMap); }