From 96eb9a379889c716f374f614f228a86599fccb04 Mon Sep 17 00:00:00 2001 From: Tianli Feng Date: Tue, 26 Jul 2022 14:19:37 -0700 Subject: [PATCH 1/7] Deprecate MasterService and create alternative ClusterManagerService Signed-off-by: Tianli Feng --- .../cluster/coordination/ZenDiscoveryIT.java | 4 +++- .../cluster/service/ClusterServiceIT.java | 6 ++--- .../health/TransportClusterHealthAction.java | 8 +++---- .../TransportPendingClusterTasksAction.java | 2 +- .../service/ClusterManagerService.java | 19 +++++++++++++++ .../cluster/service/ClusterService.java | 24 ++++++++++++------- .../main/java/org/opensearch/node/Node.java | 4 ++-- ...tAddVotingConfigExclusionsActionTests.java | 2 +- .../routing/BatchedRerouteServiceTests.java | 2 +- .../opensearch/test/ClusterServiceUtils.java | 5 ++-- .../BlockMasterServiceOnMaster.java | 2 +- .../BusyMasterServiceDisruption.java | 2 +- 12 files changed, 54 insertions(+), 26 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java index 66ff9eb15ae0e..4f47f2ecefd16 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/ZenDiscoveryIT.java @@ -170,7 +170,9 @@ public void testDiscoveryStats() throws Exception { ensureGreen(); // ensures that all events are processed (in particular state recovery fully completed) assertBusy( () -> assertThat( - internalCluster().clusterService(internalCluster().getClusterManagerName()).getMasterService().numberOfPendingTasks(), + internalCluster().clusterService(internalCluster().getClusterManagerName()) + .getClusterManagerService() + .numberOfPendingTasks(), equalTo(0) ) ); // see https://github.com/elastic/elasticsearch/issues/24388 diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/service/ClusterServiceIT.java index 252053f4968b4..7d9ffb23a2cf7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/service/ClusterServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/service/ClusterServiceIT.java @@ -391,7 +391,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS // The tasks can be re-ordered, so we need to check out-of-order Set controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10")); - List pendingClusterTasks = clusterService.getMasterService().pendingTasks(); + List pendingClusterTasks = clusterService.getClusterManagerService().pendingTasks(); assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(10)); assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1")); assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true)); @@ -413,7 +413,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS invoked2.await(); // whenever we test for no tasks, we need to wait since this is a live node - assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getMasterService().pendingTasks().isEmpty())); + assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getClusterManagerService().pendingTasks().isEmpty())); waitNoPendingTasksOnAll(); final CountDownLatch block2 = new CountDownLatch(1); @@ -453,7 +453,7 @@ public void onFailure(String source, Exception e) { } Thread.sleep(100); - pendingClusterTasks = clusterService.getMasterService().pendingTasks(); + pendingClusterTasks = clusterService.getClusterManagerService().pendingTasks(); assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5)); controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5")); for (PendingClusterTask task : pendingClusterTasks) { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java index a15fc15ef6a2b..ead6f1e9299ca 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -316,9 +316,9 @@ private boolean validateRequest(final ClusterHealthRequest request, ClusterState ClusterHealthResponse response = clusterHealth( request, clusterState, - clusterService.getMasterService().numberOfPendingTasks(), + clusterService.getClusterManagerService().numberOfPendingTasks(), allocationService.getNumberOfInFlightFetches(), - clusterService.getMasterService().getMaxTaskWaitTime() + clusterService.getClusterManagerService().getMaxTaskWaitTime() ); return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount; } @@ -338,9 +338,9 @@ private ClusterHealthResponse getResponse( ClusterHealthResponse response = clusterHealth( request, clusterState, - clusterService.getMasterService().numberOfPendingTasks(), + clusterService.getClusterManagerService().numberOfPendingTasks(), allocationService.getNumberOfInFlightFetches(), - clusterService.getMasterService().getMaxTaskWaitTime() + clusterService.getClusterManagerService().getMaxTaskWaitTime() ); int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver); boolean valid = (readyCounter == waitFor); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java index 8962f0395cc6f..c442cdd02e364 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/tasks/TransportPendingClusterTasksAction.java @@ -106,7 +106,7 @@ protected void masterOperation( ActionListener listener ) { logger.trace("fetching pending tasks from cluster service"); - final List pendingTasks = clusterService.getMasterService().pendingTasks(); + final List pendingTasks = clusterService.getClusterManagerService().pendingTasks(); logger.trace("done fetching pending tasks from cluster service"); listener.onResponse(new PendingClusterTasksResponse(pendingTasks)); } diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java new file mode 100644 index 0000000000000..4eef515a904af --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.threadpool.ThreadPool; + +public class ClusterManagerService extends MasterService { + public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + super(settings, clusterSettings, threadPool); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index c58bb4d9a947c..19a4ccdc58542 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -63,7 +63,7 @@ * @opensearch.internal */ public class ClusterService extends AbstractLifecycleComponent { - private final MasterService masterService; + private final ClusterManagerService clusterManagerService; private final ClusterApplierService clusterApplierService; @@ -93,7 +93,7 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread this( settings, clusterSettings, - new MasterService(settings, clusterSettings, threadPool), + new ClusterManagerService(settings, clusterSettings, threadPool), new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool) ); } @@ -101,12 +101,12 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread public ClusterService( Settings settings, ClusterSettings clusterSettings, - MasterService masterService, + ClusterManagerService clusterManagerService, ClusterApplierService clusterApplierService ) { this.settings = settings; this.nodeName = Node.NODE_NAME_SETTING.get(settings); - this.masterService = masterService; + this.clusterManagerService = clusterManagerService; this.operationRouting = new OperationRouting(settings, clusterSettings); this.clusterSettings = clusterSettings; this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); @@ -132,18 +132,18 @@ public RerouteService getRerouteService() { @Override protected synchronized void doStart() { clusterApplierService.start(); - masterService.start(); + clusterManagerService.start(); } @Override protected synchronized void doStop() { - masterService.stop(); + clusterManagerService.stop(); clusterApplierService.stop(); } @Override protected synchronized void doClose() { - masterService.close(); + clusterManagerService.close(); clusterApplierService.close(); } @@ -228,8 +228,14 @@ public void addLocalNodeMasterListener(LocalNodeMasterListener listener) { addLocalNodeClusterManagerListener(listener); } + public ClusterManagerService getClusterManagerService() { + return clusterManagerService; + } + + /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #getClusterManagerService()} */ + @Deprecated public MasterService getMasterService() { - return masterService; + return clusterManagerService; } /** @@ -349,6 +355,6 @@ public void submitStateUpdateTasks( final ClusterStateTaskConfig config, final ClusterStateTaskExecutor executor ) { - masterService.submitStateUpdateTasks(source, tasks, config, executor); + clusterManagerService.submitStateUpdateTasks(source, tasks, config, executor); } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 6cc8128cffd52..e2470d97f12f3 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -827,7 +827,7 @@ protected Node( transportService, namedWriteableRegistry, networkService, - clusterService.getMasterService(), + clusterService.getClusterManagerService(), clusterService.getClusterApplierService(), clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class), @@ -1082,7 +1082,7 @@ public Node start() throws NodeValidationException { injector.getInstance(GatewayService.class).start(); Discovery discovery = injector.getInstance(Discovery.class); - clusterService.getMasterService().setClusterStatePublisher(discovery::publish); + clusterService.getClusterManagerService().setClusterStatePublisher(discovery::publish); // Start the transport service now so the publish address will be added to the local disco node in ClusterService TransportService transportService = injector.getInstance(TransportService.class); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java index 0c728c8314517..69570fa7b4640 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java @@ -692,7 +692,7 @@ private static class AdjustConfigurationForExclusions implements Listener { @Override public void onNewClusterState(ClusterState state) { - clusterService.getMasterService().submitStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() { + clusterService.getClusterManagerService().submitStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { assertThat(currentState, sameInstance(state)); diff --git a/server/src/test/java/org/opensearch/cluster/routing/BatchedRerouteServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/BatchedRerouteServiceTests.java index 241ee3fca3553..796d73aa715de 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/BatchedRerouteServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/BatchedRerouteServiceTests.java @@ -223,7 +223,7 @@ public void testNotifiesOnFailure() throws InterruptedException { } })); if (rarely()) { - clusterService.getMasterService() + clusterService.getClusterManagerService() .setClusterStatePublisher( randomBoolean() ? ClusterServiceUtils.createClusterStatePublisher(clusterService.getClusterApplierService()) diff --git a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java index 55ae70df9892c..dfa7f439fb358 100644 --- a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java @@ -164,8 +164,9 @@ public static ClusterService createClusterService(ThreadPool threadPool, Discove .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); clusterService.getClusterApplierService().setInitialState(initialClusterState); - clusterService.getMasterService().setClusterStatePublisher(createClusterStatePublisher(clusterService.getClusterApplierService())); - clusterService.getMasterService().setClusterStateSupplier(clusterService.getClusterApplierService()::state); + clusterService.getClusterManagerService() + .setClusterStatePublisher(createClusterStatePublisher(clusterService.getClusterApplierService())); + clusterService.getClusterManagerService().setClusterStateSupplier(clusterService.getClusterApplierService()::state); clusterService.start(); return clusterService; } diff --git a/test/framework/src/main/java/org/opensearch/test/disruption/BlockMasterServiceOnMaster.java b/test/framework/src/main/java/org/opensearch/test/disruption/BlockMasterServiceOnMaster.java index 19aaba154109d..0a626f1473826 100644 --- a/test/framework/src/main/java/org/opensearch/test/disruption/BlockMasterServiceOnMaster.java +++ b/test/framework/src/main/java/org/opensearch/test/disruption/BlockMasterServiceOnMaster.java @@ -66,7 +66,7 @@ public void startDisrupting() { boolean success = disruptionLatch.compareAndSet(null, new CountDownLatch(1)); assert success : "startDisrupting called without waiting on stopDisrupting to complete"; final CountDownLatch started = new CountDownLatch(1); - clusterService.getMasterService().submitStateUpdateTask("service_disruption_block", new ClusterStateUpdateTask() { + clusterService.getClusterManagerService().submitStateUpdateTask("service_disruption_block", new ClusterStateUpdateTask() { @Override public Priority priority() { return Priority.IMMEDIATE; diff --git a/test/framework/src/main/java/org/opensearch/test/disruption/BusyMasterServiceDisruption.java b/test/framework/src/main/java/org/opensearch/test/disruption/BusyMasterServiceDisruption.java index d764c23404a91..fb40298bb98ef 100644 --- a/test/framework/src/main/java/org/opensearch/test/disruption/BusyMasterServiceDisruption.java +++ b/test/framework/src/main/java/org/opensearch/test/disruption/BusyMasterServiceDisruption.java @@ -67,7 +67,7 @@ public void startDisrupting() { } private void submitTask(ClusterService clusterService) { - clusterService.getMasterService().submitStateUpdateTask("service_disruption_block", new ClusterStateUpdateTask(priority) { + clusterService.getClusterManagerService().submitStateUpdateTask("service_disruption_block", new ClusterStateUpdateTask(priority) { @Override public ClusterState execute(ClusterState currentState) { if (active.get()) { From a7e4155f43c626aa8e3db88d0130dca237e48ef1 Mon Sep 17 00:00:00 2001 From: Tianli Feng Date: Tue, 26 Jul 2022 14:31:46 -0700 Subject: [PATCH 2/7] Rename variable names from masterService to clusterManagerService Signed-off-by: Tianli Feng --- .../cluster/coordination/Coordinator.java | 16 ++-- .../cluster/coordination/JoinHelper.java | 10 +-- .../opensearch/discovery/DiscoveryModule.java | 4 +- ...rnalClusterInfoServiceSchedulingTests.java | 8 +- .../cluster/coordination/NodeJoinTests.java | 30 +++---- .../cluster/service/MasterServiceTests.java | 78 +++++++++---------- .../discovery/DiscoveryModuleTests.java | 6 +- .../snapshots/SnapshotResiliencyTests.java | 15 ++-- .../AbstractCoordinatorTestCase.java | 16 ++-- .../opensearch/test/ClusterServiceUtils.java | 10 +-- .../FakeThreadPoolMasterServiceTests.java | 12 +-- 11 files changed, 105 insertions(+), 100 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 14f9cca3630fe..b8b00b2cc5e65 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -141,7 +141,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final boolean singleNodeDiscovery; private final ElectionStrategy electionStrategy; private final TransportService transportService; - private final MasterService masterService; + private final MasterService clusterManagerService; private final AllocationService allocationService; private final JoinHelper joinHelper; private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; @@ -191,7 +191,7 @@ public Coordinator( TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, - MasterService masterService, + MasterService clusterManagerService, Supplier persistedStateSupplier, SeedHostsProvider seedHostsProvider, ClusterApplier clusterApplier, @@ -203,7 +203,7 @@ public Coordinator( ) { this.settings = settings; this.transportService = transportService; - this.masterService = masterService; + this.clusterManagerService = clusterManagerService; this.allocationService = allocationService; this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings); @@ -211,7 +211,7 @@ public Coordinator( this.joinHelper = new JoinHelper( settings, allocationService, - masterService, + clusterManagerService, transportService, this::getCurrentTerm, this::getStateForClusterManagerService, @@ -260,7 +260,7 @@ public Coordinator( ); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; - masterService.setClusterStateSupplier(this::getStateForClusterManagerService); + clusterManagerService.setClusterStateSupplier(this::getStateForClusterManagerService); this.reconfigurator = new Reconfigurator(settings, clusterSettings); this.clusterBootstrapService = new ClusterBootstrapService( settings, @@ -310,7 +310,7 @@ private void onLeaderFailure(Exception e) { private void removeNode(DiscoveryNode discoveryNode, String reason) { synchronized (mutex) { if (mode == Mode.LEADER) { - masterService.submitStateUpdateTask( + clusterManagerService.submitStateUpdateTask( "node-left", new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason), ClusterStateTaskConfig.build(Priority.IMMEDIATE), @@ -757,7 +757,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) { } private void cleanClusterManagerService() { - masterService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() { + clusterManagerService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() { @Override public void onFailure(String source, Exception e) { // ignore @@ -1129,7 +1129,7 @@ private void scheduleReconfigurationIfNeeded() { final ClusterState state = getLastAcceptedState(); if (improveConfiguration(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) { logger.trace("scheduling reconfiguration"); - masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) { + clusterManagerService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { reconfigurationTaskScheduled.set(false); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 2eae6411eff69..f5c01074b87dd 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -106,7 +106,7 @@ public class JoinHelper { Setting.Property.Deprecated ); - private final MasterService masterService; + private final MasterService clusterManagerService; private final TransportService transportService; private volatile JoinTaskExecutor joinTaskExecutor; @@ -122,7 +122,7 @@ public class JoinHelper { JoinHelper( Settings settings, AllocationService allocationService, - MasterService masterService, + MasterService clusterManagerService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, @@ -132,7 +132,7 @@ public class JoinHelper { RerouteService rerouteService, NodeHealthService nodeHealthService ) { - this.masterService = masterService; + this.clusterManagerService = clusterManagerService; this.transportService = transportService; this.nodeHealthService = nodeHealthService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); @@ -458,7 +458,7 @@ class LeaderJoinAccumulator implements JoinAccumulator { public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(sender, "join existing leader"); assert joinTaskExecutor != null; - masterService.submitStateUpdateTask( + clusterManagerService.submitStateUpdateTask( "node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), @@ -543,7 +543,7 @@ public void close(Mode newMode) { pendingAsTasks.put(JoinTaskExecutor.newBecomeClusterManagerTask(), (source, e) -> {}); pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> {}); joinTaskExecutor = joinTaskExecutorGenerator.get(); - masterService.submitStateUpdateTasks( + clusterManagerService.submitStateUpdateTasks( stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT), diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 6b746e5963bdc..c12283df1dbc9 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -121,7 +121,7 @@ public DiscoveryModule( TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, - MasterService masterService, + MasterService clusterManagerService, ClusterApplier clusterApplier, ClusterSettings clusterSettings, List plugins, @@ -197,7 +197,7 @@ public DiscoveryModule( transportService, namedWriteableRegistry, allocationService, - masterService, + clusterManagerService, gatewayMetaState::getPersistedState, seedHostsProvider, clusterApplier, diff --git a/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java index a989bacbf47bb..d0b0631fee300 100644 --- a/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -89,14 +89,14 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { } }; - final MasterService masterService = new FakeThreadPoolMasterService( + final MasterService clusterManagerService = new FakeThreadPoolMasterService( "test", "clusterManagerService", threadPool, r -> { fail("cluster-manager service should not run any tasks"); } ); - final ClusterService clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService); + final ClusterService clusterService = new ClusterService(settings, clusterSettings, clusterManagerService, clusterApplierService); final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(settings, clusterService, threadPool, client); @@ -105,8 +105,8 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { clusterService.setNodeConnectionsService(ClusterServiceUtils.createNoOpNodeConnectionsService()); clusterApplierService.setInitialState(ClusterState.builder(new ClusterName("cluster")).nodes(noClusterManager).build()); - masterService.setClusterStatePublisher((clusterChangedEvent, publishListener, ackListener) -> fail("should not publish")); - masterService.setClusterStateSupplier(clusterApplierService::state); + clusterManagerService.setClusterStatePublisher((clusterChangedEvent, publishListener, ackListener) -> fail("should not publish")); + clusterManagerService.setClusterStateSupplier(clusterApplierService::state); clusterService.start(); final AtomicBoolean becameClusterManager1 = new AtomicBoolean(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 8e81a3043426f..2eb29d58da5b0 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -98,7 +98,7 @@ public class NodeJoinTests extends OpenSearchTestCase { private static ThreadPool threadPool; - private MasterService masterService; + private MasterService clusterManagerService; private Coordinator coordinator; private DeterministicTaskQueue deterministicTaskQueue; private Transport transport; @@ -117,7 +117,7 @@ public static void afterClass() { @After public void tearDown() throws Exception { super.tearDown(); - masterService.close(); + clusterManagerService.close(); } private static ClusterState initialState(DiscoveryNode localNode, long term, long version, VotingConfiguration config) { @@ -166,40 +166,40 @@ private void setupFakeClusterManagerServiceAndCoordinator(long term, ClusterStat } private void setupRealClusterManagerServiceAndCoordinator(long term, ClusterState initialState) { - MasterService masterService = new MasterService( + MasterService clusterManagerService = new MasterService( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); AtomicReference clusterStateRef = new AtomicReference<>(initialState); - masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { + clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { clusterStateRef.set(event.state()); publishListener.onResponse(null); }); setupClusterManagerServiceAndCoordinator( term, initialState, - masterService, + clusterManagerService, threadPool, new Random(Randomness.get().nextLong()), () -> new StatusInfo(HEALTHY, "healthy-info") ); - masterService.setClusterStateSupplier(clusterStateRef::get); - masterService.start(); + clusterManagerService.setClusterStateSupplier(clusterStateRef::get); + clusterManagerService.start(); } private void setupClusterManagerServiceAndCoordinator( long term, ClusterState initialState, - MasterService masterService, + MasterService clusterManagerService, ThreadPool threadPool, Random random, NodeHealthService nodeHealthService ) { - if (this.masterService != null || coordinator != null) { + if (this.clusterManagerService != null || coordinator != null) { throw new IllegalStateException("method setupClusterManagerServiceAndCoordinator can only be called once"); } - this.masterService = masterService; + this.clusterManagerService = clusterManagerService; CapturingTransport capturingTransport = new CapturingTransport() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) { @@ -231,7 +231,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req transportService, writableRegistry(), OpenSearchAllocationTestCase.createAllocationService(Settings.EMPTY), - masterService, + clusterManagerService, () -> new InMemoryPersistedState(term, initialState), r -> emptyList(), new NoOpClusterApplier(), @@ -514,7 +514,7 @@ public void testJoinUpdateVotingConfigExclusion() throws Exception { ); assertTrue( - MasterServiceTests.discoveryState(masterService) + MasterServiceTests.discoveryState(clusterManagerService) .getVotingConfigExclusions() .stream() .anyMatch( @@ -746,7 +746,7 @@ public void testConcurrentJoining() { throw new RuntimeException(e); } - assertTrue(MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster()); + assertTrue(MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster()); for (DiscoveryNode successfulNode : successfulNodes) { assertTrue(successfulNode + " joined cluster", clusterStateHasNode(successfulNode)); assertFalse(successfulNode + " voted for cluster-manager", coordinator.missingJoinVoteFrom(successfulNode)); @@ -776,10 +776,10 @@ public void testJoinElectedLeaderWithDeprecatedMasterRole() { } private boolean isLocalNodeElectedMaster() { - return MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster(); + return MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster(); } private boolean clusterStateHasNode(DiscoveryNode node) { - return node.equals(MasterServiceTests.discoveryState(masterService).nodes().get(node.getId())); + return node.equals(MasterServiceTests.discoveryState(clusterManagerService).nodes().get(node.getId())); } } diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index 8827064974a19..47830c290f7d8 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -123,7 +123,7 @@ public void randomizeCurrentTime() { private MasterService createClusterManagerService(boolean makeClusterManager) { final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - final MasterService masterService = new MasterService( + final MasterService clusterManagerService = new MasterService( Settings.builder() .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") @@ -141,13 +141,13 @@ private MasterService createClusterManagerService(boolean makeClusterManager) { .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); final AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); - masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { + clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { clusterStateRef.set(event.state()); publishListener.onResponse(null); }); - masterService.setClusterStateSupplier(clusterStateRef::get); - masterService.start(); - return masterService; + clusterManagerService.setClusterStateSupplier(clusterStateRef::get); + clusterManagerService.start(); + return clusterManagerService; } public void testClusterManagerAwareExecution() throws Exception { @@ -194,7 +194,7 @@ public void onFailure(String source, Exception e) { } public void testThreadContext() throws InterruptedException { - final MasterService masterService = createClusterManagerService(true); + final MasterService clusterManagerService = createClusterManagerService(true); final CountDownLatch latch = new CountDownLatch(1); try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) { @@ -208,7 +208,7 @@ public void testThreadContext() throws InterruptedException { final TimeValue ackTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000)); final TimeValue clusterManagerTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000)); - masterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { + clusterManagerService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { @Override public ClusterState execute(ClusterState currentState) { assertTrue(threadPool.getThreadContext().isSystemContext()); @@ -280,7 +280,7 @@ public void onAckTimeout() { latch.await(); - masterService.close(); + clusterManagerService.close(); } /* @@ -292,8 +292,8 @@ public void testClusterStateTaskListenerThrowingExceptionIsOkay() throws Interru final CountDownLatch latch = new CountDownLatch(1); AtomicBoolean published = new AtomicBoolean(); - try (MasterService masterService = createClusterManagerService(true)) { - masterService.submitStateUpdateTask( + try (MasterService clusterManagerService = createClusterManagerService(true)) { + clusterManagerService.submitStateUpdateTask( "testClusterStateTaskListenerThrowingExceptionIsOkay", new Object(), ClusterStateTaskConfig.build(Priority.NORMAL), @@ -421,8 +421,8 @@ public void testClusterStateUpdateLogging() throws Exception { ) ); - try (MasterService masterService = createClusterManagerService(true)) { - masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { + try (MasterService clusterManagerService = createClusterManagerService(true)) { + clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += TimeValue.timeValueSeconds(1).millis(); @@ -437,7 +437,7 @@ public void onFailure(String source, Exception e) { fail(); } }); - masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += TimeValue.timeValueSeconds(2).millis(); @@ -452,7 +452,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS @Override public void onFailure(String source, Exception e) {} }); - masterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis(); @@ -469,7 +469,7 @@ public void onFailure(String source, Exception e) { fail(); } }); - masterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return currentState; @@ -617,7 +617,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } }; - try (MasterService masterService = createClusterManagerService(true)) { + try (MasterService clusterManagerService = createClusterManagerService(true)) { final ConcurrentMap submittedTasksPerThread = new ConcurrentHashMap<>(); CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { @@ -632,7 +632,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS submittedTasksPerThread.computeIfAbsent(threadName, key -> new AtomicInteger()).addAndGet(tasks.size()); final TaskExecutor executor = assignment.v1(); if (tasks.size() == 1) { - masterService.submitStateUpdateTask( + clusterManagerService.submitStateUpdateTask( threadName, tasks.stream().findFirst().get(), ClusterStateTaskConfig.build(randomFrom(Priority.values())), @@ -642,7 +642,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } else { Map taskListeners = new HashMap<>(); tasks.forEach(t -> taskListeners.put(t, listener)); - masterService.submitStateUpdateTasks( + clusterManagerService.submitStateUpdateTasks( threadName, taskListeners, ClusterStateTaskConfig.build(randomFrom(Priority.values())), @@ -696,8 +696,8 @@ public void testBlockingCallInClusterStateTaskListenerFails() throws Interrupted final CountDownLatch latch = new CountDownLatch(1); final AtomicReference assertionRef = new AtomicReference<>(); - try (MasterService masterService = createClusterManagerService(true)) { - masterService.submitStateUpdateTask( + try (MasterService clusterManagerService = createClusterManagerService(true)) { + clusterManagerService.submitStateUpdateTask( "testBlockingCallInClusterStateTaskListenerFails", new Object(), ClusterStateTaskConfig.build(Priority.NORMAL), @@ -788,7 +788,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { ); try ( - MasterService masterService = new MasterService( + MasterService clusterManagerService = new MasterService( Settings.builder() .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") @@ -810,7 +810,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); final AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); - masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { + clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { if (event.source().contains("test5")) { relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( Settings.EMPTY @@ -825,12 +825,12 @@ public void testLongClusterStateUpdateLogging() throws Exception { clusterStateRef.set(event.state()); publishListener.onResponse(null); }); - masterService.setClusterStateSupplier(clusterStateRef::get); - masterService.start(); + clusterManagerService.setClusterStateSupplier(clusterStateRef::get); + clusterManagerService.start(); final CountDownLatch latch = new CountDownLatch(6); final CountDownLatch processedFirstTask = new CountDownLatch(1); - masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += randomLongBetween( @@ -853,7 +853,7 @@ public void onFailure(String source, Exception e) { }); processedFirstTask.await(); - masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( @@ -872,7 +872,7 @@ public void onFailure(String source, Exception e) { latch.countDown(); } }); - masterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( @@ -891,7 +891,7 @@ public void onFailure(String source, Exception e) { fail(); } }); - masterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( @@ -910,7 +910,7 @@ public void onFailure(String source, Exception e) { fail(); } }); - masterService.submitStateUpdateTask("test5", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test5", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).incrementVersion().build(); @@ -926,7 +926,7 @@ public void onFailure(String source, Exception e) { fail(); } }); - masterService.submitStateUpdateTask("test6", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test6", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).incrementVersion().build(); @@ -944,7 +944,7 @@ public void onFailure(String source, Exception e) { }); // Additional update task to make sure all previous logging made it to the loggerName // We don't check logging for this on since there is no guarantee that it will occur before our check - masterService.submitStateUpdateTask("test7", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test7", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return currentState; @@ -971,7 +971,7 @@ public void testAcking() throws InterruptedException { final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); try ( - MasterService masterService = new MasterService( + MasterService clusterManagerService = new MasterService( Settings.builder() .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") @@ -986,9 +986,9 @@ public void testAcking() throws InterruptedException { .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); final AtomicReference publisherRef = new AtomicReference<>(); - masterService.setClusterStatePublisher((e, pl, al) -> publisherRef.get().publish(e, pl, al)); - masterService.setClusterStateSupplier(() -> initialClusterState); - masterService.start(); + clusterManagerService.setClusterStatePublisher((e, pl, al) -> publisherRef.get().publish(e, pl, al)); + clusterManagerService.setClusterStateSupplier(() -> initialClusterState); + clusterManagerService.start(); // check that we don't time out before even committing the cluster state { @@ -1000,7 +1000,7 @@ public void testAcking() throws InterruptedException { ) ); - masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(null, null) { + clusterManagerService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(null, null) { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).build(); @@ -1055,7 +1055,7 @@ public void onAckTimeout() { ackListener.onNodeAck(node3, null); }); - masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(null, null) { + clusterManagerService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(null, null) { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).build(); @@ -1101,8 +1101,8 @@ public void onAckTimeout() { /** * Returns the cluster state that the cluster-manager service uses (and that is provided by the discovery layer) */ - public static ClusterState discoveryState(MasterService masterService) { - return masterService.state(); + public static ClusterState discoveryState(MasterService clusterManagerService) { + return clusterManagerService.state(); } } diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index d1e3f406b4933..efcefab6c9f8b 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -70,7 +70,7 @@ public class DiscoveryModuleTests extends OpenSearchTestCase { private TransportService transportService; private NamedWriteableRegistry namedWriteableRegistry; - private MasterService masterService; + private MasterService clusterManagerService; private ClusterApplier clusterApplier; private ThreadPool threadPool; private ClusterSettings clusterSettings; @@ -93,7 +93,7 @@ public void setupDummyServices() { threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null); - masterService = mock(MasterService.class); + clusterManagerService = mock(MasterService.class); namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); clusterApplier = mock(ClusterApplier.class); clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -112,7 +112,7 @@ private DiscoveryModule newModule(Settings settings, List plugi transportService, namedWriteableRegistry, null, - masterService, + clusterManagerService, clusterApplier, clusterSettings, plugins, diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index cca27366f7df0..89fc88ecacbbd 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1645,7 +1645,7 @@ private final class TestClusterNode { private final DiscoveryNode node; - private final MasterService masterService; + private final MasterService clusterManagerService; private final AllocationService allocationService; @@ -1665,13 +1665,18 @@ private final class TestClusterNode { this.node = node; final Environment environment = createEnvironment(node.getName()); threadPool = deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)); - masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow); + clusterManagerService = new FakeThreadPoolMasterService( + node.getName(), + "test", + threadPool, + deterministicTaskQueue::scheduleNow + ); final Settings settings = environment.settings(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterService = new ClusterService( settings, clusterSettings, - masterService, + clusterManagerService, new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) { @Override protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { @@ -2208,7 +2213,7 @@ public void start(ClusterState initialState) { transportService, namedWriteableRegistry, allocationService, - masterService, + clusterManagerService, () -> persistedState, hostsResolver -> nodes.values() .stream() @@ -2222,7 +2227,7 @@ public void start(ClusterState initialState) { ElectionStrategy.DEFAULT_INSTANCE, () -> new StatusInfo(HEALTHY, "healthy-info") ); - masterService.setClusterStatePublisher(coordinator); + clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); clusterService.getClusterApplierService().setNodeConnectionsService(nodeConnectionsService); nodeConnectionsService.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 36469b2ee1999..f928ffab4fd0a 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -557,7 +557,7 @@ void stabilise(long stabilisationDurationMillis) { final ClusterNode leader = getAnyLeader(); final long leaderTerm = leader.coordinator.getCurrentTerm(); - final int pendingTaskCount = leader.masterService.getFakeMasterServicePendingTaskCount(); + final int pendingTaskCount = leader.clusterManagerService.getFakeMasterServicePendingTaskCount(); runFor((pendingTaskCount + 1) * DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "draining task queue"); final Matcher isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); @@ -1026,7 +1026,7 @@ class ClusterNode { private final DiscoveryNode localNode; final MockPersistedState persistedState; final Settings nodeSettings; - private AckedFakeThreadPoolMasterService masterService; + private AckedFakeThreadPoolMasterService clusterManagerService; private DisruptableClusterApplierService clusterApplierService; private ClusterService clusterService; TransportService transportService; @@ -1106,7 +1106,7 @@ protected Optional getDisruptableMockTransport(Transpo null, emptySet() ); - masterService = new AckedFakeThreadPoolMasterService( + clusterManagerService = new AckedFakeThreadPoolMasterService( localNode.getId(), "test", threadPool, @@ -1120,7 +1120,7 @@ protected Optional getDisruptableMockTransport(Transpo deterministicTaskQueue, threadPool ); - clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService); + clusterService = new ClusterService(settings, clusterSettings, clusterManagerService, clusterApplierService); clusterService.setNodeConnectionsService( new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService) ); @@ -1135,7 +1135,7 @@ protected Optional getDisruptableMockTransport(Transpo transportService, writableRegistry(), allocationService, - masterService, + clusterManagerService, this::getPersistedState, Cluster.this::provideSeedHosts, clusterApplierService, @@ -1145,7 +1145,7 @@ protected Optional getDisruptableMockTransport(Transpo getElectionStrategy(), nodeHealthService ); - masterService.setClusterStatePublisher(coordinator); + clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService( settings, allocationService, @@ -1334,11 +1334,11 @@ AckCollector submitUpdateTask( onNode(() -> { logger.trace("[{}] submitUpdateTask: enqueueing [{}]", localNode.getId(), source); final long submittedTerm = coordinator.getCurrentTerm(); - masterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { assertThat(currentState.term(), greaterThanOrEqualTo(submittedTerm)); - masterService.nextAckCollector = ackCollector; + clusterManagerService.nextAckCollector = ackCollector; return clusterStateUpdate.apply(currentState); } diff --git a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java index dfa7f439fb358..b6e4037fb6e5c 100644 --- a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java @@ -62,19 +62,19 @@ public class ClusterServiceUtils { public static MasterService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) { - MasterService masterService = new MasterService( + MasterService clusterManagerService = new MasterService( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_cluster_manager_node").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); - masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { + clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { clusterStateRef.set(event.state()); publishListener.onResponse(null); }); - masterService.setClusterStateSupplier(clusterStateRef::get); - masterService.start(); - return masterService; + clusterManagerService.setClusterStateSupplier(clusterStateRef::get); + clusterManagerService.start(); + return clusterManagerService; } public static MasterService createMasterService(ThreadPool threadPool, DiscoveryNode localNode) { diff --git a/test/framework/src/test/java/org/opensearch/cluster/service/FakeThreadPoolMasterServiceTests.java b/test/framework/src/test/java/org/opensearch/cluster/service/FakeThreadPoolMasterServiceTests.java index b4cdb0b33391a..dabeeb6742ec4 100644 --- a/test/framework/src/test/java/org/opensearch/cluster/service/FakeThreadPoolMasterServiceTests.java +++ b/test/framework/src/test/java/org/opensearch/cluster/service/FakeThreadPoolMasterServiceTests.java @@ -84,21 +84,21 @@ public void testFakeClusterManagerService() { doAnswer(invocationOnMock -> runnableTasks.add((Runnable) invocationOnMock.getArguments()[0])).when(executorService).execute(any()); when(mockThreadPool.generic()).thenReturn(executorService); - FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService( + FakeThreadPoolMasterService clusterManagerService = new FakeThreadPoolMasterService( "test_node", "test", mockThreadPool, runnableTasks::add ); - masterService.setClusterStateSupplier(lastClusterStateRef::get); - masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { + clusterManagerService.setClusterStateSupplier(lastClusterStateRef::get); + clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { lastClusterStateRef.set(event.state()); publishingCallback.set(publishListener); }); - masterService.start(); + clusterManagerService.start(); AtomicBoolean firstTaskCompleted = new AtomicBoolean(); - masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState) @@ -138,7 +138,7 @@ public void onFailure(String source, Exception e) { assertThat(runnableTasks.size(), equalTo(0)); AtomicBoolean secondTaskCompleted = new AtomicBoolean(); - masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { + clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState) From a892057a95bb0f2730de70549ea827bc60db0616 Mon Sep 17 00:00:00 2001 From: Tianli Feng Date: Tue, 26 Jul 2022 14:34:22 -0700 Subject: [PATCH 3/7] Change FakeThreadPoolMasterService to extend ClusterManagerService instead of MasterService Signed-off-by: Tianli Feng --- .../opensearch/cluster/service/FakeThreadPoolMasterService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolMasterService.java b/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolMasterService.java index c532a0cc36472..7826a3aef9801 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolMasterService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/service/FakeThreadPoolMasterService.java @@ -55,7 +55,7 @@ import static org.apache.lucene.tests.util.LuceneTestCase.random; import static org.opensearch.test.OpenSearchTestCase.randomInt; -public class FakeThreadPoolMasterService extends MasterService { +public class FakeThreadPoolMasterService extends ClusterManagerService { private static final Logger logger = LogManager.getLogger(FakeThreadPoolMasterService.class); private final String name; From fe7af8f656aecf337ef7d903e5171200298db588 Mon Sep 17 00:00:00 2001 From: Tianli Feng Date: Tue, 26 Jul 2022 15:01:59 -0700 Subject: [PATCH 4/7] Rename variable type from MasterService to ClusterManagerService Signed-off-by: Tianli Feng --- .../cluster/ClusterStateTaskListener.java | 6 +- .../cluster/coordination/Coordinator.java | 6 +- .../cluster/coordination/JoinHelper.java | 6 +- .../cluster/service/ClusterService.java | 2 +- .../common/settings/ClusterSettings.java | 6 +- .../common/util/concurrent/BaseFuture.java | 4 +- .../opensearch/discovery/DiscoveryModule.java | 4 +- ...rnalClusterInfoServiceSchedulingTests.java | 4 +- .../cluster/coordination/NodeJoinTests.java | 18 ++-- ...s.java => ClusterManagerServiceTests.java} | 90 ++++++++++--------- .../discovery/DiscoveryModuleTests.java | 6 +- .../snapshots/SnapshotResiliencyTests.java | 4 +- 12 files changed, 79 insertions(+), 77 deletions(-) rename server/src/test/java/org/opensearch/cluster/service/{MasterServiceTests.java => ClusterManagerServiceTests.java} (92%) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterStateTaskListener.java b/server/src/main/java/org/opensearch/cluster/ClusterStateTaskListener.java index 37108d356d86a..d6c4abfad7b8d 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterStateTaskListener.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterStateTaskListener.java @@ -31,7 +31,7 @@ package org.opensearch.cluster; -import org.opensearch.cluster.service.MasterService; +import org.opensearch.cluster.service.ClusterManagerService; import java.util.List; @@ -49,7 +49,7 @@ public interface ClusterStateTaskListener { /** * called when the task was rejected because the local node is no longer cluster-manager. - * Used only for tasks submitted to {@link MasterService}. + * Used only for tasks submitted to {@link ClusterManagerService}. */ default void onNoLongerClusterManager(String source) { onFailure(source, new NotClusterManagerException("no longer cluster-manager. source: [" + source + "]")); @@ -57,7 +57,7 @@ default void onNoLongerClusterManager(String source) { /** * called when the task was rejected because the local node is no longer cluster-manager. - * Used only for tasks submitted to {@link MasterService}. + * Used only for tasks submitted to {@link ClusterManagerService}. * * @deprecated As of 2.1, because supporting inclusive language, replaced by {@link #onNoLongerClusterManager(String)} */ diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index b8b00b2cc5e65..ace10cca76310 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -58,7 +58,7 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterApplier; import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener; -import org.opensearch.cluster.service.MasterService; +import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.common.Booleans; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -141,7 +141,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final boolean singleNodeDiscovery; private final ElectionStrategy electionStrategy; private final TransportService transportService; - private final MasterService clusterManagerService; + private final ClusterManagerService clusterManagerService; private final AllocationService allocationService; private final JoinHelper joinHelper; private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; @@ -191,7 +191,7 @@ public Coordinator( TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, - MasterService clusterManagerService, + ClusterManagerService clusterManagerService, Supplier persistedStateSupplier, SeedHostsProvider seedHostsProvider, ClusterApplier clusterApplier, diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index f5c01074b87dd..656e6d220720f 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -46,7 +46,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.cluster.service.MasterService; +import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.common.Priority; import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.stream.StreamInput; @@ -106,7 +106,7 @@ public class JoinHelper { Setting.Property.Deprecated ); - private final MasterService clusterManagerService; + private final ClusterManagerService clusterManagerService; private final TransportService transportService; private volatile JoinTaskExecutor joinTaskExecutor; @@ -122,7 +122,7 @@ public class JoinHelper { JoinHelper( Settings settings, AllocationService allocationService, - MasterService clusterManagerService, + ClusterManagerService clusterManagerService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index 19a4ccdc58542..1db23a0f8ac7a 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -258,7 +258,7 @@ public ClusterApplierService getClusterApplierService() { public static boolean assertClusterOrClusterManagerStateThread() { assert Thread.currentThread().getName().contains(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME) - || Thread.currentThread().getName().contains(MasterService.MASTER_UPDATE_THREAD_NAME) + || Thread.currentThread().getName().contains(ClusterManagerService.MASTER_UPDATE_THREAD_NAME) : "not called from the master/cluster state update thread"; return true; } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 11cb2ca316235..72085ebc3dd78 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -82,7 +82,7 @@ import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.cluster.service.MasterService; +import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.common.logging.Loggers; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.network.NetworkService; @@ -335,8 +335,8 @@ public void apply(Settings value, Settings current, Settings previous) { IndexModule.NODE_STORE_ALLOW_MMAP, ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, ClusterService.USER_DEFINED_METADATA, - MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated - MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated + ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING, SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BaseFuture.java b/server/src/main/java/org/opensearch/common/util/concurrent/BaseFuture.java index fef37299b349d..e4f8e1a221a0d 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/BaseFuture.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BaseFuture.java @@ -33,7 +33,7 @@ package org.opensearch.common.util.concurrent; import org.opensearch.cluster.service.ClusterApplierService; -import org.opensearch.cluster.service.MasterService; +import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.common.Nullable; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transports; @@ -109,7 +109,7 @@ protected boolean blockingAllowed() { return Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON) - && MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON); + && ClusterManagerService.assertNotMasterUpdateThread(BLOCKING_OP_REASON); } @Override diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index c12283df1dbc9..44f44fa055b2b 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -41,7 +41,7 @@ import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterApplier; -import org.opensearch.cluster.service.MasterService; +import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.common.Randomness; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.network.NetworkService; @@ -121,7 +121,7 @@ public DiscoveryModule( TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, - MasterService clusterManagerService, + ClusterManagerService clusterManagerService, ClusterApplier clusterApplier, ClusterSettings clusterSettings, List plugins, diff --git a/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java index d0b0631fee300..0e39bcad7748c 100644 --- a/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -49,7 +49,7 @@ import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.cluster.service.FakeThreadPoolMasterService; -import org.opensearch.cluster.service.MasterService; +import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor; @@ -89,7 +89,7 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { } }; - final MasterService clusterManagerService = new FakeThreadPoolMasterService( + final ClusterManagerService clusterManagerService = new FakeThreadPoolMasterService( "test", "clusterManagerService", threadPool, diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 2eb29d58da5b0..8d3ba85a90d33 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -44,8 +44,8 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.FakeThreadPoolMasterService; -import org.opensearch.cluster.service.MasterService; -import org.opensearch.cluster.service.MasterServiceTests; +import org.opensearch.cluster.service.ClusterManagerService; +import org.opensearch.cluster.service.ClusterManagerServiceTests; import org.opensearch.common.Randomness; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -98,7 +98,7 @@ public class NodeJoinTests extends OpenSearchTestCase { private static ThreadPool threadPool; - private MasterService clusterManagerService; + private ClusterManagerService clusterManagerService; private Coordinator coordinator; private DeterministicTaskQueue deterministicTaskQueue; private Transport transport; @@ -166,7 +166,7 @@ private void setupFakeClusterManagerServiceAndCoordinator(long term, ClusterStat } private void setupRealClusterManagerServiceAndCoordinator(long term, ClusterState initialState) { - MasterService clusterManagerService = new MasterService( + ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool @@ -191,7 +191,7 @@ private void setupRealClusterManagerServiceAndCoordinator(long term, ClusterStat private void setupClusterManagerServiceAndCoordinator( long term, ClusterState initialState, - MasterService clusterManagerService, + ClusterManagerService clusterManagerService, ThreadPool threadPool, Random random, NodeHealthService nodeHealthService @@ -514,7 +514,7 @@ public void testJoinUpdateVotingConfigExclusion() throws Exception { ); assertTrue( - MasterServiceTests.discoveryState(clusterManagerService) + ClusterManagerServiceTests.discoveryState(clusterManagerService) .getVotingConfigExclusions() .stream() .anyMatch( @@ -746,7 +746,7 @@ public void testConcurrentJoining() { throw new RuntimeException(e); } - assertTrue(MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster()); + assertTrue(ClusterManagerServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster()); for (DiscoveryNode successfulNode : successfulNodes) { assertTrue(successfulNode + " joined cluster", clusterStateHasNode(successfulNode)); assertFalse(successfulNode + " voted for cluster-manager", coordinator.missingJoinVoteFrom(successfulNode)); @@ -776,10 +776,10 @@ public void testJoinElectedLeaderWithDeprecatedMasterRole() { } private boolean isLocalNodeElectedMaster() { - return MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster(); + return ClusterManagerServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster(); } private boolean clusterStateHasNode(DiscoveryNode node) { - return node.equals(MasterServiceTests.discoveryState(clusterManagerService).nodes().get(node.getId())); + return node.equals(ClusterManagerServiceTests.discoveryState(clusterManagerService).nodes().get(node.getId())); } } diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java similarity index 92% rename from server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java rename to server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java index 47830c290f7d8..4bf573f6782d8 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java @@ -93,14 +93,14 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; -public class MasterServiceTests extends OpenSearchTestCase { +public class ClusterManagerServiceTests extends OpenSearchTestCase { private static ThreadPool threadPool; private static long relativeTimeInMillis; @BeforeClass public static void createThreadPool() { - threadPool = new TestThreadPool(MasterServiceTests.class.getName()) { + threadPool = new TestThreadPool(ClusterManagerServiceTests.class.getName()) { @Override public long relativeTimeInMillis() { return relativeTimeInMillis; @@ -121,17 +121,17 @@ public void randomizeCurrentTime() { relativeTimeInMillis = randomLongBetween(0L, 1L << 62); } - private MasterService createClusterManagerService(boolean makeClusterManager) { + private ClusterManagerService createClusterManagerService(boolean makeClusterManager) { final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - final MasterService clusterManagerService = new MasterService( + final ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), ClusterManagerServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) + final ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterManagerServiceTests.class.getSimpleName())) .nodes( DiscoveryNodes.builder() .add(localNode) @@ -151,7 +151,7 @@ private MasterService createClusterManagerService(boolean makeClusterManager) { } public void testClusterManagerAwareExecution() throws Exception { - final MasterService nonClusterManager = createClusterManagerService(false); + final ClusterManagerService nonClusterManager = createClusterManagerService(false); final boolean[] taskFailed = { false }; final CountDownLatch latch1 = new CountDownLatch(1); @@ -194,7 +194,7 @@ public void onFailure(String source, Exception e) { } public void testThreadContext() throws InterruptedException { - final MasterService clusterManagerService = createClusterManagerService(true); + final ClusterManagerService clusterManagerService = createClusterManagerService(true); final CountDownLatch latch = new CountDownLatch(1); try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) { @@ -292,7 +292,7 @@ public void testClusterStateTaskListenerThrowingExceptionIsOkay() throws Interru final CountDownLatch latch = new CountDownLatch(1); AtomicBoolean published = new AtomicBoolean(); - try (MasterService clusterManagerService = createClusterManagerService(true)) { + try (ClusterManagerService clusterManagerService = createClusterManagerService(true)) { clusterManagerService.submitStateUpdateTask( "testClusterStateTaskListenerThrowingExceptionIsOkay", new Object(), @@ -328,11 +328,11 @@ public void onFailure(String source, Exception e) {} @TestLogging(value = "org.opensearch.cluster.service:TRACE", reason = "to ensure that we log cluster state events on TRACE level") public void testClusterStateUpdateLogging() throws Exception { - try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) { + try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ClusterManagerService.class))) { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 start", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "executing cluster state update for [test1]" ) @@ -340,7 +340,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 computation", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [1s] to compute cluster state update for [test1]" ) @@ -348,7 +348,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 notification", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [0s] to notify listeners on unchanged cluster state for [test1]" ) @@ -357,7 +357,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 start", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "executing cluster state update for [test2]" ) @@ -365,7 +365,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 failure", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.TRACE, "failed to execute cluster state update (on version: [*], uuid: [*]) for [test2]*" ) @@ -373,7 +373,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 computation", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [2s] to compute cluster state update for [test2]" ) @@ -381,7 +381,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 notification", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [0s] to notify listeners on unchanged cluster state for [test2]" ) @@ -390,7 +390,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 start", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "executing cluster state update for [test3]" ) @@ -398,7 +398,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 computation", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [3s] to compute cluster state update for [test3]" ) @@ -406,7 +406,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 notification", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [test3]" ) @@ -415,13 +415,13 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test4", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.DEBUG, "executing cluster state update for [test4]" ) ); - try (MasterService clusterManagerService = createClusterManagerService(true)) { + try (ClusterManagerService clusterManagerService = createClusterManagerService(true)) { clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -617,7 +617,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } }; - try (MasterService clusterManagerService = createClusterManagerService(true)) { + try (ClusterManagerService clusterManagerService = createClusterManagerService(true)) { final ConcurrentMap submittedTasksPerThread = new ConcurrentHashMap<>(); CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { @@ -696,7 +696,7 @@ public void testBlockingCallInClusterStateTaskListenerFails() throws Interrupted final CountDownLatch latch = new CountDownLatch(1); final AtomicReference assertionRef = new AtomicReference<>(); - try (MasterService clusterManagerService = createClusterManagerService(true)) { + try (ClusterManagerService clusterManagerService = createClusterManagerService(true)) { clusterManagerService.submitStateUpdateTask( "testBlockingCallInClusterStateTaskListenerFails", new Object(), @@ -737,11 +737,11 @@ public void onFailure(String source, Exception e) {} @TestLogging(value = "org.opensearch.cluster.service:WARN", reason = "to ensure that we log cluster state events on WARN level") public void testLongClusterStateUpdateLogging() throws Exception { - try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) { + try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ClusterManagerService.class))) { mockAppender.addExpectation( new MockLogAppender.UnseenEventExpectation( "test1 shouldn't log because it was fast enough", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.WARN, "*took*test1*" ) @@ -749,7 +749,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.WARN, "*took [*], which is over [10s], to compute cluster state update for [test2]" ) @@ -757,7 +757,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.WARN, "*took [*], which is over [10s], to compute cluster state update for [test3]" ) @@ -765,7 +765,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test4", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.WARN, "*took [*], which is over [10s], to compute cluster state update for [test4]" ) @@ -773,7 +773,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.UnseenEventExpectation( "test5 should not log despite publishing slowly", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.WARN, "*took*test5*" ) @@ -781,16 +781,16 @@ public void testLongClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test6 should log due to slow and failing publication", - MasterService.class.getCanonicalName(), + ClusterManagerService.class.getCanonicalName(), Level.WARN, "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [test6]:*" ) ); try ( - MasterService clusterManagerService = new MasterService( + ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), ClusterManagerServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -805,19 +805,21 @@ public void testLongClusterStateUpdateLogging() throws Exception { emptySet(), Version.CURRENT ); - final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) + final ClusterState initialClusterState = ClusterState.builder( + new ClusterName(ClusterManagerServiceTests.class.getSimpleName()) + ) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId())) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); final AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { if (event.source().contains("test5")) { - relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( + relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( Settings.EMPTY ).millis() + randomLongBetween(1, 1000000); } if (event.source().contains("test6")) { - relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( + relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( Settings.EMPTY ).millis() + randomLongBetween(1, 1000000); throw new OpenSearchException("simulated error during slow publication which should trigger logging"); @@ -835,7 +837,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += randomLongBetween( 0L, - MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis() + ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis() ); return currentState; } @@ -856,7 +858,7 @@ public void onFailure(String source, Exception e) { clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( + relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( Settings.EMPTY ).millis() + randomLongBetween(1, 1000000); throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); @@ -875,7 +877,7 @@ public void onFailure(String source, Exception e) { clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( + relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( Settings.EMPTY ).millis() + randomLongBetween(1, 1000000); return ClusterState.builder(currentState).incrementVersion().build(); @@ -894,7 +896,7 @@ public void onFailure(String source, Exception e) { clusterManagerService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( + relativeTimeInMillis += ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( Settings.EMPTY ).millis() + randomLongBetween(1, 1000000); return currentState; @@ -971,9 +973,9 @@ public void testAcking() throws InterruptedException { final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); try ( - MasterService clusterManagerService = new MasterService( + ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), ClusterManagerServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -981,7 +983,7 @@ public void testAcking() throws InterruptedException { ) ) { - final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) + final ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterManagerServiceTests.class.getSimpleName())) .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3).localNodeId(node1.getId()).masterNodeId(node1.getId())) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); @@ -1101,7 +1103,7 @@ public void onAckTimeout() { /** * Returns the cluster state that the cluster-manager service uses (and that is provided by the discovery layer) */ - public static ClusterState discoveryState(MasterService clusterManagerService) { + public static ClusterState discoveryState(ClusterManagerService clusterManagerService) { return clusterManagerService.state(); } diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index efcefab6c9f8b..5588e9c1ceba8 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -37,7 +37,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.service.ClusterApplier; -import org.opensearch.cluster.service.MasterService; +import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; @@ -70,7 +70,7 @@ public class DiscoveryModuleTests extends OpenSearchTestCase { private TransportService transportService; private NamedWriteableRegistry namedWriteableRegistry; - private MasterService clusterManagerService; + private ClusterManagerService clusterManagerService; private ClusterApplier clusterApplier; private ThreadPool threadPool; private ClusterSettings clusterSettings; @@ -93,7 +93,7 @@ public void setupDummyServices() { threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null); - clusterManagerService = mock(MasterService.class); + clusterManagerService = mock(ClusterManagerService.class); namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); clusterApplier = mock(ClusterApplier.class); clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 89fc88ecacbbd..0a79235673dfe 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -143,9 +143,9 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.opensearch.cluster.service.ClusterApplierService; +import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.cluster.service.FakeThreadPoolMasterService; -import org.opensearch.cluster.service.MasterService; import org.opensearch.common.CheckedConsumer; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.NamedWriteableRegistry; @@ -1645,7 +1645,7 @@ private final class TestClusterNode { private final DiscoveryNode node; - private final MasterService clusterManagerService; + private final ClusterManagerService clusterManagerService; private final AllocationService allocationService; From 7276450eaf32540cabf1a9ec890c5ad9b879174d Mon Sep 17 00:00:00 2001 From: Tianli Feng Date: Tue, 26 Jul 2022 20:01:32 -0700 Subject: [PATCH 5/7] Change logger class back to MasterService.class Signed-off-by: Tianli Feng --- .../service/ClusterManagerServiceTests.java | 38 +++++++++---------- .../opensearch/test/ClusterServiceUtils.java | 10 ++--- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java index 4bf573f6782d8..8a3c14bc3b1d3 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java @@ -328,11 +328,11 @@ public void onFailure(String source, Exception e) {} @TestLogging(value = "org.opensearch.cluster.service:TRACE", reason = "to ensure that we log cluster state events on TRACE level") public void testClusterStateUpdateLogging() throws Exception { - try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ClusterManagerService.class))) { + try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 start", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.DEBUG, "executing cluster state update for [test1]" ) @@ -340,7 +340,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 computation", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.DEBUG, "took [1s] to compute cluster state update for [test1]" ) @@ -348,7 +348,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test1 notification", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.DEBUG, "took [0s] to notify listeners on unchanged cluster state for [test1]" ) @@ -357,7 +357,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 start", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.DEBUG, "executing cluster state update for [test2]" ) @@ -365,7 +365,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 failure", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.TRACE, "failed to execute cluster state update (on version: [*], uuid: [*]) for [test2]*" ) @@ -373,7 +373,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 computation", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.DEBUG, "took [2s] to compute cluster state update for [test2]" ) @@ -381,7 +381,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2 notification", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.DEBUG, "took [0s] to notify listeners on unchanged cluster state for [test2]" ) @@ -390,7 +390,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 start", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.DEBUG, "executing cluster state update for [test3]" ) @@ -398,7 +398,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 computation", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.DEBUG, "took [3s] to compute cluster state update for [test3]" ) @@ -406,7 +406,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3 notification", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.DEBUG, "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [test3]" ) @@ -415,7 +415,7 @@ public void testClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test4", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.DEBUG, "executing cluster state update for [test4]" ) @@ -737,11 +737,11 @@ public void onFailure(String source, Exception e) {} @TestLogging(value = "org.opensearch.cluster.service:WARN", reason = "to ensure that we log cluster state events on WARN level") public void testLongClusterStateUpdateLogging() throws Exception { - try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ClusterManagerService.class))) { + try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) { mockAppender.addExpectation( new MockLogAppender.UnseenEventExpectation( "test1 shouldn't log because it was fast enough", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.WARN, "*took*test1*" ) @@ -749,7 +749,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test2", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.WARN, "*took [*], which is over [10s], to compute cluster state update for [test2]" ) @@ -757,7 +757,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test3", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.WARN, "*took [*], which is over [10s], to compute cluster state update for [test3]" ) @@ -765,7 +765,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test4", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.WARN, "*took [*], which is over [10s], to compute cluster state update for [test4]" ) @@ -773,7 +773,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.UnseenEventExpectation( "test5 should not log despite publishing slowly", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.WARN, "*took*test5*" ) @@ -781,7 +781,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation( "test6 should log due to slow and failing publication", - ClusterManagerService.class.getCanonicalName(), + MasterService.class.getCanonicalName(), Level.WARN, "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [test6]:*" ) diff --git a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java index b6e4037fb6e5c..523ce0ef5e799 100644 --- a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java @@ -46,8 +46,8 @@ import org.opensearch.cluster.service.ClusterApplier; import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.opensearch.cluster.service.ClusterApplierService; +import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.cluster.service.MasterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.node.Node; @@ -61,8 +61,8 @@ public class ClusterServiceUtils { - public static MasterService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) { - MasterService clusterManagerService = new MasterService( + public static ClusterManagerService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) { + ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_cluster_manager_node").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool @@ -77,7 +77,7 @@ public static MasterService createMasterService(ThreadPool threadPool, ClusterSt return clusterManagerService; } - public static MasterService createMasterService(ThreadPool threadPool, DiscoveryNode localNode) { + public static ClusterManagerService createMasterService(ThreadPool threadPool, DiscoveryNode localNode) { ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).clusterManagerNodeId(localNode.getId())) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) @@ -114,7 +114,7 @@ public void onFailure(String source, Exception e) { } } - public static void setState(MasterService executor, ClusterState clusterState) { + public static void setState(ClusterManagerService executor, ClusterState clusterState) { CountDownLatch latch = new CountDownLatch(1); executor.submitStateUpdateTask("test setting state", new ClusterStateUpdateTask() { @Override From 0e546608eef560772d17e87989363288f04a5e11 Mon Sep 17 00:00:00 2001 From: Tianli Feng Date: Tue, 26 Jul 2022 22:11:34 -0700 Subject: [PATCH 6/7] Add unit test for validating backwards compatibility of ClusterService.getMasterService() Signed-off-by: Tianli Feng --- .../service/ClusterManagerService.java | 5 +++ .../cluster/service/MasterService.java | 2 + .../cluster/coordination/NodeJoinTests.java | 10 ++--- .../cluster/service/ClusterServiceTests.java | 40 +++++++++++++++++++ ...viceTests.java => MasterServiceTests.java} | 18 ++++----- 5 files changed, 60 insertions(+), 15 deletions(-) create mode 100644 server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java rename server/src/test/java/org/opensearch/cluster/service/{ClusterManagerServiceTests.java => MasterServiceTests.java} (98%) diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java index 4eef515a904af..74b623dd95e6f 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerService.java @@ -12,6 +12,11 @@ import org.opensearch.common.settings.Settings; import org.opensearch.threadpool.ThreadPool; +/** + * Main Cluster Manager Node Service + * + * @opensearch.internal + */ public class ClusterManagerService extends MasterService { public ClusterManagerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { super(settings, clusterSettings, threadPool); diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index cc58c56e00dd5..f2cedd9756a4d 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -87,7 +87,9 @@ * Main Master Node Service * * @opensearch.internal + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link ClusterManagerService}. */ +@Deprecated public class MasterService extends AbstractLifecycleComponent { private static final Logger logger = LogManager.getLogger(MasterService.class); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 8d3ba85a90d33..65a0867ccd26b 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -45,7 +45,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.FakeThreadPoolMasterService; import org.opensearch.cluster.service.ClusterManagerService; -import org.opensearch.cluster.service.ClusterManagerServiceTests; +import org.opensearch.cluster.service.MasterServiceTests; import org.opensearch.common.Randomness; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -514,7 +514,7 @@ public void testJoinUpdateVotingConfigExclusion() throws Exception { ); assertTrue( - ClusterManagerServiceTests.discoveryState(clusterManagerService) + MasterServiceTests.discoveryState(clusterManagerService) .getVotingConfigExclusions() .stream() .anyMatch( @@ -746,7 +746,7 @@ public void testConcurrentJoining() { throw new RuntimeException(e); } - assertTrue(ClusterManagerServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster()); + assertTrue(MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster()); for (DiscoveryNode successfulNode : successfulNodes) { assertTrue(successfulNode + " joined cluster", clusterStateHasNode(successfulNode)); assertFalse(successfulNode + " voted for cluster-manager", coordinator.missingJoinVoteFrom(successfulNode)); @@ -776,10 +776,10 @@ public void testJoinElectedLeaderWithDeprecatedMasterRole() { } private boolean isLocalNodeElectedMaster() { - return ClusterManagerServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster(); + return MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster(); } private boolean clusterStateHasNode(DiscoveryNode node) { - return node.equals(ClusterManagerServiceTests.discoveryState(clusterManagerService).nodes().get(node.getId())); + return node.equals(MasterServiceTests.discoveryState(clusterManagerService).nodes().get(node.getId())); } } diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java new file mode 100644 index 0000000000000..ed1131a898ad9 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.junit.After; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; + +import static org.hamcrest.Matchers.equalTo; + +public class ClusterServiceTests extends OpenSearchTestCase { + private final TestThreadPool threadPool = new TestThreadPool(ClusterServiceTests.class.getName()); + + @After + public void terminateThreadPool() { + terminate(threadPool); + } + + public void testDeprecatedGetMasterServiceBWC() { + try ( + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ) + ) { + MasterService masterService = clusterService.getMasterService(); + ClusterManagerService clusterManagerService = clusterService.getClusterManagerService(); + assertThat(masterService, equalTo(clusterManagerService)); + } + } +} diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java similarity index 98% rename from server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java rename to server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index 8a3c14bc3b1d3..6e8f001c51fdb 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -93,14 +93,14 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasKey; -public class ClusterManagerServiceTests extends OpenSearchTestCase { +public class MasterServiceTests extends OpenSearchTestCase { private static ThreadPool threadPool; private static long relativeTimeInMillis; @BeforeClass public static void createThreadPool() { - threadPool = new TestThreadPool(ClusterManagerServiceTests.class.getName()) { + threadPool = new TestThreadPool(MasterServiceTests.class.getName()) { @Override public long relativeTimeInMillis() { return relativeTimeInMillis; @@ -125,13 +125,13 @@ private ClusterManagerService createClusterManagerService(boolean makeClusterMan final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); final ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), ClusterManagerServiceTests.class.getSimpleName()) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - final ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterManagerServiceTests.class.getSimpleName())) + final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) .nodes( DiscoveryNodes.builder() .add(localNode) @@ -790,7 +790,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { try ( ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), ClusterManagerServiceTests.class.getSimpleName()) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -805,9 +805,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { emptySet(), Version.CURRENT ); - final ClusterState initialClusterState = ClusterState.builder( - new ClusterName(ClusterManagerServiceTests.class.getSimpleName()) - ) + final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId())) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); @@ -975,7 +973,7 @@ public void testAcking() throws InterruptedException { try ( ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder() - .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), ClusterManagerServiceTests.class.getSimpleName()) + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -983,7 +981,7 @@ public void testAcking() throws InterruptedException { ) ) { - final ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterManagerServiceTests.class.getSimpleName())) + final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName())) .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3).localNodeId(node1.getId()).masterNodeId(node1.getId())) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); From 56d1c5e2bc38ccd5ec27c16f4ab420cd4a47d7e6 Mon Sep 17 00:00:00 2001 From: Tianli Feng Date: Tue, 26 Jul 2022 22:20:56 -0700 Subject: [PATCH 7/7] Deprecate public methods createMasterService(...) in class ClusterServiceUtils Signed-off-by: Tianli Feng --- .../opensearch/test/ClusterServiceUtils.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java index 523ce0ef5e799..8f4f510da5ec3 100644 --- a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.cluster.service.MasterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.node.Node; @@ -61,7 +62,7 @@ public class ClusterServiceUtils { - public static ClusterManagerService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) { + public static ClusterManagerService createClusterManagerService(ThreadPool threadPool, ClusterState initialClusterState) { ClusterManagerService clusterManagerService = new ClusterManagerService( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_cluster_manager_node").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -77,12 +78,24 @@ public static ClusterManagerService createMasterService(ThreadPool threadPool, C return clusterManagerService; } - public static ClusterManagerService createMasterService(ThreadPool threadPool, DiscoveryNode localNode) { + public static ClusterManagerService createClusterManagerService(ThreadPool threadPool, DiscoveryNode localNode) { ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).clusterManagerNodeId(localNode.getId())) .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); - return createMasterService(threadPool, initialClusterState); + return createClusterManagerService(threadPool, initialClusterState); + } + + /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #createClusterManagerService(ThreadPool, ClusterState)} */ + @Deprecated + public static MasterService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) { + return createClusterManagerService(threadPool, initialClusterState); + } + + /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #createClusterManagerService(ThreadPool, DiscoveryNode)} */ + @Deprecated + public static MasterService createMasterService(ThreadPool threadPool, DiscoveryNode localNode) { + return createClusterManagerService(threadPool, localNode); } public static void setState(ClusterApplierService executor, ClusterState clusterState) {