From bcfa12d1b2d253eddf3d900c7cf65e1198a33fe8 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 11 Oct 2023 17:20:34 +1100 Subject: [PATCH 1/2] Run tasks in the determinsitic task queue upto a cutoff time Relates: https://github.com/elastic/elasticsearch/pull/99994#discussion_r1351706777 --- .../gateway/GatewayServiceTests.java | 9 ++--- .../concurrent/DeterministicTaskQueue.java | 11 ++++++ .../DeterministicTaskQueueTests.java | 35 +++++++++++++++++++ 3 files changed, 49 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java index b4296ae684840..4f4f28c29d92f 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java @@ -304,8 +304,7 @@ public void testScheduledRecoveryNoOpWhenNewTermBegins() { elapsed.millis(), () -> setClusterStateTaskQueue.submitTask(randomAlphaOfLength(5), new SetClusterStateTask(clusterStateOfTerm2), null) ); - deterministicTaskQueue.advanceTime(); - deterministicTaskQueue.runAllRunnableTasks(); + deterministicTaskQueue.runTasksUpToTimeInOrder(elapsed.millis()); assertThat(gatewayService.currentPendingStateRecovery, not(sameInstance(pendingStateRecoveryOfTerm1))); // The 1st scheduled recovery is now a no-op deterministicTaskQueue.advanceTime(); @@ -388,16 +387,14 @@ public void testScheduledRecoveryWithRecoverAfterNodes() { elapsed.millis(), () -> setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes - 1), null) ); - deterministicTaskQueue.advanceTime(); - deterministicTaskQueue.runAllRunnableTasks(); + deterministicTaskQueue.runTasksUpToTimeInOrder(elapsed.millis()); // The 2nd scheduled recovery when data nodes are above recoverAfterDataNodes again deterministicTaskQueue.scheduleAt( elapsed.millis() * 2, () -> setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes), null) ); - deterministicTaskQueue.advanceTime(); - deterministicTaskQueue.runAllRunnableTasks(); + deterministicTaskQueue.runTasksUpToTimeInOrder(elapsed.millis() * 2); assertThat(gatewayService.currentPendingStateRecovery, sameInstance(pendingStateRecoveryOfInitialTerm)); assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(elapsed.millis() * 2)); diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java index 81a419508dbee..6790d16b48492 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java @@ -101,6 +101,17 @@ public void runAllTasksInTimeOrder() { } } + /** + * Run all {@code runnableTasks} and {@code deferredTasks} that are scheduled to run before or on the given {@code timeInMillis}. + */ + public void runTasksUpToTimeInOrder(long timeInMillis) { + runAllRunnableTasks(); + while (nextDeferredTaskExecutionTimeMillis <= timeInMillis) { + advanceTime(); + runAllRunnableTasks(); + } + } + /** * @return whether there are any runnable tasks. */ diff --git a/test/framework/src/test/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueueTests.java b/test/framework/src/test/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueueTests.java index dc51a01da844f..dd6adf60f14f9 100644 --- a/test/framework/src/test/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueueTests.java +++ b/test/framework/src/test/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueueTests.java @@ -14,10 +14,14 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -229,6 +233,37 @@ public void testRunInTimeOrder() { assertThat(strings, contains("foo", "bar")); } + public void testRunTasksUpToTimeInOrder() { + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + // The queue does _not_ have to be a clean slate before test + if (randomBoolean()) { + taskQueue.scheduleAt(randomLongBetween(1, 100), () -> {}); + taskQueue.runAllTasksInTimeOrder(); + } + + final long cutoffTimeInMillis = randomLongBetween(taskQueue.getCurrentTimeMillis(), 1000); + final Set seenNumbers = new HashSet<>(); + + final int nRunnableTasks = randomIntBetween(0, 10); + IntStream.range(0, nRunnableTasks).forEach(i -> taskQueue.scheduleNow(() -> seenNumbers.add(i))); + + final int nDeferredTasksUptoCutoff = randomIntBetween(0, 10); + IntStream.range(0, nDeferredTasksUptoCutoff) + .forEach(i -> taskQueue.scheduleAt(randomLongBetween(0, cutoffTimeInMillis), () -> seenNumbers.add(i + nRunnableTasks))); + + IntStream.range(0, randomIntBetween(0, 10)) + .forEach( + i -> taskQueue.scheduleAt( + randomLongBetween(cutoffTimeInMillis + 1, 2 * cutoffTimeInMillis), + () -> seenNumbers.add(i + nRunnableTasks + nDeferredTasksUptoCutoff) + ) + ); + + taskQueue.runTasksUpToTimeInOrder(cutoffTimeInMillis); + assertThat(seenNumbers, equalTo(IntStream.range(0, nRunnableTasks + nDeferredTasksUptoCutoff).boxed().collect(Collectors.toSet()))); + assertThat(taskQueue.getCurrentTimeMillis(), lessThanOrEqualTo(cutoffTimeInMillis)); + } + public void testThreadPoolEnqueuesTasks() { final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); final List strings = new ArrayList<>(2); From c7f85f14ede2700d061c7e09beaa72553c867214 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 11 Oct 2023 20:52:06 +1100 Subject: [PATCH 2/2] address feedback and add new method --- .../gateway/GatewayServiceTests.java | 9 +++---- .../concurrent/DeterministicTaskQueue.java | 10 +++++++ .../DeterministicTaskQueueTests.java | 26 +++++++++++++++---- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java index 4f4f28c29d92f..2136b154480ff 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java @@ -300,11 +300,10 @@ public void testScheduledRecoveryNoOpWhenNewTermBegins() { // The 1st schedule is effectively cancelled if a new term begins final TimeValue elapsed = TimeValue.timeValueMinutes(1); final ClusterState clusterStateOfTerm2 = incrementTerm(clusterService.state()); - deterministicTaskQueue.scheduleAt( + deterministicTaskQueue.scheduleAtAndRunUpTo( elapsed.millis(), () -> setClusterStateTaskQueue.submitTask(randomAlphaOfLength(5), new SetClusterStateTask(clusterStateOfTerm2), null) ); - deterministicTaskQueue.runTasksUpToTimeInOrder(elapsed.millis()); assertThat(gatewayService.currentPendingStateRecovery, not(sameInstance(pendingStateRecoveryOfTerm1))); // The 1st scheduled recovery is now a no-op deterministicTaskQueue.advanceTime(); @@ -383,18 +382,16 @@ public void testScheduledRecoveryWithRecoverAfterNodes() { // The 1st schedule is cancelled when data nodes drop below recoverAfterDataNodes final TimeValue elapsed = TimeValue.timeValueMinutes(1); - deterministicTaskQueue.scheduleAt( + deterministicTaskQueue.scheduleAtAndRunUpTo( elapsed.millis(), () -> setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes - 1), null) ); - deterministicTaskQueue.runTasksUpToTimeInOrder(elapsed.millis()); // The 2nd scheduled recovery when data nodes are above recoverAfterDataNodes again - deterministicTaskQueue.scheduleAt( + deterministicTaskQueue.scheduleAtAndRunUpTo( elapsed.millis() * 2, () -> setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes), null) ); - deterministicTaskQueue.runTasksUpToTimeInOrder(elapsed.millis() * 2); assertThat(gatewayService.currentPendingStateRecovery, sameInstance(pendingStateRecoveryOfInitialTerm)); assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(elapsed.millis() * 2)); diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java index 6790d16b48492..e89a6c8a84bf7 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java @@ -103,6 +103,7 @@ public void runAllTasksInTimeOrder() { /** * Run all {@code runnableTasks} and {@code deferredTasks} that are scheduled to run before or on the given {@code timeInMillis}. + * The current time will be set to {@code timeInMillis} once the method returns. */ public void runTasksUpToTimeInOrder(long timeInMillis) { runAllRunnableTasks(); @@ -110,6 +111,7 @@ public void runTasksUpToTimeInOrder(long timeInMillis) { advanceTime(); runAllRunnableTasks(); } + currentTimeMillis = timeInMillis; } /** @@ -185,6 +187,14 @@ public void scheduleAt(final long executionTimeMillis, final Runnable task) { } } + /** + * Similar to {@link #scheduleAt} but also advance time to {@code executionTimeMillis} and run all eligible tasks. + */ + public void scheduleAtAndRunUpTo(final long executionTimeMillis, final Runnable task) { + scheduleAt(executionTimeMillis, task); + runTasksUpToTimeInOrder(executionTimeMillis); + } + private void scheduleDeferredTask(DeferredTask deferredTask) { nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, deferredTask.executionTimeMillis()); latestDeferredExecutionTime = Math.max(latestDeferredExecutionTime, deferredTask.executionTimeMillis()); diff --git a/test/framework/src/test/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueueTests.java b/test/framework/src/test/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueueTests.java index dd6adf60f14f9..2f920f3e58fa1 100644 --- a/test/framework/src/test/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueueTests.java +++ b/test/framework/src/test/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueueTests.java @@ -247,21 +247,37 @@ public void testRunTasksUpToTimeInOrder() { final int nRunnableTasks = randomIntBetween(0, 10); IntStream.range(0, nRunnableTasks).forEach(i -> taskQueue.scheduleNow(() -> seenNumbers.add(i))); - final int nDeferredTasksUptoCutoff = randomIntBetween(0, 10); - IntStream.range(0, nDeferredTasksUptoCutoff) + final int nDeferredTasksUpToCutoff = randomIntBetween(0, 10); + IntStream.range(0, nDeferredTasksUpToCutoff) .forEach(i -> taskQueue.scheduleAt(randomLongBetween(0, cutoffTimeInMillis), () -> seenNumbers.add(i + nRunnableTasks))); IntStream.range(0, randomIntBetween(0, 10)) .forEach( i -> taskQueue.scheduleAt( randomLongBetween(cutoffTimeInMillis + 1, 2 * cutoffTimeInMillis), - () -> seenNumbers.add(i + nRunnableTasks + nDeferredTasksUptoCutoff) + () -> seenNumbers.add(i + nRunnableTasks + nDeferredTasksUpToCutoff) ) ); taskQueue.runTasksUpToTimeInOrder(cutoffTimeInMillis); - assertThat(seenNumbers, equalTo(IntStream.range(0, nRunnableTasks + nDeferredTasksUptoCutoff).boxed().collect(Collectors.toSet()))); - assertThat(taskQueue.getCurrentTimeMillis(), lessThanOrEqualTo(cutoffTimeInMillis)); + assertThat(seenNumbers, equalTo(IntStream.range(0, nRunnableTasks + nDeferredTasksUpToCutoff).boxed().collect(Collectors.toSet()))); + assertThat(taskQueue.getCurrentTimeMillis(), equalTo(cutoffTimeInMillis)); + } + + public void testScheduleAtAndRunUpTo() { + final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); + final Set seenNumbers = new HashSet<>(); + + final int nRunnableTasks = randomIntBetween(0, 10); + IntStream.range(0, nRunnableTasks).forEach(i -> taskQueue.scheduleNow(() -> seenNumbers.add(i))); + taskQueue.scheduleAt(500, () -> seenNumbers.add(500)); + taskQueue.scheduleAt(800, () -> seenNumbers.add(800)); + + final long executionTimeMillis = randomLongBetween(1, 400); + taskQueue.scheduleAtAndRunUpTo(executionTimeMillis, () -> seenNumbers.add(nRunnableTasks)); + + assertThat(seenNumbers, equalTo(IntStream.rangeClosed(0, nRunnableTasks).boxed().collect(Collectors.toSet()))); + assertThat(taskQueue.getCurrentTimeMillis(), equalTo(executionTimeMillis)); } public void testThreadPoolEnqueuesTasks() {