Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Zen2] Introduce ClusterBootstrapService #35488

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -33,6 +34,8 @@
public class GetDiscoveredNodesRequest extends ActionRequest {

private int waitForNodes = 1;

@Nullable // if the request should wait indefinitely
private TimeValue timeout = TimeValue.timeValueSeconds(30);

public GetDiscoveredNodesRequest() {
Expand All @@ -41,7 +44,7 @@ public GetDiscoveredNodesRequest() {
public GetDiscoveredNodesRequest(StreamInput in) throws IOException {
super(in);
waitForNodes = in.readInt();
timeout = in.readTimeValue();
timeout = in.readOptionalTimeValue();
}

/**
Expand Down Expand Up @@ -74,8 +77,8 @@ public int getWaitForNodes() {
*
* @param timeout how long to wait to discover sufficiently many nodes to respond successfully.
*/
public void setTimeout(TimeValue timeout) {
if (timeout.compareTo(TimeValue.ZERO) < 0) {
public void setTimeout(@Nullable TimeValue timeout) {
if (timeout != null && timeout.compareTo(TimeValue.ZERO) < 0) {
throw new IllegalArgumentException("negative timeout of [" + timeout + "] is not allowed");
}
this.timeout = timeout;
Expand All @@ -87,6 +90,7 @@ public void setTimeout(TimeValue timeout) {
*
* @return how long to wait to discover sufficiently many nodes to respond successfully.
*/
@Nullable
public TimeValue getTimeout() {
// A null value means the request should wait indefinitely, i.e. no timeout applies.
return timeout;
}
Expand All @@ -105,7 +109,7 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(waitForNodes);
out.writeTimeValue(timeout);
out.writeOptionalTimeValue(timeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,20 @@ public String toString() {
listenableFuture.addListener(ActionListener.wrap(releasable::close), directExecutor, threadPool.getThreadContext());
respondIfRequestSatisfied.accept(coordinator.getFoundPeers());

threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() {
@Override
public void run() {
if (listenerNotified.compareAndSet(false, true)) {
listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request));
if (request.getTimeout() != null) {
threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() {
@Override
public void run() {
if (listenerNotified.compareAndSet(false, true)) {
listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request));
}
}
}

@Override
public String toString() {
return "timeout handler for " + request;
}
});
@Override
public String toString() {
return "timeout handler for " + request;
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterResponse;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
 * Performs the optional, automatic bootstrapping of a cluster based on a count of discovered master-eligible nodes.
 * <p>
 * When {@link #INITIAL_MASTER_NODE_COUNT_SETTING} is positive and the local node is master-eligible, {@link #start()} sends a
 * {@link GetDiscoveredNodesRequest} with no timeout to the local node. Once that request succeeds, the bootstrap configuration from the
 * response is submitted to the local node in a {@link BootstrapClusterRequest}, and a failed submission is retried after a delay for as
 * long as this service is running.
 */
public class ClusterBootstrapService {

    private static final Logger logger = LogManager.getLogger(ClusterBootstrapService.class);

    /**
     * The number of master-eligible nodes which, if discovered, can be used to bootstrap the cluster. This setting is unsafe in the event
     * that more master nodes are started than expected. Automatic bootstrapping is only attempted when the value is positive.
     */
    public static final Setting<Integer> INITIAL_MASTER_NODE_COUNT_SETTING =
        Setting.intSetting("cluster.unsafe_initial_master_node_count", 0, 0, Property.NodeScope);

    private final int initialMasterNodeCount;
    private final TransportService transportService;
    // Set by start() and cleared by stop(); consulted before each bootstrap attempt so that retries cease after stop().
    private volatile boolean running;

    /**
     * @param settings         the node settings from which {@link #INITIAL_MASTER_NODE_COUNT_SETTING} is read
     * @param transportService used to send the discovery and bootstrap requests to the local node, and to schedule retries
     */
    public ClusterBootstrapService(Settings settings, TransportService transportService) {
        this.transportService = transportService;
        this.initialMasterNodeCount = INITIAL_MASTER_NODE_COUNT_SETTING.get(settings);
    }

    /**
     * Marks this service as running and, if automatic bootstrapping is configured and the local node is master-eligible, starts waiting
     * for the configured number of nodes to be discovered.
     */
    public void start() {
        assert running == false;
        running = true;

        if (initialMasterNodeCount <= 0 || transportService.getLocalNode().isMasterNode() == false) {
            // Nothing to do: either automatic bootstrapping is not configured or this node is not master-eligible.
            return;
        }

        logger.debug("unsafely waiting for discovery of [{}] master-eligible nodes", initialMasterNodeCount);

        final ThreadContext context = transportService.getThreadPool().getThreadContext();
        // The request is sent from within a stashed context that has been marked as a system context; the previous context is
        // restored when the try block exits.
        try (ThreadContext.StoredContext ignore = context.stashContext()) {
            context.markAsSystemContext();

            final GetDiscoveredNodesRequest discoveryRequest = newDiscoveryRequest();
            logger.trace("sending {}", discoveryRequest);
            transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, discoveryRequest,
                new TransportResponseHandler<GetDiscoveredNodesResponse>() {
                    @Override
                    public GetDiscoveredNodesResponse read(StreamInput in) throws IOException {
                        return new GetDiscoveredNodesResponse(in);
                    }

                    @Override
                    public String executor() {
                        return Names.SAME;
                    }

                    @Override
                    public void handleResponse(GetDiscoveredNodesResponse response) {
                        assert response.getNodes().size() >= initialMasterNodeCount;
                        assert response.getNodes().stream().allMatch(DiscoveryNode::isMasterNode);
                        logger.debug("discovered {}, starting to bootstrap", response.getNodes());
                        awaitBootstrap(response.getBootstrapConfiguration());
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        // A failed discovery attempt is only logged; it is not retried here.
                        logger.warn("discovery attempt failed", exp);
                    }
                });
        }
    }

    /**
     * Marks this service as no longer running, which prevents any further bootstrap attempts from being sent.
     */
    public void stop() {
        assert running;
        running = false;
    }

    /**
     * Builds the discovery request: it waits for {@code initialMasterNodeCount} nodes and has a {@code null} timeout.
     */
    private GetDiscoveredNodesRequest newDiscoveryRequest() {
        final GetDiscoveredNodesRequest discoveryRequest = new GetDiscoveredNodesRequest();
        discoveryRequest.setWaitForNodes(initialMasterNodeCount);
        discoveryRequest.setTimeout(null);
        return discoveryRequest;
    }

    /**
     * Sends a bootstrap request carrying the given configuration to the local node, unless this service has been stopped. On failure
     * the attempt is logged and a retry is scheduled.
     *
     * @param bootstrapConfiguration the configuration with which to bootstrap the cluster
     */
    private void awaitBootstrap(final BootstrapConfiguration bootstrapConfiguration) {
        if (running == false) {
            logger.debug("awaitBootstrap: not running");
            return;
        }

        final BootstrapClusterRequest bootstrapRequest = new BootstrapClusterRequest(bootstrapConfiguration);
        logger.trace("sending {}", bootstrapRequest);
        transportService.sendRequest(transportService.getLocalNode(), BootstrapClusterAction.NAME, bootstrapRequest,
            new TransportResponseHandler<BootstrapClusterResponse>() {
                @Override
                public BootstrapClusterResponse read(StreamInput in) throws IOException {
                    return new BootstrapClusterResponse(in);
                }

                @Override
                public String executor() {
                    return Names.SAME;
                }

                @Override
                public void handleResponse(BootstrapClusterResponse response) {
                    logger.debug("automatic cluster bootstrapping successful: received {}", response);
                }

                @Override
                public void handleException(TransportException exp) {
                    // log a warning since a failure here indicates a bad problem, such as:
                    // - bootstrap configuration resolution failed (e.g. discovered nodes no longer match those in the bootstrap config)
                    // - discovered nodes no longer form a quorum in the bootstrap config
                    logger.warn(new ParameterizedMessage("automatic cluster bootstrapping failed, retrying [{}]",
                        bootstrapConfiguration.getNodeDescriptions()), exp);

                    scheduleBootstrapRetry(bootstrapConfiguration);
                }
            });
    }

    /**
     * Schedules another bootstrap attempt with the same configuration. There's not really much else we can do apart from retry and hope
     * that the problem goes away, and the retry is delayed since a tight loop here is unlikely to help.
     */
    private void scheduleBootstrapRetry(final BootstrapConfiguration bootstrapConfiguration) {
        final TimeValue retryDelay = TimeValue.timeValueSeconds(10);
        transportService.getThreadPool().scheduleUnlessShuttingDown(retryDelay, Names.SAME, new Runnable() {
            @Override
            public void run() {
                awaitBootstrap(bootstrapConfiguration);
            }

            @Override
            public String toString() {
                return "retry bootstrapping with " + bootstrapConfiguration.getNodeDescriptions();
            }
        });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private Releasable prevotingRound;
private long maxTermSeen;
private final Reconfigurator reconfigurator;
private final ClusterBootstrapService clusterBootstrapService;

private Mode mode;
private Optional<DiscoveryNode> lastKnownLeader;
Expand Down Expand Up @@ -151,6 +152,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
}

private Runnable getOnLeaderFailure() {
Expand Down Expand Up @@ -483,11 +485,14 @@ public void startInitialJoin() {
synchronized (mutex) {
becomeCandidate("startInitialJoin");
}

clusterBootstrapService.start();
}

@Override
protected void doStop() {
configuredHostsResolver.stop();
clusterBootstrapService.stop();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
import org.elasticsearch.cluster.coordination.JoinHelper;
Expand Down Expand Up @@ -459,7 +460,8 @@ public void apply(Settings value, Settings current, Settings previous) {
Coordinator.PUBLISH_TIMEOUT_SETTING,
JoinHelper.JOIN_TIMEOUT_SETTING,
Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION,
TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING
TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING,
ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING
)));

public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.core.Is.is;

Expand Down Expand Up @@ -56,6 +57,9 @@ public void testTimeoutValidation() {
() -> getDiscoveredNodesRequest.setTimeout(TimeValue.timeValueNanos(randomLongBetween(-10, -1))));
assertThat(exception.getMessage(), startsWith("negative timeout of "));
assertThat(exception.getMessage(), endsWith(" is not allowed"));

getDiscoveredNodesRequest.setTimeout(null);
assertThat("value updated", getDiscoveredNodesRequest.getTimeout(), nullValue());
}

public void testSerialization() throws IOException {
Expand All @@ -67,6 +71,8 @@ public void testSerialization() throws IOException {

if (randomBoolean()) {
originalRequest.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "timeout"));
} else if (randomBoolean()) {
originalRequest.setTimeout(null);
}

final GetDiscoveredNodesRequest deserialized = copyWriteable(originalRequest, writableRegistry(), GetDiscoveredNodesRequest::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,34 +171,53 @@ public void handleException(TransportException exp) {
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
}

public void testFailsQuicklyWithZeroTimeout() throws InterruptedException {
public void testFailsQuicklyWithZeroTimeoutAndAcceptsNullTimeout() throws InterruptedException {
new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
transportService.start();
transportService.acceptIncomingRequests();
coordinator.start();
coordinator.startInitialJoin();

final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
getDiscoveredNodesRequest.setWaitForNodes(2);
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
{
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
getDiscoveredNodesRequest.setWaitForNodes(2);
getDiscoveredNodesRequest.setTimeout(null);
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
@Override
public void handleResponse(GetDiscoveredNodesResponse response) {
throw new AssertionError("should not be called");
}

final CountDownLatch countDownLatch = new CountDownLatch(1);
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
@Override
public void handleResponse(GetDiscoveredNodesResponse response) {
throw new AssertionError("should not be called");
}
@Override
public void handleException(TransportException exp) {
throw new AssertionError("should not be called", exp);
}
});
}

@Override
public void handleException(TransportException exp) {
final Throwable rootCause = exp.getRootCause();
assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class));
assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{"));
countDownLatch.countDown();
}
});
{
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
getDiscoveredNodesRequest.setWaitForNodes(2);
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);

assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
final CountDownLatch countDownLatch = new CountDownLatch(1);
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
@Override
public void handleResponse(GetDiscoveredNodesResponse response) {
throw new AssertionError("should not be called");
}

@Override
public void handleException(TransportException exp) {
final Throwable rootCause = exp.getRootCause();
assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class));
assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{"));
countDownLatch.countDown();
}
});

assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
}
}

public void testGetsDiscoveredNodes() throws InterruptedException {
Expand Down
Loading