Skip to content

Commit

Permalink
Add wlm resiliency orchestrator (query group service) (opensearch-pro…
Browse files Browse the repository at this point in the history
…ject#15925)

* cancellation related

Signed-off-by: Kiran Prakash <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Kiran Prakash <[email protected]>

* add better cancellation reason

Signed-off-by: Kiran Prakash <[email protected]>

* Update DefaultTaskCancellationTests.java

Signed-off-by: Kiran Prakash <[email protected]>

* refactor

Signed-off-by: Kiran Prakash <[email protected]>

* refactor

Signed-off-by: Kiran Prakash <[email protected]>

* Update DefaultTaskCancellation.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update DefaultTaskCancellation.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update DefaultTaskCancellation.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update DefaultTaskSelectionStrategy.java

Signed-off-by: Kiran Prakash <[email protected]>

* refactor

Signed-off-by: Kiran Prakash <[email protected]>

* refactor node level threshold

Signed-off-by: Kiran Prakash <[email protected]>

* use query group task

Signed-off-by: Kaushal Kumar <[email protected]>

* code clean up and refactorings

Signed-off-by: Kaushal Kumar <[email protected]>

* add unit tests and fix existing ones

Signed-off-by: Kaushal Kumar <[email protected]>

* uncomment the test case

Signed-off-by: Kaushal Kumar <[email protected]>

* update CHANGELOG

Signed-off-by: Kaushal Kumar <[email protected]>

* fix imports

Signed-off-by: Kaushal Kumar <[email protected]>

* add queryGroupService

Signed-off-by: Kaushal Kumar <[email protected]>

* refactor and add UTs for new constructs

Signed-off-by: Kaushal Kumar <[email protected]>

* fix javadocs

Signed-off-by: Kaushal Kumar <[email protected]>

* remove code clutter

Signed-off-by: Kaushal Kumar <[email protected]>

* change annotation version and task selection strategy

Signed-off-by: Kaushal Kumar <[email protected]>

* rename a util class

Signed-off-by: Kaushal Kumar <[email protected]>

* remove wrappers from resource type

Signed-off-by: Kaushal Kumar <[email protected]>

* apply spotless

Signed-off-by: Kaushal Kumar <[email protected]>

* address comments

Signed-off-by: Kaushal Kumar <[email protected]>

* add rename changes

Signed-off-by: Kaushal Kumar <[email protected]>

* address comments

Signed-off-by: Kaushal Kumar <[email protected]>

* initial changes

Signed-off-by: Kaushal Kumar <[email protected]>

* refactor changes and logical bug fix

Signed-off-by: Kaushal Kumar <[email protected]>

* add chanegs

Signed-off-by: Kaushal Kumar <[email protected]>

* address comments

Signed-off-by: Kaushal Kumar <[email protected]>

* temp changes

Signed-off-by: Kaushal Kumar <[email protected]>

* add UTs

Signed-off-by: Kaushal Kumar <[email protected]>

* add changelog

Signed-off-by: Kaushal Kumar <[email protected]>

* add task completion listener hook

Signed-off-by: Kaushal Kumar <[email protected]>

* add remaining pieces to make the feature functional

Signed-off-by: Kaushal Kumar <[email protected]>

* extend stats and fix bugs

Signed-off-by: Kaushal Kumar <[email protected]>

* fix bugs and add logic to make SBP work with wlm

Signed-off-by: Kaushal Kumar <[email protected]>

* address comments

Signed-off-by: Kaushal Kumar <[email protected]>

* fix bugs and SBP ITs

Signed-off-by: Kaushal Kumar <[email protected]>

* add missed applyCluster state change

Signed-off-by: Kaushal Kumar <[email protected]>

* address comments

Signed-off-by: Kaushal Kumar <[email protected]>

* decouple queryGroupService and cancellationService

Signed-off-by: Kaushal Kumar <[email protected]>

* replace StateApplier with StateListener interface

Signed-off-by: Kaushal Kumar <[email protected]>

* fix precommit errors

Signed-off-by: Kaushal Kumar <[email protected]>

---------

Signed-off-by: Kiran Prakash <[email protected]>
Signed-off-by: Kaushal Kumar <[email protected]>
Co-authored-by: Kiran Prakash <[email protected]>
  • Loading branch information
2 people authored and dk2k committed Oct 16, 2024
1 parent 57a0af6 commit a9a1b1e
Show file tree
Hide file tree
Showing 22 changed files with 1,410 additions and 117 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for async deletion in S3BlobContainer ([#15621](https://github.com/opensearch-project/OpenSearch/pull/15621))
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
- [Workload Management] Add orchestrator for wlm resiliency (QueryGroupService) ([#15925](https://github.com/opensearch-project/OpenSearch/pull/15925))
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.opensearch.wlm.QueryGroupTask;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -411,6 +412,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
threadPool.executor(ThreadPool.Names.SEARCH).execute(() -> {
try {
CancellableTask cancellableTask = (CancellableTask) task;
((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
long startTime = System.nanoTime();

// Doing a busy-wait until task cancellation or timeout.
Expand Down
6 changes: 5 additions & 1 deletion server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.usage.UsageService;
import org.opensearch.wlm.QueryGroupTask;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -565,7 +566,10 @@ public ActionModule(
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
Stream.of(
new RestHeaderDefinition(Task.X_OPAQUE_ID, false),
new RestHeaderDefinition(QueryGroupTask.QUERY_GROUP_ID_HEADER, false)
)
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,9 @@ public void apply(Settings value, Settings current, Settings previous) {
WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.WLM_MODE_SETTING,
WorkloadManagementSettings.QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING,
WorkloadManagementSettings.QUERYGROUP_SERVICE_DURESS_STREAK_SETTING,

// Settings to be used for limiting rest requests
ResponseLimitSettings.CAT_INDICES_RESPONSE_LIMIT_SETTING,
Expand Down
39 changes: 35 additions & 4 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,13 @@
import org.opensearch.usage.UsageService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.QueryGroupsStateAccessor;
import org.opensearch.wlm.WorkloadManagementSettings;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;
import org.opensearch.wlm.cancellation.MaximumResourceTaskSelectionStrategy;
import org.opensearch.wlm.cancellation.QueryGroupTaskCancellationService;
import org.opensearch.wlm.listeners.QueryGroupRequestOperationListener;
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;

import javax.net.ssl.SNIHostName;

Expand Down Expand Up @@ -1022,8 +1027,30 @@ protected Node(
List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);

final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the
// queryGroupService
final QueryGroupResourceUsageTrackerService queryGroupResourceUsageTrackerService = new QueryGroupResourceUsageTrackerService(
taskResourceTrackingService
);
final WorkloadManagementSettings workloadManagementSettings = new WorkloadManagementSettings(
settings,
settingsModule.getClusterSettings()
);

final QueryGroupsStateAccessor queryGroupsStateAccessor = new QueryGroupsStateAccessor();

final QueryGroupService queryGroupService = new QueryGroupService(
new QueryGroupTaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupResourceUsageTrackerService,
queryGroupsStateAccessor
),
clusterService,
threadPool,
workloadManagementSettings,
queryGroupsStateAccessor
);
taskResourceTrackingService.addTaskCompletionListener(queryGroupService);

final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener(
queryGroupService,
threadPool
Expand Down Expand Up @@ -1089,7 +1116,7 @@ protected Node(

WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor(
threadPool,
new QueryGroupService() // We will need to replace this with actual implementation
queryGroupService
);

final Collection<SecureSettingsFactory> secureSettingsFactories = pluginsService.filterPlugins(Plugin.class)
Expand Down Expand Up @@ -1184,7 +1211,8 @@ protected Node(
searchBackpressureSettings,
taskResourceTrackingService,
threadPool,
transportService.getTaskManager()
transportService.getTaskManager(),
queryGroupService
);

final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService);
Expand Down Expand Up @@ -1396,6 +1424,7 @@ protected Node(
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(QueryGroupService.class).toInstance(queryGroupService);
b.bind(AdmissionControlService.class).toInstance(admissionControlService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
Expand Down Expand Up @@ -1589,6 +1618,7 @@ public Node start() throws NodeValidationException {
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
injector.getInstance(QueryGroupService.class).start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

Expand Down Expand Up @@ -1762,6 +1792,7 @@ private Node stop() {
injector.getInstance(FsHealthService.class).stop();
injector.getInstance(NodeResourceUsageTracker.class).stop();
injector.getInstance(ResourceUsageCollectorService.class).stop();
injector.getInstance(QueryGroupService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.tasks.TaskResourceTrackingService.TaskCompletionListener;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.ResourceType;

import java.io.IOException;
Expand Down Expand Up @@ -86,12 +87,14 @@ public class SearchBackpressureService extends AbstractLifecycleComponent implem

private final Map<Class<? extends SearchBackpressureTask>, SearchBackpressureState> searchBackpressureStates;
private final TaskManager taskManager;
private final QueryGroupService queryGroupService;

public SearchBackpressureService(
SearchBackpressureSettings settings,
TaskResourceTrackingService taskResourceTrackingService,
ThreadPool threadPool,
TaskManager taskManager
TaskManager taskManager,
QueryGroupService queryGroupService
) {
this(settings, taskResourceTrackingService, threadPool, System::nanoTime, new NodeDuressTrackers(new EnumMap<>(ResourceType.class) {
{
Expand Down Expand Up @@ -131,7 +134,8 @@ public SearchBackpressureService(
settings.getClusterSettings(),
SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE
),
taskManager
taskManager,
queryGroupService
);
}

Expand All @@ -143,14 +147,16 @@ public SearchBackpressureService(
NodeDuressTrackers nodeDuressTrackers,
TaskResourceUsageTrackers searchTaskTrackers,
TaskResourceUsageTrackers searchShardTaskTrackers,
TaskManager taskManager
TaskManager taskManager,
QueryGroupService queryGroupService
) {
this.settings = settings;
this.taskResourceTrackingService = taskResourceTrackingService;
this.taskResourceTrackingService.addTaskCompletionListener(this);
this.threadPool = threadPool;
this.nodeDuressTrackers = nodeDuressTrackers;
this.taskManager = taskManager;
this.queryGroupService = queryGroupService;

this.searchBackpressureStates = Map.of(
SearchTask.class,
Expand Down Expand Up @@ -346,6 +352,7 @@ <T extends CancellableTask & SearchBackpressureTask> List<CancellableTask> getTa
.stream()
.filter(type::isInstance)
.map(type::cast)
.filter(queryGroupService::shouldSBPHandle)
.collect(Collectors.toUnmodifiableList());
}

Expand Down
Loading

0 comments on commit a9a1b1e

Please sign in to comment.