Skip to content

Commit

Permalink
Revert renaming some variable name of MasterService
Browse files Browse the repository at this point in the history
Signed-off-by: Tianli Feng <[email protected]>
  • Loading branch information
Tianli Feng committed Jul 14, 2022
1 parent 84b1dfb commit b10e291
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final boolean singleNodeDiscovery;
private final ElectionStrategy electionStrategy;
private final TransportService transportService;
private final MasterService clusterManagerService;
private final MasterService masterService;
private final AllocationService allocationService;
private final JoinHelper joinHelper;
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
Expand Down Expand Up @@ -191,7 +191,7 @@ public Coordinator(
TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
AllocationService allocationService,
MasterService clusterManagerService,
MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier,
SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier,
Expand All @@ -203,15 +203,15 @@ public Coordinator(
) {
this.settings = settings;
this.transportService = transportService;
this.clusterManagerService = clusterManagerService;
this.masterService = masterService;
this.allocationService = allocationService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings);
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(
settings,
allocationService,
clusterManagerService,
masterService,
transportService,
this::getCurrentTerm,
this::getStateForClusterManagerService,
Expand Down Expand Up @@ -260,7 +260,7 @@ public Coordinator(
);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
clusterManagerService.setClusterStateSupplier(this::getStateForClusterManagerService);
masterService.setClusterStateSupplier(this::getStateForClusterManagerService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(
settings,
Expand Down Expand Up @@ -310,7 +310,7 @@ private void onLeaderFailure(Exception e) {
private void removeNode(DiscoveryNode discoveryNode, String reason) {
synchronized (mutex) {
if (mode == Mode.LEADER) {
clusterManagerService.submitStateUpdateTask(
masterService.submitStateUpdateTask(
"node-left",
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
Expand Down Expand Up @@ -756,7 +756,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
}

private void cleanClusterManagerService() {
clusterManagerService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() {
masterService.submitStateUpdateTask("clean-up after stepping down as cluster-manager", new LocalClusterUpdateTask() {
@Override
public void onFailure(String source, Exception e) {
// ignore
Expand Down Expand Up @@ -1126,7 +1126,7 @@ private void scheduleReconfigurationIfNeeded() {
final ClusterState state = getLastAcceptedState();
if (improveConfiguration(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) {
logger.trace("scheduling reconfiguration");
clusterManagerService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) {
masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
reconfigurationTaskScheduled.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class JoinHelper {
Setting.Property.Deprecated
);

private final MasterService clusterManagerService;
private final MasterService masterService;
private final TransportService transportService;
private volatile JoinTaskExecutor joinTaskExecutor;

Expand All @@ -122,7 +122,7 @@ public class JoinHelper {
JoinHelper(
Settings settings,
AllocationService allocationService,
MasterService clusterManagerService,
MasterService masterService,
TransportService transportService,
LongSupplier currentTermSupplier,
Supplier<ClusterState> currentStateSupplier,
Expand All @@ -132,7 +132,7 @@ public class JoinHelper {
RerouteService rerouteService,
NodeHealthService nodeHealthService
) {
this.clusterManagerService = clusterManagerService;
this.masterService = masterService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
Expand Down Expand Up @@ -454,7 +454,7 @@ class LeaderJoinAccumulator implements JoinAccumulator {
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
final JoinTaskExecutor.Task task = new JoinTaskExecutor.Task(sender, "join existing leader");
assert joinTaskExecutor != null;
clusterManagerService.submitStateUpdateTask(
masterService.submitStateUpdateTask(
"node-join",
task,
ClusterStateTaskConfig.build(Priority.URGENT),
Expand Down Expand Up @@ -539,7 +539,7 @@ public void close(Mode newMode) {
pendingAsTasks.put(JoinTaskExecutor.newBecomeMasterTask(), (source, e) -> {});
pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> {});
joinTaskExecutor = joinTaskExecutorGenerator.get();
clusterManagerService.submitStateUpdateTasks(
masterService.submitStateUpdateTasks(
stateUpdateSource,
pendingAsTasks,
ClusterStateTaskConfig.build(Priority.URGENT),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
* @opensearch.internal
*/
public class ClusterService extends AbstractLifecycleComponent {
private final MasterService clusterManagerService;
private final MasterService masterService;

private final ClusterApplierService clusterApplierService;

Expand Down Expand Up @@ -100,12 +100,12 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread
public ClusterService(
Settings settings,
ClusterSettings clusterSettings,
MasterService clusterManagerService,
MasterService masterService,
ClusterApplierService clusterApplierService
) {
this.settings = settings;
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.clusterManagerService = clusterManagerService;
this.masterService = masterService;
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
Expand All @@ -131,18 +131,18 @@ public RerouteService getRerouteService() {
@Override
protected synchronized void doStart() {
clusterApplierService.start();
clusterManagerService.start();
masterService.start();
}

@Override
protected synchronized void doStop() {
clusterManagerService.stop();
masterService.stop();
clusterApplierService.stop();
}

@Override
protected synchronized void doClose() {
clusterManagerService.close();
masterService.close();
clusterApplierService.close();
}

Expand Down Expand Up @@ -219,7 +219,7 @@ public void addLocalNodeMasterListener(LocalNodeClusterManagerListener listener)
}

public MasterService getMasterService() {
return clusterManagerService;
return masterService;
}

/**
Expand Down Expand Up @@ -333,6 +333,6 @@ public <T> void submitStateUpdateTasks(
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor
) {
clusterManagerService.submitStateUpdateTasks(source, tasks, config, executor);
masterService.submitStateUpdateTasks(source, tasks, config, executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public DiscoveryModule(
TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService,
MasterService clusterManagerService,
MasterService masterService,
ClusterApplier clusterApplier,
ClusterSettings clusterSettings,
List<DiscoveryPlugin> plugins,
Expand Down Expand Up @@ -197,7 +197,7 @@ public DiscoveryModule(
transportService,
namedWriteableRegistry,
allocationService,
clusterManagerService,
masterService,
gatewayMetaState::getPersistedState,
seedHostsProvider,
clusterApplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
}
};

final MasterService clusterManagerService = new FakeThreadPoolMasterService(
final MasterService masterService = new FakeThreadPoolMasterService(
"test",
"clusterManagerService",
threadPool,
r -> { fail("cluster-manager service should not run any tasks"); }
);

final ClusterService clusterService = new ClusterService(settings, clusterSettings, clusterManagerService, clusterApplierService);
final ClusterService clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);

final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool);
final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(settings, clusterService, threadPool, client);
Expand All @@ -103,8 +103,8 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {

clusterService.setNodeConnectionsService(ClusterServiceUtils.createNoOpNodeConnectionsService());
clusterApplierService.setInitialState(ClusterState.builder(new ClusterName("cluster")).nodes(noClusterManager).build());
clusterManagerService.setClusterStatePublisher((clusterChangedEvent, publishListener, ackListener) -> fail("should not publish"));
clusterManagerService.setClusterStateSupplier(clusterApplierService::state);
masterService.setClusterStatePublisher((clusterChangedEvent, publishListener, ackListener) -> fail("should not publish"));
masterService.setClusterStateSupplier(clusterApplierService::state);
clusterService.start();

final AtomicBoolean becameClusterManager1 = new AtomicBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public class NodeJoinTests extends OpenSearchTestCase {

private static ThreadPool threadPool;

private MasterService clusterManagerService;
private MasterService masterService;
private Coordinator coordinator;
private DeterministicTaskQueue deterministicTaskQueue;
private Transport transport;
Expand All @@ -117,7 +117,7 @@ public static void afterClass() {
@After
public void tearDown() throws Exception {
super.tearDown();
clusterManagerService.close();
masterService.close();
}

private static ClusterState initialState(DiscoveryNode localNode, long term, long version, VotingConfiguration config) {
Expand Down Expand Up @@ -166,40 +166,40 @@ private void setupFakeClusterManagerServiceAndCoordinator(long term, ClusterStat
}

private void setupRealClusterManagerServiceAndCoordinator(long term, ClusterState initialState) {
MasterService clusterManagerService = new MasterService(
MasterService masterService = new MasterService(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test_node").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
);
AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialState);
clusterManagerService.setClusterStatePublisher((event, publishListener, ackListener) -> {
masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
clusterStateRef.set(event.state());
publishListener.onResponse(null);
});
setupClusterManagerServiceAndCoordinator(
term,
initialState,
clusterManagerService,
masterService,
threadPool,
new Random(Randomness.get().nextLong()),
() -> new StatusInfo(HEALTHY, "healthy-info")
);
clusterManagerService.setClusterStateSupplier(clusterStateRef::get);
clusterManagerService.start();
masterService.setClusterStateSupplier(clusterStateRef::get);
masterService.start();
}

private void setupClusterManagerServiceAndCoordinator(
long term,
ClusterState initialState,
MasterService clusterManagerService,
MasterService masterService,
ThreadPool threadPool,
Random random,
NodeHealthService nodeHealthService
) {
if (this.clusterManagerService != null || coordinator != null) {
if (this.masterService != null || coordinator != null) {
throw new IllegalStateException("method setupClusterManagerServiceAndCoordinator can only be called once");
}
this.clusterManagerService = clusterManagerService;
this.masterService = masterService;
CapturingTransport capturingTransport = new CapturingTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
Expand Down Expand Up @@ -231,7 +231,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
transportService,
writableRegistry(),
OpenSearchAllocationTestCase.createAllocationService(Settings.EMPTY),
clusterManagerService,
masterService,
() -> new InMemoryPersistedState(term, initialState),
r -> emptyList(),
new NoOpClusterApplier(),
Expand Down Expand Up @@ -514,7 +514,7 @@ public void testJoinUpdateVotingConfigExclusion() throws Exception {
);

assertTrue(
MasterServiceTests.discoveryState(clusterManagerService)
MasterServiceTests.discoveryState(masterService)
.getVotingConfigExclusions()
.stream()
.anyMatch(
Expand Down Expand Up @@ -746,7 +746,7 @@ public void testConcurrentJoining() {
throw new RuntimeException(e);
}

assertTrue(MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster());
assertTrue(MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster());
for (DiscoveryNode successfulNode : successfulNodes) {
assertTrue(successfulNode + " joined cluster", clusterStateHasNode(successfulNode));
assertFalse(successfulNode + " voted for cluster-manager", coordinator.missingJoinVoteFrom(successfulNode));
Expand Down Expand Up @@ -776,10 +776,10 @@ public void testJoinElectedLeaderWithDeprecatedMasterRole() {
}

private boolean isLocalNodeElectedMaster() {
return MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster();
return MasterServiceTests.discoveryState(masterService).nodes().isLocalNodeElectedMaster();
}

private boolean clusterStateHasNode(DiscoveryNode node) {
return node.equals(MasterServiceTests.discoveryState(clusterManagerService).nodes().get(node.getId()));
return node.equals(MasterServiceTests.discoveryState(masterService).nodes().get(node.getId()));
}
}
Loading

0 comments on commit b10e291

Please sign in to comment.