Skip to content

Commit

Permalink
remove zen discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed Feb 27, 2019
1 parent 756dc42 commit 5e4a0e7
Show file tree
Hide file tree
Showing 35 changed files with 12 additions and 8,682 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -33,7 +31,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.zen.NodesFaultDetection;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
Expand Down Expand Up @@ -114,10 +111,6 @@ public FollowersChecker(Settings settings, TransportService transportService,
updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(FOLLOWER_CHECK_ACTION_NAME, Names.SAME, false, false, FollowerCheckRequest::new,
(request, transportChannel, task) -> handleFollowerCheck(request, transportChannel));
transportService.registerRequestHandler(
NodesFaultDetection.PING_ACTION_NAME, NodesFaultDetection.PingRequest::new, Names.SAME, false, false,
(request, channel, task) -> // TODO: check that we're a follower of the requesting node?
channel.sendResponse(new NodesFaultDetection.PingResponse()));
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
Expand Down Expand Up @@ -304,17 +297,7 @@ private void handleWakeUp() {
final FollowerCheckRequest request = new FollowerCheckRequest(fastResponseState.term, transportService.getLocalNode());
logger.trace("handleWakeUp: checking {} with {}", discoveryNode, request);

final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(discoveryNode)) {
actionName = NodesFaultDetection.PING_ACTION_NAME;
transportRequest = new NodesFaultDetection.PingRequest(discoveryNode, ClusterName.CLUSTER_NAME_SETTING.get(settings),
transportService.getLocalNode(), ClusterState.UNKNOWN_VERSION);
} else {
actionName = FOLLOWER_CHECK_ACTION_NAME;
transportRequest = request;
}
transportService.sendRequest(discoveryNode, actionName, transportRequest,
transportService.sendRequest(discoveryNode, FOLLOWER_CHECK_ACTION_NAME, request,
TransportRequestOptions.builder().withTimeout(followerCheckTimeout).withType(Type.PING).build(),
new TransportResponseHandler<Empty>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.zen.MembershipAction;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
Expand Down Expand Up @@ -117,11 +115,6 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
transportService.registerRequestHandler(JOIN_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, JoinRequest::new,
(request, channel, task) -> joinHandler.accept(request, transportJoinCallback(request, channel)));

transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_ACTION_NAME, MembershipAction.JoinRequest::new,
ThreadPool.Names.GENERIC, false, false,
(request, channel, task) -> joinHandler.accept(new JoinRequest(request.getNode(), Optional.empty()), // treat as non-voting join
transportJoinCallback(request, channel)));

transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false,
StartJoinRequest::new,
(request, channel, task) -> {
Expand All @@ -143,21 +136,6 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
channel.sendResponse(Empty.INSTANCE);
});

transportService.registerRequestHandler(MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
ValidateJoinRequest::new, ThreadPool.Names.GENERIC,
(request, channel, task) -> {
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
channel.sendResponse(Empty.INSTANCE);
});

transportService.registerRequestHandler(
ZenDiscovery.DISCOVERY_REJOIN_ACTION_NAME, ZenDiscovery.RejoinClusterRequest::new, ThreadPool.Names.SAME,
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here?

transportService.registerRequestHandler(
MembershipAction.DISCOVERY_LEAVE_ACTION_NAME, MembershipAction.LeaveRequest::new, ThreadPool.Names.SAME,
(request, channel, task) -> channel.sendResponse(Empty.INSTANCE)); // TODO: do we need to implement anything here?
}

private JoinCallback transportJoinCallback(TransportRequest request, TransportChannel channel) {
Expand Down Expand Up @@ -200,16 +178,7 @@ public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJo
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {
logger.debug("attempting to join {} with {}", destination, joinRequest);
final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(destination)) {
actionName = MembershipAction.DISCOVERY_JOIN_ACTION_NAME;
transportRequest = new MembershipAction.JoinRequest(transportService.getLocalNode());
} else {
actionName = JOIN_ACTION_NAME;
transportRequest = joinRequest;
}
transportService.sendRequest(destination, actionName, transportRequest,
transportService.sendRequest(destination, JOIN_ACTION_NAME, joinRequest,
TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
new TransportResponseHandler<Empty>() {
@Override
Expand Down Expand Up @@ -269,13 +238,7 @@ public String executor() {
}

public void sendValidateJoinRequest(DiscoveryNode node, ClusterState state, ActionListener<TransportResponse.Empty> listener) {
final String actionName;
if (Coordinator.isZen1Node(node)) {
actionName = MembershipAction.DISCOVERY_JOIN_VALIDATE_ACTION_NAME;
} else {
actionName = VALIDATE_JOIN_ACTION_NAME;
}
transportService.sendRequest(node, actionName,
transportService.sendRequest(node, VALIDATE_JOIN_ACTION_NAME,
new ValidateJoinRequest(state),
TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
new EmptyTransportResponseHandler(ThreadPool.Names.GENERIC) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;

import java.util.ArrayList;
Expand All @@ -47,8 +46,6 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut

private final Logger logger;

private final int minimumMasterNodesOnLocalNode;

public static class Task {

private final DiscoveryNode node;
Expand Down Expand Up @@ -87,7 +84,6 @@ public boolean isFinishElectionTask() {
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger) {
this.allocationService = allocationService;
this.logger = logger;
minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
}

@Override
Expand Down Expand Up @@ -191,7 +187,6 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder()
.blocks(currentState.blocks())
.removeGlobalBlock(NoMasterBlockService.NO_MASTER_BLOCK_ID))
.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnLocalNode)
.build();
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
allocationService.cleanCaches();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
Expand All @@ -32,7 +31,6 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.MasterFaultDetection;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportConnectionListener;
Expand Down Expand Up @@ -103,16 +101,6 @@ public LeaderChecker(final Settings settings, final TransportService transportSe
channel.sendResponse(Empty.INSTANCE);
});

transportService.registerRequestHandler(MasterFaultDetection.MASTER_PING_ACTION_NAME, MasterFaultDetection.MasterPingRequest::new,
Names.SAME, false, false, (request, channel, task) -> {
try {
handleLeaderCheck(new LeaderCheckRequest(request.sourceNode));
} catch (CoordinationStateRejectedException e) {
throw new MasterFaultDetection.ThisIsNotTheMasterYouAreLookingForException(e.getMessage());
}
channel.sendResponse(new MasterFaultDetection.MasterPingResponseResponse());
});

transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
Expand Down Expand Up @@ -217,21 +205,7 @@ void handleWakeUp() {

logger.trace("checking {} with [{}] = {}", leader, LEADER_CHECK_TIMEOUT_SETTING.getKey(), leaderCheckTimeout);

final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(leader)) {
actionName = MasterFaultDetection.MASTER_PING_ACTION_NAME;
transportRequest = new MasterFaultDetection.MasterPingRequest(
transportService.getLocalNode(), leader, ClusterName.CLUSTER_NAME_SETTING.get(settings));
} else {
actionName = LEADER_CHECK_ACTION_NAME;
transportRequest = new LeaderCheckRequest(transportService.getLocalNode());
}
// TODO lag detection:
// In the PoC, the leader sent its current version to the follower in the response to a LeaderCheck, so the follower
// could detect if it was lagging. We'd prefer this to be implemented on the leader, so the response is just
// TransportResponse.Empty here.
transportService.sendRequest(leader, actionName, transportRequest,
transportService.sendRequest(leader, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(transportService.getLocalNode()),
TransportRequestOptions.builder().withTimeout(leaderCheckTimeout).withType(Type.PING).build(),

new TransportResponseHandler<TransportResponse.Empty>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,10 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
Expand All @@ -54,7 +52,6 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -194,16 +191,7 @@ public String toString() {
@Override
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
ActionListener<TransportResponse.Empty> responseActionListener) {
final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(destination)) {
actionName = PublishClusterStateAction.COMMIT_ACTION_NAME;
transportRequest = new PublishClusterStateAction.CommitClusterStateRequest(newState.stateUUID());
} else {
actionName = COMMIT_STATE_ACTION_NAME;
transportRequest = applyCommitRequest;
}
transportService.sendRequest(destination, actionName, transportRequest, stateRequestOptions,
transportService.sendRequest(destination, COMMIT_STATE_ACTION_NAME, applyCommitRequest, stateRequestOptions,
new TransportResponseHandler<TransportResponse.Empty>() {

@Override
Expand Down Expand Up @@ -267,19 +255,7 @@ public String executor() {
return ThreadPool.Names.GENERIC;
}
};
final String actionName;
final TransportResponseHandler<?> transportResponseHandler;
if (Coordinator.isZen1Node(node)) {
actionName = PublishClusterStateAction.SEND_ACTION_NAME;
transportResponseHandler = publishWithJoinResponseHandler.wrap(empty -> new PublishWithJoinResponse(
new PublishResponse(clusterState.term(), clusterState.version()),
Optional.of(new Join(node, transportService.getLocalNode(), clusterState.term(), clusterState.term(),
clusterState.version()))), in -> TransportResponse.Empty.INSTANCE);
} else {
actionName = PUBLISH_STATE_ACTION_NAME;
transportResponseHandler = publishWithJoinResponseHandler;
}
transportService.sendRequest(node, actionName, request, stateRequestOptions, transportResponseHandler);
transportService.sendRequest(node, PUBLISH_STATE_ACTION_NAME, request, stateRequestOptions, publishWithJoinResponseHandler);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
responseActionListener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@
import org.elasticsearch.discovery.PeerFinder;
import org.elasticsearch.discovery.SeedHostsResolver;
import org.elasticsearch.discovery.SettingsBasedSeedHostsProvider;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayService;
Expand Down Expand Up @@ -291,7 +288,6 @@ public void apply(Settings value, Settings current, Settings previous) {
ClusterService.USER_DEFINED_META_DATA,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterAware.SEARCH_REMOTE_CLUSTERS_SEEDS,
Expand Down Expand Up @@ -390,20 +386,6 @@ public void apply(Settings value, Settings current, Settings previous) {
DiscoveryModule.DISCOVERY_TYPE_SETTING,
DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING,
DiscoveryModule.LEGACY_DISCOVERY_HOSTS_PROVIDER_SETTING,
FaultDetection.PING_RETRIES_SETTING,
FaultDetection.PING_TIMEOUT_SETTING,
FaultDetection.REGISTER_CONNECTION_LISTENER_SETTING,
FaultDetection.PING_INTERVAL_SETTING,
FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING,
ZenDiscovery.PING_TIMEOUT_SETTING,
ZenDiscovery.JOIN_TIMEOUT_SETTING,
ZenDiscovery.JOIN_RETRY_ATTEMPTS_SETTING,
ZenDiscovery.JOIN_RETRY_DELAY_SETTING,
ZenDiscovery.MAX_PINGS_FROM_ANOTHER_MASTER_SETTING,
ZenDiscovery.SEND_LEAVE_REQUEST_SETTING,
ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING,
ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING,
SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING,
SettingsBasedSeedHostsProvider.LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
SeedHostsResolver.DISCOVERY_SEED_RESOLVER_MAX_CONCURRENT_RESOLVERS_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

package org.elasticsearch.discovery;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -37,7 +37,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -128,9 +127,6 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
};

Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put(ZEN_DISCOVERY_TYPE,
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState));
discoveryTypes.put(ZEN2_DISCOVERY_TYPE, () -> new Coordinator(NODE_NAME_SETTING.get(settings), settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider, clusterApplier,
Expand Down
Loading

0 comments on commit 5e4a0e7

Please sign in to comment.