diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java
index dadc69402d3f9..d4804b18bd160 100644
--- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java
+++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java
@@ -86,6 +86,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.carrotsearch.randomizedtesting.annotations.Timeout;
+
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static org.hamcrest.Matchers.anyOf;
@@ -1381,6 +1383,89 @@ public void testDeprecatedMasterServiceUpdateTaskThreadName() {
         assertThrows(AssertionError.class, () -> MasterService.assertClusterManagerUpdateThread());
     }
 
+    /**
+     * Submits a task whose listener blocks in {@code clusterStateProcessed}, then submits a second task with a 1ms
+     * timeout and expects that second task's listener to be notified through {@code onFailure} exactly once.
+     */
+    @Timeout(millis = 5_000)
+    public void testTaskTimeout() throws InterruptedException {
+        try (ClusterManagerService clusterManagerService = createClusterManagerService(true)) {
+            final AtomicInteger failureCount = new AtomicInteger();
+            final AtomicInteger successCount = new AtomicInteger();
+            final CountDownLatch taskStartLatch = new CountDownLatch(1);
+            final CountDownLatch blockingTaskLatch = new CountDownLatch(1);
+            final CountDownLatch timeoutLatch = new CountDownLatch(1);
+            final ClusterStateTaskListener blockingListener = new ClusterStateTaskListener() {
+                @Override
+                public void onFailure(String source, Exception e) {
+                    fail("Unexpected failure");
+                }
+
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    successCount.incrementAndGet();
+                    taskStartLatch.countDown();
+                    try {
+                        blockingTaskLatch.await();
+                    } catch (InterruptedException e) {
+                        // restore the interrupt flag before failing so the interruption is not silently lost
+                        Thread.currentThread().interrupt();
+                        fail("Interrupted");
+                    }
+                }
+            };
+            final ClusterStateTaskListener timeoutListener = new ClusterStateTaskListener() {
+                @Override
+                public void onFailure(String source, Exception e) {
+                    assertEquals("timeout", source);
+                    failureCount.incrementAndGet();
+                    timeoutLatch.countDown();
+                }
+
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    fail("Unexpected success");
+                }
+            };
+
+            final ClusterStateTaskExecutor executor = (currentState, tasks) -> ClusterStateTaskExecutor.ClusterTasksResult.builder()
+                .successes(tasks)
+                .build(currentState);
+
+            try {
+                // start a task and wait for it to start and block on the clusterStateProcessed callback
+                clusterManagerService.submitStateUpdateTask(
+                    "success",
+                    new Object(),
+                    ClusterStateTaskConfig.build(randomFrom(Priority.values())),
+                    executor,
+                    blockingListener
+                );
+                taskStartLatch.await();
+
+                // start a second task that is guaranteed to timeout as the first task is still running
+                clusterManagerService.submitStateUpdateTask(
+                    "timeout",
+                    new Object(),
+                    ClusterStateTaskConfig.build(randomFrom(Priority.values()), TimeValue.timeValueMillis(1L)),
+                    executor,
+                    timeoutListener
+                );
+
+                // wait for the timeout to happen
+                timeoutLatch.await();
+            } finally {
+                // always release the first listener, even when a wait above throws, so it is no longer
+                // parked on the latch by the time the try-with-resources closes the service
+                blockingTaskLatch.countDown();
+            }
+
+            // exactly one task succeeded (the blocking one) and exactly one failed (the timed-out one)
+            assertEquals(1, failureCount.get());
+            assertEquals(1, successCount.get());
+        }
+    }
+
     /**
      * Returns the cluster state that the cluster-manager service uses (and that is provided by the discovery layer)
      */