Skip to content

Commit

Permalink
Make MasterServiceTests#testAcking deterministic
Browse files (browse the repository at this point in the history)
No need to use a real concurrent `ThreadPool` here: we can cover
everything we need with a fake thread pool, making the test deterministic
(and faster).

Relates elastic#107044
  • Loading branch information
DaveCTurner committed Apr 19, 2024
1 parent f340008 commit db3c463
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ private static class TaskAckListener {
public void onCommit(TimeValue commitTime) {
TimeValue ackTimeout = contextPreservingAckListener.ackTimeout();
if (ackTimeout == null) {
assert false : "ackTimeout must always be present: " + contextPreservingAckListener;
ackTimeout = TimeValue.ZERO;
}
final TimeValue timeLeft = TimeValue.timeValueNanos(Math.max(0, ackTimeout.nanos() - commitTime.nanos()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1336,20 +1336,23 @@ public void onFailure(Exception e) {
}
}

public void testAcking() throws InterruptedException {
public void testAcking() {
final DiscoveryNode node1 = DiscoveryNodeUtils.builder("node1").roles(emptySet()).build();
final DiscoveryNode node2 = DiscoveryNodeUtils.builder("node2").roles(emptySet()).build();
final DiscoveryNode node3 = DiscoveryNodeUtils.builder("node3").roles(emptySet()).build();
final Settings settings = Settings.builder()
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName())
.put(Node.NODE_NAME_SETTING.getKey(), "test_node")
.build();
final var deterministicTaskQueue = new DeterministicTaskQueue();
final var threadPool = deterministicTaskQueue.getThreadPool();
threadPool.getThreadContext().markAsSystemContext();
try (
MasterService masterService = new MasterService(
settings,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
var masterService = createMasterService(
true,
new TaskManager(settings, threadPool, emptySet()),
threadPool,
new TaskManager(settings, threadPool, emptySet())
new StoppableExecutorServiceWrapper(threadPool.generic())
)
) {

Expand All @@ -1365,7 +1368,6 @@ public void testAcking() throws InterruptedException {
publisherRef.get().publish(e, pl, al);
});
masterService.setClusterStateSupplier(() -> initialClusterState);
masterService.start();

class LatchAckListener implements ClusterStateAckListener {
private final CountDownLatch latch;
Expand Down Expand Up @@ -1442,7 +1444,8 @@ public void onFailure(Exception e) {
: ClusterState.builder(batchExecutionContext.initialState()).build();
}).submitTask("success-test", new Task(), null);

assertTrue(latch.await(10, TimeUnit.SECONDS));
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}

// check that we complete a dynamic ack listener supplied by the task
Expand Down Expand Up @@ -1473,7 +1476,8 @@ public void onFailure(Exception e) {
: ClusterState.builder(batchExecutionContext.initialState()).build();
}).submitTask("success-test", new Task(), null);

assertTrue(latch.await(10, TimeUnit.SECONDS));
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}

// check that we supply a no-op publish listener if we only care about acking
Expand Down Expand Up @@ -1504,7 +1508,8 @@ public void onFailure(Exception e) {
: ClusterState.builder(batchExecutionContext.initialState()).build();
}).submitTask("success-test", new Task(), null);

assertTrue(latch.await(10, TimeUnit.SECONDS));
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}

// check that exception from acking is passed to listener
Expand Down Expand Up @@ -1553,7 +1558,8 @@ public void onAckFailure(Exception e) {
return ClusterState.builder(batchExecutionContext.initialState()).build();
}).submitTask("node-ack-fail-test", new Task(), null);

assertTrue(latch.await(10, TimeUnit.SECONDS));
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}

// check that we don't time out before even committing the cluster state
Expand Down Expand Up @@ -1587,6 +1593,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {

@Override
public void onFailure(Exception e) {
assertEquals("mock exception", asInstanceOf(FailedToCommitClusterStateException.class, e).getMessage());
latch.countDown();
}

Expand All @@ -1597,14 +1604,15 @@ public void onAckTimeout() {
}
);

latch.await();
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}

// check that we time out if the commit took too long
{
final CountDownLatch latch = new CountDownLatch(2);

final TimeValue ackTimeout = TimeValue.timeValueMillis(randomInt(100));
final TimeValue ackTimeout = TimeValue.timeValueMillis(scaledRandomIntBetween(0, 100000));

publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> {
publishListener.onResponse(null);
Expand Down Expand Up @@ -1652,7 +1660,8 @@ public void onAckTimeout() {
}
);

latch.await();
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}
}
}
Expand Down

0 comments on commit db3c463

Please sign in to comment.