diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 48917ca84e89b..0910ac6506c9f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -706,6 +706,7 @@ private static class TaskAckListener { public void onCommit(TimeValue commitTime) { TimeValue ackTimeout = contextPreservingAckListener.ackTimeout(); if (ackTimeout == null) { + assert false : "ackTimeout must always be present: " + contextPreservingAckListener; ackTimeout = TimeValue.ZERO; } final TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos())); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index 6a24c8fc88078..57fb819ccd50e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -1336,7 +1336,7 @@ public void onFailure(Exception e) { } } - public void testAcking() throws InterruptedException { + public void testAcking() { final DiscoveryNode node1 = DiscoveryNodeUtils.builder("node1").roles(emptySet()).build(); final DiscoveryNode node2 = DiscoveryNodeUtils.builder("node2").roles(emptySet()).build(); final DiscoveryNode node3 = DiscoveryNodeUtils.builder("node3").roles(emptySet()).build(); @@ -1344,12 +1344,15 @@ public void testAcking() throws InterruptedException { .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName()) .put(Node.NODE_NAME_SETTING.getKey(), "test_node") .build(); + final var deterministicTaskQueue = new DeterministicTaskQueue(); + final var threadPool = deterministicTaskQueue.getThreadPool(); + threadPool.getThreadContext().markAsSystemContext(); try ( - MasterService masterService = new MasterService( - settings, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + var masterService = createMasterService( + true, + new TaskManager(settings, threadPool, emptySet()), threadPool, - new TaskManager(settings, threadPool, emptySet()) + new StoppableExecutorServiceWrapper(threadPool.generic()) ) ) { @@ -1365,7 +1368,6 @@ public void testAcking() throws InterruptedException { publisherRef.get().publish(e, pl, al); }); masterService.setClusterStateSupplier(() -> initialClusterState); - masterService.start(); class LatchAckListener implements ClusterStateAckListener { private final CountDownLatch latch; @@ -1442,7 +1444,8 @@ public void onFailure(Exception e) { : ClusterState.builder(batchExecutionContext.initialState()).build(); }).submitTask("success-test", new Task(), null); - assertTrue(latch.await(10, TimeUnit.SECONDS)); + deterministicTaskQueue.runAllTasksInTimeOrder(); + safeAwait(latch); } // check that we complete a dynamic ack listener supplied by the task @@ -1473,7 +1476,8 @@ public void onFailure(Exception e) { : ClusterState.builder(batchExecutionContext.initialState()).build(); }).submitTask("success-test", new Task(), null); - assertTrue(latch.await(10, TimeUnit.SECONDS)); + deterministicTaskQueue.runAllTasksInTimeOrder(); + safeAwait(latch); } // check that we supply a no-op publish listener if we only care about acking @@ -1504,7 +1508,8 @@ public void onFailure(Exception e) { : ClusterState.builder(batchExecutionContext.initialState()).build(); }).submitTask("success-test", new Task(), null); - assertTrue(latch.await(10, TimeUnit.SECONDS)); + deterministicTaskQueue.runAllTasksInTimeOrder(); + safeAwait(latch); } // check that exception from acking is passed to listener @@ -1553,7 +1558,8 @@ public void onAckFailure(Exception e) { return ClusterState.builder(batchExecutionContext.initialState()).build(); }).submitTask("node-ack-fail-test", new Task(), null); - assertTrue(latch.await(10, TimeUnit.SECONDS)); + deterministicTaskQueue.runAllTasksInTimeOrder(); + safeAwait(latch); } // check that we don't time out before even committing the cluster state @@ -1587,6 +1593,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { @Override public void onFailure(Exception e) { + assertEquals("mock exception", asInstanceOf(FailedToCommitClusterStateException.class, e).getMessage()); latch.countDown(); } @@ -1597,14 +1604,15 @@ public void onAckTimeout() { } ); - latch.await(); + deterministicTaskQueue.runAllTasksInTimeOrder(); + safeAwait(latch); } // check that we timeout if commit took too long { final CountDownLatch latch = new CountDownLatch(2); - final TimeValue ackTimeout = TimeValue.timeValueMillis(randomInt(100)); + final TimeValue ackTimeout = TimeValue.timeValueMillis(scaledRandomIntBetween(0, 100000)); publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> { publishListener.onResponse(null); @@ -1652,7 +1660,8 @@ public void onAckTimeout() { } ); - latch.await(); + deterministicTaskQueue.runAllTasksInTimeOrder(); + safeAwait(latch); } } }