Skip to content

Commit

Permalink
Wait indefinitely, do not retry
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Nov 14, 2018
1 parent 150f47b commit 9f7e951
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -33,6 +34,8 @@
public class GetDiscoveredNodesRequest extends ActionRequest {

private int waitForNodes = 1;

@Nullable // if the request should wait indefinitely
private TimeValue timeout = TimeValue.timeValueSeconds(30);

public GetDiscoveredNodesRequest() {
Expand All @@ -41,7 +44,7 @@ public GetDiscoveredNodesRequest() {
public GetDiscoveredNodesRequest(StreamInput in) throws IOException {
super(in);
waitForNodes = in.readInt();
timeout = in.readTimeValue();
timeout = in.readOptionalTimeValue();
}

/**
Expand Down Expand Up @@ -74,8 +77,8 @@ public int getWaitForNodes() {
*
* @param timeout how long to wait to discover sufficiently many nodes to respond successfully.
*/
public void setTimeout(TimeValue timeout) {
if (timeout.compareTo(TimeValue.ZERO) < 0) {
public void setTimeout(@Nullable TimeValue timeout) {
if (timeout != null && timeout.compareTo(TimeValue.ZERO) < 0) {
throw new IllegalArgumentException("negative timeout of [" + timeout + "] is not allowed");
}
this.timeout = timeout;
Expand All @@ -87,6 +90,7 @@ public void setTimeout(TimeValue timeout) {
*
* @return how long to wait to discover sufficiently many nodes to respond successfully.
*/
@Nullable
public TimeValue getTimeout() {
return timeout;
}
Expand All @@ -105,7 +109,7 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(waitForNodes);
out.writeTimeValue(timeout);
out.writeOptionalTimeValue(timeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,20 @@ public String toString() {
listenableFuture.addListener(ActionListener.wrap(releasable::close), directExecutor, threadPool.getThreadContext());
respondIfRequestSatisfied.accept(coordinator.getFoundPeers());

threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() {
@Override
public void run() {
if (listenerNotified.compareAndSet(false, true)) {
listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request));
if (request.getTimeout() != null) {
threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() {
@Override
public void run() {
if (listenerNotified.compareAndSet(false, true)) {
listenableFuture.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request));
}
}
}

@Override
public String toString() {
return "timeout handler for " + request;
}
});
@Override
public String toString() {
return "timeout handler for " + request;
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ private void awaitDiscovery() {

final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
request.setWaitForNodes(initialMasterNodeCount);
request.setTimeout(null);
logger.trace("sending {}", request);
transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request,
new TransportResponseHandler<GetDiscoveredNodesResponse>() {
Expand All @@ -103,13 +104,7 @@ public void handleResponse(GetDiscoveredNodesResponse response) {

@Override
public void handleException(TransportException exp) {
if (exp.getRootCause() instanceof ElasticsearchTimeoutException) {
logger.debug(new ParameterizedMessage("discovery attempt timed out, retrying, request={}", request), exp);
awaitDiscovery();
} else {
// exceptions other than a timeout are fatal
logger.warn("discovery attempt failed, not retrying", exp);
}
logger.warn("discovery attempt failed", exp);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.core.Is.is;

Expand Down Expand Up @@ -56,6 +57,9 @@ public void testTimeoutValidation() {
() -> getDiscoveredNodesRequest.setTimeout(TimeValue.timeValueNanos(randomLongBetween(-10, -1))));
assertThat(exception.getMessage(), startsWith("negative timeout of "));
assertThat(exception.getMessage(), endsWith(" is not allowed"));

getDiscoveredNodesRequest.setTimeout(null);
assertThat("value updated", getDiscoveredNodesRequest.getTimeout(), nullValue());
}

public void testSerialization() throws IOException {
Expand All @@ -67,6 +71,8 @@ public void testSerialization() throws IOException {

if (randomBoolean()) {
originalRequest.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "timeout"));
} else if (randomBoolean()) {
originalRequest.setTimeout(null);
}

final GetDiscoveredNodesRequest deserialized = copyWriteable(originalRequest, writableRegistry(), GetDiscoveredNodesRequest::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,34 +171,53 @@ public void handleException(TransportException exp) {
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
}

public void testFailsQuicklyWithZeroTimeout() throws InterruptedException {
public void testFailsQuicklyWithZeroTimeoutAndAcceptsNullTimeout() throws InterruptedException {
new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
transportService.start();
transportService.acceptIncomingRequests();
coordinator.start();
coordinator.startInitialJoin();

final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
getDiscoveredNodesRequest.setWaitForNodes(2);
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
{
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
getDiscoveredNodesRequest.setWaitForNodes(2);
getDiscoveredNodesRequest.setTimeout(null);
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
@Override
public void handleResponse(GetDiscoveredNodesResponse response) {
throw new AssertionError("should not be called");
}

final CountDownLatch countDownLatch = new CountDownLatch(1);
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
@Override
public void handleResponse(GetDiscoveredNodesResponse response) {
throw new AssertionError("should not be called");
}
@Override
public void handleException(TransportException exp) {
throw new AssertionError("should not be called", exp);
}
});
}

@Override
public void handleException(TransportException exp) {
final Throwable rootCause = exp.getRootCause();
assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class));
assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{"));
countDownLatch.countDown();
}
});
{
final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
getDiscoveredNodesRequest.setWaitForNodes(2);
getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);

assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
final CountDownLatch countDownLatch = new CountDownLatch(1);
transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
@Override
public void handleResponse(GetDiscoveredNodesResponse response) {
throw new AssertionError("should not be called");
}

@Override
public void handleException(TransportException exp) {
final Throwable rootCause = exp.getRootCause();
assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class));
assertThat(rootCause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{"));
countDownLatch.countDown();
}
});

assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
}
}

public void testGetsDiscoveredNodes() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,44 +146,6 @@ public void messageReceived(GetDiscoveredNodesRequest request, TransportChannel
deterministicTaskQueue.runAllTasks();
}

public void testRetriesOnDiscoveryTimeout() {
AtomicLong callCount = new AtomicLong();
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
(request, channel, task) -> deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + 30000, () -> {
callCount.incrementAndGet();
try {
channel.sendResponse(new ElasticsearchTimeoutException("simulated timeout"));
} catch (IOException e) {
throw new AssertionError("unexpected", e);
}
}));

startServices();
while (callCount.get() < 5) {
if (deterministicTaskQueue.hasDeferredTasks()) {
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks();
}
}

public void testStopsRetryingDiscoveryWhenStopped() {
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
(request, channel, task) -> deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + 30000, () -> {
try {
channel.sendResponse(new ElasticsearchTimeoutException("simulated timeout"));
} catch (IOException e) {
throw new AssertionError("unexpected", e);
}
}));

scheduleStopAfter(150000);

startServices();
deterministicTaskQueue.runAllTasks();
// termination means success
}

public void testBootstrapsOnDiscoverySuccess() {
final AtomicBoolean discoveryAttempted = new AtomicBoolean();
final Set<DiscoveryNode> discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet());
Expand Down

0 comments on commit 9f7e951

Please sign in to comment.