Skip to content

Commit

Permalink
Add a deterministic test case for timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross committed Jul 27, 2023
1 parent 2b1208c commit 8c5f7e0
Showing 1 changed file with 72 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.carrotsearch.randomizedtesting.annotations.Timeout;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.anyOf;
Expand Down Expand Up @@ -1381,6 +1383,76 @@ public void testDeprecatedMasterServiceUpdateTaskThreadName() {
assertThrows(AssertionError.class, () -> MasterService.assertClusterManagerUpdateThread());
}

@Timeout(millis = 5_000)
public void testTaskTimeout() throws InterruptedException {
try (ClusterManagerService clusterManagerService = createClusterManagerService(true)) {
final AtomicInteger failureCount = new AtomicInteger();
final AtomicInteger successCount = new AtomicInteger();
final CountDownLatch taskStartLatch = new CountDownLatch(1);
final CountDownLatch blockingTaskLatch = new CountDownLatch(1);
final CountDownLatch timeoutLatch = new CountDownLatch(1);
final ClusterStateTaskListener blockingListener = new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
fail("Unexpected failure");
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
successCount.incrementAndGet();
taskStartLatch.countDown();
try {
blockingTaskLatch.await();
} catch (InterruptedException e) {
fail("Interrupted");
}
}
};
final ClusterStateTaskListener timeoutListener = new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
assertEquals("timeout", source);
failureCount.incrementAndGet();
timeoutLatch.countDown();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail("Unexpected success");
}
};

final ClusterStateTaskExecutor<Object> executor = (currentState, tasks) -> ClusterStateTaskExecutor.ClusterTasksResult.builder()
.successes(tasks)
.build(currentState);

// start a task and wait for it to start and block on the clusterStateProcessed callback
clusterManagerService.submitStateUpdateTask(
"success",
new Object(),
ClusterStateTaskConfig.build(randomFrom(Priority.values())),
executor,
blockingListener
);
taskStartLatch.await();

// start a second task that is guaranteed to timeout as the first task is still running
clusterManagerService.submitStateUpdateTask(
"timeout",
new Object(),
ClusterStateTaskConfig.build(randomFrom(Priority.values()), TimeValue.timeValueMillis(1L)),
executor,
timeoutListener
);

// wait for the timeout to happen, then unblock and assert one success and one failure
timeoutLatch.await();
blockingTaskLatch.countDown();
assertEquals(1, failureCount.get());
assertEquals(1, successCount.get());
}
}

/**
* Returns the cluster state that the cluster-manager service uses (and that is provided by the discovery layer)
*/
Expand Down

0 comments on commit 8c5f7e0

Please sign in to comment.