From 9f7e95161db8f035762124e5c0e44ef2e42bb209 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 14 Nov 2018 16:20:16 +0000 Subject: [PATCH] Wait indefinitely, do not retry --- .../bootstrap/GetDiscoveredNodesRequest.java | 12 ++-- .../TransportGetDiscoveredNodesAction.java | 24 ++++---- .../coordination/ClusterBootstrapService.java | 9 +-- .../GetDiscoveredNodesRequestTests.java | 6 ++ ...ransportGetDiscoveredNodesActionTests.java | 57 ++++++++++++------- .../ClusterBootstrapServiceTests.java | 38 ------------- 6 files changed, 67 insertions(+), 79 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java index f81061e854759..ffd031785cf4b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequest.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; @@ -33,6 +34,8 @@ public class GetDiscoveredNodesRequest extends ActionRequest { private int waitForNodes = 1; + + @Nullable // if the request should wait indefinitely private TimeValue timeout = TimeValue.timeValueSeconds(30); public GetDiscoveredNodesRequest() { @@ -41,7 +44,7 @@ public GetDiscoveredNodesRequest() { public GetDiscoveredNodesRequest(StreamInput in) throws IOException { super(in); waitForNodes = in.readInt(); - timeout = in.readTimeValue(); + timeout = in.readOptionalTimeValue(); } /** @@ -74,8 +77,8 @@ public int getWaitForNodes() { * * @param timeout how long to wait to discover sufficiently many nodes to respond successfully. */ - public void setTimeout(TimeValue timeout) { - if (timeout.compareTo(TimeValue.ZERO) < 0) { + public void setTimeout(@Nullable TimeValue timeout) { + if (timeout != null && timeout.compareTo(TimeValue.ZERO) < 0) { throw new IllegalArgumentException("negative timeout of [" + timeout + "] is not allowed"); } this.timeout = timeout; @@ -87,6 +90,7 @@ public void setTimeout(TimeValue timeout) { * * @return how long to wait to discover sufficiently many nodes to respond successfully. */ + @Nullable public TimeValue getTimeout() { return timeout; } @@ -105,7 +109,7 @@ public void readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeInt(waitForNodes); - out.writeTimeValue(timeout); + out.writeOptionalTimeValue(timeout); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java index a45d7c3246fbc..c4d979300fa37 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesAction.java @@ -108,18 +108,20 @@ public String toString() { listenableFuture.addListener(ActionListener.wrap(releasable::close), directExecutor, threadPool.getThreadContext()); respondIfRequestSatisfied.accept(coordinator.getFoundPeers()); - threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() { - @Override - public void run() { - if (listenerNotified.compareAndSet(false, true)) { - listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request)); + if (request.getTimeout() != null) { + threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() { + @Override + public void run() { + if (listenerNotified.compareAndSet(false, true)) { + listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request)); + } } - } - @Override - public String toString() { - return "timeout handler for " + request; - } - }); + @Override + public String toString() { + return "timeout handler for " + request; + } + }); + } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java index afaa3387c6549..8804050b66116 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java @@ -89,6 +89,7 @@ private void awaitDiscovery() { final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest(); request.setWaitForNodes(initialMasterNodeCount); + request.setTimeout(null); logger.trace("sending {}", request); transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request, new TransportResponseHandler() { @@ -103,13 +104,7 @@ public void handleResponse(GetDiscoveredNodesResponse response) { @Override public void handleException(TransportException exp) { - if (exp.getRootCause() instanceof ElasticsearchTimeoutException) { - logger.debug(new ParameterizedMessage("discovery attempt timed out, retrying, request={}", request), exp); - awaitDiscovery(); - } else { - // exceptions other than a timeout are fatal - logger.warn("discovery attempt failed, not retrying", exp); - } + logger.warn("discovery attempt failed", exp); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java index 9ce53a93efaff..f15e0af1740f6 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/GetDiscoveredNodesRequestTests.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.Is.is; @@ -56,6 +57,9 @@ public void testTimeoutValidation() { () -> getDiscoveredNodesRequest.setTimeout(TimeValue.timeValueNanos(randomLongBetween(-10, -1)))); assertThat(exception.getMessage(), startsWith("negative timeout of ")); assertThat(exception.getMessage(), endsWith(" is not allowed")); + + getDiscoveredNodesRequest.setTimeout(null); + assertThat("value updated", getDiscoveredNodesRequest.getTimeout(), nullValue()); } public void testSerialization() throws IOException { @@ -67,6 +71,8 @@ public void testSerialization() throws IOException { if (randomBoolean()) { originalRequest.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "timeout")); + } else if (randomBoolean()) { + originalRequest.setTimeout(null); } final GetDiscoveredNodesRequest deserialized = copyWriteable(originalRequest, writableRegistry(), GetDiscoveredNodesRequest::new); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java index 1a8ef39fdf8e7..ba663e76b4605 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java @@ -171,34 +171,53 @@ public void handleException(TransportException exp) { assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); } - public void testFailsQuicklyWithZeroTimeout() throws InterruptedException { + public void testFailsQuicklyWithZeroTimeoutAndAcceptsNullTimeout() throws InterruptedException { new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action transportService.start(); transportService.acceptIncomingRequests(); coordinator.start(); coordinator.startInitialJoin(); - final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); - getDiscoveredNodesRequest.setWaitForNodes(2); - getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); + { + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setWaitForNodes(2); + getDiscoveredNodesRequest.setTimeout(null); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } - final CountDownLatch countDownLatch = new CountDownLatch(1); - transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { - @Override - public void handleResponse(GetDiscoveredNodesResponse response) { - throw new AssertionError("should not be called"); - } + @Override + public void handleException(TransportException exp) { + throw new AssertionError("should not be called", exp); + } + }); + } - @Override - public void handleException(TransportException exp) { - final Throwable rootCause = exp.getRootCause(); - assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class)); - assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{")); - countDownLatch.countDown(); - } - }); + { + final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest(); + getDiscoveredNodesRequest.setWaitForNodes(2); + getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO); - assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + final CountDownLatch countDownLatch = new CountDownLatch(1); + transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() { + @Override + public void handleResponse(GetDiscoveredNodesResponse response) { + throw new AssertionError("should not be called"); + } + + @Override + public void handleException(TransportException exp) { + final Throwable rootCause = exp.getRootCause(); + assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class)); + assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{")); + countDownLatch.countDown(); + } + }); + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } } public void testGetsDiscoveredNodes() throws InterruptedException { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java index 2d563d203093e..fb87e26ab096c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/ClusterBootstrapServiceTests.java @@ -146,44 +146,6 @@ public void messageReceived(GetDiscoveredNodesRequest request, TransportChannel deterministicTaskQueue.runAllTasks(); } - public void testRetriesOnDiscoveryTimeout() { - AtomicLong callCount = new AtomicLong(); - transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, - (request, channel, task) -> deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + 30000, () -> { - callCount.incrementAndGet(); - try { - channel.sendResponse(new ElasticsearchTimeoutException("simulated timeout")); - } catch (IOException e) { - throw new AssertionError("unexpected", e); - } - })); - - startServices(); - while (callCount.get() < 5) { - if (deterministicTaskQueue.hasDeferredTasks()) { - deterministicTaskQueue.advanceTime(); - } - deterministicTaskQueue.runAllRunnableTasks(); - } - } - - public void testStopsRetryingDiscoveryWhenStopped() { - transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, - (request, channel, task) -> deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + 30000, () -> { - try { - channel.sendResponse(new ElasticsearchTimeoutException("simulated timeout")); - } catch (IOException e) { - throw new AssertionError("unexpected", e); - } - })); - - scheduleStopAfter(150000); - - startServices(); - deterministicTaskQueue.runAllTasks(); - // termination means success - } - public void testBootstrapsOnDiscoverySuccess() { final AtomicBoolean discoveryAttempted = new AtomicBoolean(); final Set discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet());