diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 611261320a453..56d585d81e9dc 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -141,7 +141,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final boolean singleNodeDiscovery; private final ElectionStrategy electionStrategy; private final TransportService transportService; - private final MasterService clusterManagerService; + private final MasterService masterService; private final AllocationService allocationService; private final JoinHelper joinHelper; private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; @@ -191,7 +191,7 @@ public Coordinator( TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, - MasterService clusterManagerService, + MasterService masterService, Supplier persistedStateSupplier, SeedHostsProvider seedHostsProvider, ClusterApplier clusterApplier, @@ -203,7 +203,7 @@ public Coordinator( ) { this.settings = settings; this.transportService = transportService; - this.clusterManagerService = clusterManagerService; + this.masterService = masterService; this.allocationService = allocationService; this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings); @@ -211,7 +211,7 @@ public Coordinator( this.joinHelper = new JoinHelper( settings, allocationService, - clusterManagerService, + masterService, transportService, this::getCurrentTerm, this::getStateForClusterManagerService, @@ -260,7 +260,7 @@ public Coordinator( ); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); this.clusterApplier = clusterApplier; - clusterManagerService.setClusterStateSupplier(this::getStateForClusterManagerService); + masterService.setClusterStateSupplier(this::getStateForClusterManagerService); this.reconfigurator = new Reconfigurator(settings, clusterSettings); this.clusterBootstrapService = new ClusterBootstrapService( settings, @@ -310,7 +310,7 @@ private void onLeaderFailure(Exception e) { private void removeNode(DiscoveryNode discoveryNode, String reason) { synchronized (mutex) { if (mode == Mode.LEADER) { - clusterManagerService.submitStateUpdateTask( + masterService.submitStateUpdateTask( "node-left", new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason), ClusterStateTaskConfig.build(Priority.IMMEDIATE), @@ -756,7 +756,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) { } private void cleanClusterManagerService() { - clusterManagerService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() { + masterService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() { @Override public void onFailure(String source, Exception e) { // ignore @@ -1126,7 +1126,7 @@ private void scheduleReconfigurationIfNeeded() { final ClusterState state = getLastAcceptedState(); if (improveConfiguration(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) { logger.trace("scheduling reconfiguration"); - clusterManagerService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) { + masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { reconfigurationTaskScheduled.set(false); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 18a9579547d56..3accc4f1d5baf 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -106,7 +106,7 @@ public class JoinHelper { Setting.Property.Deprecated ); - private final MasterService clusterManagerService; + private final MasterService masterService; private final TransportService transportService; private volatile JoinTaskExecutor joinTaskExecutor; @@ -122,7 +122,7 @@ public class JoinHelper { JoinHelper( Settings settings, AllocationService allocationService, - MasterService clusterManagerService, + MasterService masterService, TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, @@ -132,7 +132,7 @@ public class JoinHelper { RerouteService rerouteService, NodeHealthService nodeHealthService ) { - this.clusterManagerService = clusterManagerService; + this.masterService = masterService; this.transportService = transportService; this.nodeHealthService = nodeHealthService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); @@ -454,7 +454,7 @@ class LeaderJoinAccumulator implements JoinAccumulator { public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) { final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(sender, "join existing leader"); assert joinTaskExecutor != null; - clusterManagerService.submitStateUpdateTask( + masterService.submitStateUpdateTask( "node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), @@ -539,7 +539,7 @@ public void close(Mode newMode) { pendingAsTasks.put(JoinTaskExecutor.newBecomeMasterTask(), (source, e) -> {}); pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> {}); joinTaskExecutor = joinTaskExecutorGenerator.get(); - clusterManagerService.submitStateUpdateTasks( + masterService.submitStateUpdateTasks( stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT), diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index 9dc907f7afadd..baf453d18b3b3 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -62,7 +62,7 @@ * @opensearch.internal */ public class ClusterService extends AbstractLifecycleComponent { - private final MasterService clusterManagerService; + private final MasterService masterService; private final ClusterApplierService clusterApplierService; @@ -100,12 +100,12 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread public ClusterService( Settings settings, ClusterSettings clusterSettings, - MasterService clusterManagerService, + MasterService masterService, ClusterApplierService clusterApplierService ) { this.settings = settings; this.nodeName = Node.NODE_NAME_SETTING.get(settings); - this.clusterManagerService = clusterManagerService; + this.masterService = masterService; this.operationRouting = new OperationRouting(settings, clusterSettings); this.clusterSettings = clusterSettings; this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); @@ -131,18 +131,18 @@ public RerouteService getRerouteService() { @Override protected synchronized void doStart() { clusterApplierService.start(); - clusterManagerService.start(); + masterService.start(); } @Override protected synchronized void doStop() { - clusterManagerService.stop(); + masterService.stop(); clusterApplierService.stop(); } @Override protected synchronized void doClose() { - clusterManagerService.close(); + masterService.close(); clusterApplierService.close(); } @@ -219,7 +219,7 @@ public void addLocalNodeMasterListener(LocalNodeClusterManagerListener listener) } public MasterService getMasterService() { - return clusterManagerService; + return masterService; } /** @@ -333,6 +333,6 @@ public void submitStateUpdateTasks( final ClusterStateTaskConfig config, final ClusterStateTaskExecutor executor ) { - clusterManagerService.submitStateUpdateTasks(source, tasks, config, executor); + masterService.submitStateUpdateTasks(source, tasks, config, executor); } } diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index c12283df1dbc9..6b746e5963bdc 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -121,7 +121,7 @@ public DiscoveryModule( TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, - MasterService clusterManagerService, + MasterService masterService, ClusterApplier clusterApplier, ClusterSettings clusterSettings, List plugins, @@ -197,7 +197,7 @@ public DiscoveryModule( transportService, namedWriteableRegistry, allocationService, - clusterManagerService, + masterService, gatewayMetaState::getPersistedState, seedHostsProvider, clusterApplier, diff --git a/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 1c247a7b5a3da..7f09f1eb8e2be 100644 --- a/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/opensearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -87,14 +87,14 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { } }; - final MasterService clusterManagerService = new FakeThreadPoolMasterService( + final MasterService masterService = new FakeThreadPoolMasterService( "test", "clusterManagerService", threadPool, r -> { fail("cluster-manager service should not run any tasks"); } ); - final ClusterService clusterService = new ClusterService(settings, clusterSettings, clusterManagerService, clusterApplierService); + final ClusterService clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService); final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(settings, clusterService, threadPool, client); @@ -103,8 +103,8 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { clusterService.setNodeConnectionsService(ClusterServiceUtils.createNoOpNodeConnectionsService()); clusterApplierService.setInitialState(ClusterState.builder(new ClusterName("cluster")).nodes(noClusterManager).build()); - clusterManagerService.setClusterStatePublisher((clusterChangedEvent, publishListener, ackListener) -> fail("should not publish")); - clusterManagerService.setClusterStateSupplier(clusterApplierService::state); + masterService.setClusterStatePublisher((clusterChangedEvent, publishListener, ackListener) -> fail("should not publish")); + masterService.setClusterStateSupplier(clusterApplierService::state); clusterService.start(); final AtomicBoolean becameClusterManager1 = new AtomicBoolean(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index 61a81a8c08389..806468ed5e761 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -98,7 +98,7 @@ public class NodeJoinTests extends OpenSearchTestCase { private static ThreadPool threadPool; - private MasterService clusterManagerService; + private MasterService masterService; private Coordinator coordinator; private DeterministicTaskQueue deterministicTaskQueue; private Transport transport; @@ -117,7 +117,7 @@ public static void afterClass() { @After public void tearDown() throws Exception { super.tearDown(); - clusterManagerService.close(); + masterService.close(); } private static ClusterState initialState(DiscoveryNode localNode, long term, long version, VotingConfiguration config) { @@ -166,40 +166,40 @@ private void setupFakeClusterManagerServiceAndCoordinator(long term, ClusterStat } private void setupRealClusterManagerServiceAndCoordinator(long term, ClusterState initialState) { - MasterService clusterManagerService = new MasterService( + MasterService masterService = new MasterService( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); AtomicReference clusterStateRef = new AtomicReference<>(initialState); - clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { + masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { clusterStateRef.set(event.state()); publishListener.onResponse(null); }); setupClusterManagerServiceAndCoordinator( term, initialState, - clusterManagerService, + masterService, threadPool, new Random(Randomness.get().nextLong()), () -> new StatusInfo(HEALTHY, "healthy-info") ); - clusterManagerService.setClusterStateSupplier(clusterStateRef::get); - clusterManagerService.start(); + masterService.setClusterStateSupplier(clusterStateRef::get); + masterService.start(); } private void setupClusterManagerServiceAndCoordinator( long term, ClusterState initialState, - MasterService clusterManagerService, + MasterService masterService, ThreadPool threadPool, Random random, NodeHealthService nodeHealthService ) { - if (this.clusterManagerService != null || coordinator != null) { + if (this.masterService != null || coordinator != null) { throw new IllegalStateException("method setupClusterManagerServiceAndCoordinator can only be called once"); } - this.clusterManagerService = clusterManagerService; + this.masterService = masterService; CapturingTransport capturingTransport = new CapturingTransport() { @Override protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) { @@ -231,7 +231,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req transportService, writableRegistry(), OpenSearchAllocationTestCase.createAllocationService(Settings.EMPTY), - clusterManagerService, + masterService, () -> new InMemoryPersistedState(term, initialState), r -> emptyList(), new NoOpClusterApplier(), @@ -514,7 +514,7 @@ public void testJoinUpdateVotingConfigExclusion() throws Exception { ); assertTrue( - MasterServiceTests.discoveryState(clusterManagerService) + MasterServiceTests.discoveryState(masterService) .getVotingConfigExclusions() .stream() .anyMatch( @@ -746,7 +746,7 @@ public void testConcurrentJoining() { throw new RuntimeException(e); } - assertTrue(MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster()); + assertTrue(MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster()); for (DiscoveryNode successfulNode : successfulNodes) { assertTrue(successfulNode + " joined cluster", clusterStateHasNode(successfulNode)); assertFalse(successfulNode + " voted for cluster-manager", coordinator.missingJoinVoteFrom(successfulNode)); @@ -776,10 +776,10 @@ public void testJoinElectedLeaderWithDeprecatedMasterRole() { } private boolean isLocalNodeElectedMaster() { - return MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster(); + return MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster(); } private boolean clusterStateHasNode(DiscoveryNode node) { - return node.equals(MasterServiceTests.discoveryState(clusterManagerService).nodes().get(node.getId())); + return node.equals(MasterServiceTests.discoveryState(masterService).nodes().get(node.getId())); } } diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index d5f7344c544b9..c829bef576be5 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -123,7 +123,7 @@ public void randomizeCurrentTime() { private MasterService createClusterManagerService(boolean makeClusterManager) { final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - final MasterService clusterManagerService = new MasterService( + final MasterService masterService = new MasterService( Settings.builder() .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") @@ -141,13 +141,13 @@ private MasterService createClusterManagerService(boolean makeClusterManager) { .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); final AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); - clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { + masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { clusterStateRef.set(event.state()); publishListener.onResponse(null); }); - clusterManagerService.setClusterStateSupplier(clusterStateRef::get); - clusterManagerService.start(); - return clusterManagerService; + masterService.setClusterStateSupplier(clusterStateRef::get); + masterService.start(); + return masterService; } public void testClusterManagerAwareExecution() throws Exception { @@ -194,7 +194,7 @@ public void onFailure(String source, Exception e) { } public void testThreadContext() throws InterruptedException { - final MasterService clusterManager = createClusterManagerService(true); + final MasterService masterService = createClusterManagerService(true); final CountDownLatch latch = new CountDownLatch(1); try (ThreadContext.StoredContext ignored = threadPool.getThreadContext().stashContext()) { @@ -208,7 +208,7 @@ public void testThreadContext() throws InterruptedException { final TimeValue ackTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000)); final TimeValue clusterManagerTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000)); - clusterManager.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { + masterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { @Override public ClusterState execute(ClusterState currentState) { assertTrue(threadPool.getThreadContext().isSystemContext()); @@ -280,7 +280,7 @@ public void onAckTimeout() { latch.await(); - clusterManager.close(); + masterService.close(); } /* @@ -292,8 +292,8 @@ public void testClusterStateTaskListenerThrowingExceptionIsOkay() throws Interru final CountDownLatch latch = new CountDownLatch(1); AtomicBoolean published = new AtomicBoolean(); - try (MasterService clusterManagerService = createClusterManagerService(true)) { - clusterManagerService.submitStateUpdateTask( + try (MasterService masterService = createClusterManagerService(true)) { + masterService.submitStateUpdateTask( "testClusterStateTaskListenerThrowingExceptionIsOkay", new Object(), ClusterStateTaskConfig.build(Priority.NORMAL), @@ -421,8 +421,8 @@ public void testClusterStateUpdateLogging() throws Exception { ) ); - try (MasterService clusterManagerService = createClusterManagerService(true)) { - clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { + try (MasterService masterService = createClusterManagerService(true)) { + masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += TimeValue.timeValueSeconds(1).millis(); @@ -437,7 +437,7 @@ public void onFailure(String source, Exception e) { fail(); } }); - clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += TimeValue.timeValueSeconds(2).millis(); @@ -452,7 +452,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS @Override public void onFailure(String source, Exception e) {} }); - clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis(); @@ -469,7 +469,7 @@ public void onFailure(String source, Exception e) { fail(); } }); - clusterManagerService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return currentState; @@ -617,7 +617,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } }; - try (MasterService clusterManagerService = createClusterManagerService(true)) { + try (MasterService masterService = createClusterManagerService(true)) { final ConcurrentMap submittedTasksPerThread = new ConcurrentHashMap<>(); CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); for (int i = 0; i < numberOfThreads; i++) { @@ -632,7 +632,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS submittedTasksPerThread.computeIfAbsent(threadName, key -> new AtomicInteger()).addAndGet(tasks.size()); final TaskExecutor executor = assignment.v1(); if (tasks.size() == 1) { - clusterManagerService.submitStateUpdateTask( + masterService.submitStateUpdateTask( threadName, tasks.stream().findFirst().get(), ClusterStateTaskConfig.build(randomFrom(Priority.values())), @@ -642,7 +642,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } else { Map taskListeners = new HashMap<>(); tasks.forEach(t -> taskListeners.put(t, listener)); - clusterManagerService.submitStateUpdateTasks( + masterService.submitStateUpdateTasks( threadName, taskListeners, ClusterStateTaskConfig.build(randomFrom(Priority.values())), @@ -696,8 +696,8 @@ public void testBlockingCallInClusterStateTaskListenerFails() throws Interrupted final CountDownLatch latch = new CountDownLatch(1); final AtomicReference assertionRef = new AtomicReference<>(); - try (MasterService clusterManagerService = createClusterManagerService(true)) { - clusterManagerService.submitStateUpdateTask( + try (MasterService masterService = createClusterManagerService(true)) { + masterService.submitStateUpdateTask( "testBlockingCallInClusterStateTaskListenerFails", new Object(), ClusterStateTaskConfig.build(Priority.NORMAL), @@ -788,7 +788,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { ); try ( - MasterService clusterManagerService = new MasterService( + MasterService masterService = new MasterService( Settings.builder() .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") @@ -810,7 +810,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); final AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); - clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { + masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { if (event.source().contains("test5")) { relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( Settings.EMPTY @@ -825,12 +825,12 @@ public void testLongClusterStateUpdateLogging() throws Exception { clusterStateRef.set(event.state()); publishListener.onResponse(null); }); - clusterManagerService.setClusterStateSupplier(clusterStateRef::get); - clusterManagerService.start(); + masterService.setClusterStateSupplier(clusterStateRef::get); + masterService.start(); final CountDownLatch latch = new CountDownLatch(6); final CountDownLatch processedFirstTask = new CountDownLatch(1); - clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += randomLongBetween( @@ -853,7 +853,7 @@ public void onFailure(String source, Exception e) { }); processedFirstTask.await(); - clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( @@ -872,7 +872,7 @@ public void onFailure(String source, Exception e) { latch.countDown(); } }); - clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( @@ -891,7 +891,7 @@ public void onFailure(String source, Exception e) { fail(); } }); - clusterManagerService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += MasterService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get( @@ -910,7 +910,7 @@ public void onFailure(String source, Exception e) { fail(); } }); - clusterManagerService.submitStateUpdateTask("test5", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test5", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).incrementVersion().build(); @@ -926,7 +926,7 @@ public void onFailure(String source, Exception e) { fail(); } }); - clusterManagerService.submitStateUpdateTask("test6", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test6", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).incrementVersion().build(); @@ -944,7 +944,7 @@ public void onFailure(String source, Exception e) { }); // Additional update task to make sure all previous logging made it to the loggerName // We don't check logging for this on since there is no guarantee that it will occur before our check - clusterManagerService.submitStateUpdateTask("test7", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test7", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return currentState; @@ -971,7 +971,7 @@ public void testAcking() throws InterruptedException { final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); try ( - MasterService clusterManagerService = new MasterService( + MasterService masterService = new MasterService( Settings.builder() .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") @@ -986,9 +986,9 @@ public void testAcking() throws InterruptedException { .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) .build(); final AtomicReference publisherRef = new AtomicReference<>(); - clusterManagerService.setClusterStatePublisher((e, pl, al) -> publisherRef.get().publish(e, pl, al)); - clusterManagerService.setClusterStateSupplier(() -> initialClusterState); - clusterManagerService.start(); + masterService.setClusterStatePublisher((e, pl, al) -> publisherRef.get().publish(e, pl, al)); + masterService.setClusterStateSupplier(() -> initialClusterState); + masterService.start(); // check that we don't time out before even committing the cluster state { @@ -1000,7 +1000,7 @@ public void testAcking() throws InterruptedException { ) ); - clusterManagerService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(null, null) { + masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(null, null) { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).build(); @@ -1055,7 +1055,7 @@ public void onAckTimeout() { ackListener.onNodeAck(node3, null); }); - clusterManagerService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(null, null) { + masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(null, null) { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).build(); @@ -1101,8 +1101,8 @@ public void onAckTimeout() { /** * Returns the cluster state that the cluster-manager service uses (and that is provided by the discovery layer) */ - public static ClusterState discoveryState(MasterService clusterManagerService) { - return clusterManagerService.state(); + public static ClusterState discoveryState(MasterService masterService) { + return masterService.state(); } } diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index efcefab6c9f8b..d1e3f406b4933 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -70,7 +70,7 @@ public class DiscoveryModuleTests extends OpenSearchTestCase { private TransportService transportService; private NamedWriteableRegistry namedWriteableRegistry; - private MasterService clusterManagerService; + private MasterService masterService; private ClusterApplier clusterApplier; private ThreadPool threadPool; private ClusterSettings clusterSettings; @@ -93,7 +93,7 @@ public void setupDummyServices() { threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null); - clusterManagerService = mock(MasterService.class); + masterService = mock(MasterService.class); namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); clusterApplier = mock(ClusterApplier.class); clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -112,7 +112,7 @@ private DiscoveryModule newModule(Settings settings, List plugi transportService, namedWriteableRegistry, null, - clusterManagerService, + masterService, clusterApplier, clusterSettings, plugins, diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 9558e898f8832..7e0f977e6e835 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1643,7 +1643,7 @@ private final class TestClusterNode { private final DiscoveryNode node; - private final MasterService clusterManagerService; + private final MasterService masterService; private final AllocationService allocationService; @@ -1663,18 +1663,13 @@ private final class TestClusterNode { this.node = node; final Environment environment = createEnvironment(node.getName()); threadPool = deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)); - clusterManagerService = new FakeThreadPoolMasterService( - node.getName(), - "test", - threadPool, - deterministicTaskQueue::scheduleNow - ); + masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow); final Settings settings = environment.settings(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterService = new ClusterService( settings, clusterSettings, - clusterManagerService, + masterService, new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) { @Override protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { @@ -2205,7 +2200,7 @@ public void start(ClusterState initialState) { transportService, namedWriteableRegistry, allocationService, - clusterManagerService, + masterService, () -> persistedState, hostsResolver -> nodes.values() .stream() @@ -2219,7 +2214,7 @@ public void start(ClusterState initialState) { ElectionStrategy.DEFAULT_INSTANCE, () -> new StatusInfo(HEALTHY, "healthy-info") ); - clusterManagerService.setClusterStatePublisher(coordinator); + masterService.setClusterStatePublisher(coordinator); coordinator.start(); clusterService.getClusterApplierService().setNodeConnectionsService(nodeConnectionsService); nodeConnectionsService.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index fb18aa1b5749b..19551b0adecb2 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -557,7 +557,7 @@ void stabilise(long stabilisationDurationMillis) { final ClusterNode leader = getAnyLeader(); final long leaderTerm = leader.coordinator.getCurrentTerm(); - final int pendingTaskCount = leader.clusterManagerService.getFakeMasterServicePendingTaskCount(); + final int pendingTaskCount = leader.masterService.getFakeMasterServicePendingTaskCount(); runFor((pendingTaskCount + 1) * DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "draining task queue"); final Matcher isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); @@ -1025,7 +1025,7 @@ class ClusterNode { private final DiscoveryNode localNode; final MockPersistedState persistedState; final Settings nodeSettings; - private AckedFakeThreadPoolMasterService clusterManagerService; + private AckedFakeThreadPoolMasterService masterService; private DisruptableClusterApplierService clusterApplierService; private ClusterService clusterService; TransportService transportService; @@ -1105,7 +1105,7 @@ protected Optional getDisruptableMockTransport(Transpo null, emptySet() ); - clusterManagerService = new AckedFakeThreadPoolMasterService( + masterService = new AckedFakeThreadPoolMasterService( localNode.getId(), "test", threadPool, @@ -1119,7 +1119,7 @@ protected Optional getDisruptableMockTransport(Transpo deterministicTaskQueue, threadPool ); - clusterService = new ClusterService(settings, clusterSettings, clusterManagerService, clusterApplierService); + clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService); clusterService.setNodeConnectionsService( new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService) ); @@ -1134,7 +1134,7 @@ protected Optional getDisruptableMockTransport(Transpo transportService, writableRegistry(), allocationService, - clusterManagerService, + masterService, this::getPersistedState, Cluster.this::provideSeedHosts, clusterApplierService, @@ -1144,7 +1144,7 @@ protected Optional getDisruptableMockTransport(Transpo getElectionStrategy(), nodeHealthService ); - clusterManagerService.setClusterStatePublisher(coordinator); + masterService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService( settings, allocationService, @@ -1331,11 +1331,11 @@ AckCollector submitUpdateTask( onNode(() -> { logger.trace("[{}] submitUpdateTask: enqueueing [{}]", localNode.getId(), source); final long submittedTerm = coordinator.getCurrentTerm(); - clusterManagerService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { assertThat(currentState.term(), greaterThanOrEqualTo(submittedTerm)); - clusterManagerService.nextAckCollector = ackCollector; + masterService.nextAckCollector = ackCollector; return clusterStateUpdate.apply(currentState); } diff --git a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java index f709d8bcaff34..99f9b86fb6479 100644 --- a/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/opensearch/test/ClusterServiceUtils.java @@ -62,19 +62,19 @@ public class ClusterServiceUtils { public static MasterService createMasterService(ThreadPool threadPool, ClusterState initialClusterState) { - MasterService clusterManagerService = new MasterService( + MasterService masterService = new MasterService( Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_cluster_manager_node").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); AtomicReference clusterStateRef = new AtomicReference<>(initialClusterState); - clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { + masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { clusterStateRef.set(event.state()); publishListener.onResponse(null); }); - clusterManagerService.setClusterStateSupplier(clusterStateRef::get); - clusterManagerService.start(); - return clusterManagerService; + masterService.setClusterStateSupplier(clusterStateRef::get); + masterService.start(); + return masterService; } public static MasterService createMasterService(ThreadPool threadPool, DiscoveryNode localNode) { diff --git a/test/framework/src/test/java/org/opensearch/cluster/service/FakeThreadPoolMasterServiceTests.java b/test/framework/src/test/java/org/opensearch/cluster/service/FakeThreadPoolMasterServiceTests.java index dabeeb6742ec4..b4cdb0b33391a 100644 --- a/test/framework/src/test/java/org/opensearch/cluster/service/FakeThreadPoolMasterServiceTests.java +++ b/test/framework/src/test/java/org/opensearch/cluster/service/FakeThreadPoolMasterServiceTests.java @@ -84,21 +84,21 @@ public void testFakeClusterManagerService() { doAnswer(invocationOnMock -> runnableTasks.add((Runnable) invocationOnMock.getArguments()[0])).when(executorService).execute(any()); when(mockThreadPool.generic()).thenReturn(executorService); - FakeThreadPoolMasterService clusterManagerService = new FakeThreadPoolMasterService( + FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService( "test_node", "test", mockThreadPool, runnableTasks::add ); - clusterManagerService.setClusterStateSupplier(lastClusterStateRef::get); - clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> { + masterService.setClusterStateSupplier(lastClusterStateRef::get); + masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { lastClusterStateRef.set(event.state()); publishingCallback.set(publishListener); }); - clusterManagerService.start(); + masterService.start(); AtomicBoolean firstTaskCompleted = new AtomicBoolean(); - clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState) @@ -138,7 +138,7 @@ public void onFailure(String source, Exception e) { assertThat(runnableTasks.size(), equalTo(0)); AtomicBoolean secondTaskCompleted = new AtomicBoolean(); - clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { + masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState)