Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make MasterServiceTests#testAcking deterministic #107651

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1336,20 +1336,23 @@ public void onFailure(Exception e) {
}
}

public void testAcking() throws InterruptedException {
public void testAcking() {
final DiscoveryNode node1 = DiscoveryNodeUtils.builder("node1").roles(emptySet()).build();
final DiscoveryNode node2 = DiscoveryNodeUtils.builder("node2").roles(emptySet()).build();
final DiscoveryNode node3 = DiscoveryNodeUtils.builder("node3").roles(emptySet()).build();
final Settings settings = Settings.builder()
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName())
.put(Node.NODE_NAME_SETTING.getKey(), "test_node")
.build();
final var deterministicTaskQueue = new DeterministicTaskQueue();
final var threadPool = deterministicTaskQueue.getThreadPool();
threadPool.getThreadContext().markAsSystemContext();
try (
MasterService masterService = new MasterService(
settings,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
var masterService = createMasterService(
true,
new TaskManager(settings, threadPool, emptySet()),
threadPool,
new TaskManager(settings, threadPool, emptySet())
new StoppableExecutorServiceWrapper(threadPool.generic())
)
) {

Expand All @@ -1365,7 +1368,6 @@ public void testAcking() throws InterruptedException {
publisherRef.get().publish(e, pl, al);
});
masterService.setClusterStateSupplier(() -> initialClusterState);
masterService.start();

class LatchAckListener implements ClusterStateAckListener {
private final CountDownLatch latch;
Expand Down Expand Up @@ -1442,7 +1444,8 @@ public void onFailure(Exception e) {
: ClusterState.builder(batchExecutionContext.initialState()).build();
}).submitTask("success-test", new Task(), null);

assertTrue(latch.await(10, TimeUnit.SECONDS));
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}

// check that we complete a dynamic ack listener supplied by the task
Expand Down Expand Up @@ -1473,7 +1476,8 @@ public void onFailure(Exception e) {
: ClusterState.builder(batchExecutionContext.initialState()).build();
}).submitTask("success-test", new Task(), null);

assertTrue(latch.await(10, TimeUnit.SECONDS));
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}

// check that we supply a no-op publish listener if we only care about acking
Expand Down Expand Up @@ -1504,7 +1508,8 @@ public void onFailure(Exception e) {
: ClusterState.builder(batchExecutionContext.initialState()).build();
}).submitTask("success-test", new Task(), null);

assertTrue(latch.await(10, TimeUnit.SECONDS));
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}

// check that exception from acking is passed to listener
Expand Down Expand Up @@ -1553,7 +1558,8 @@ public void onAckFailure(Exception e) {
return ClusterState.builder(batchExecutionContext.initialState()).build();
}).submitTask("node-ack-fail-test", new Task(), null);

assertTrue(latch.await(10, TimeUnit.SECONDS));
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}

// check that we don't time out before even committing the cluster state
Expand Down Expand Up @@ -1587,6 +1593,7 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {

@Override
public void onFailure(Exception e) {
assertEquals("mock exception", asInstanceOf(FailedToCommitClusterStateException.class, e).getMessage());
latch.countDown();
}

Expand All @@ -1597,14 +1604,15 @@ public void onAckTimeout() {
}
);

latch.await();
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}

// check that we timeout if commit took too long
{
final CountDownLatch latch = new CountDownLatch(2);

final TimeValue ackTimeout = TimeValue.timeValueMillis(randomInt(100));
final TimeValue ackTimeout = TimeValue.timeValueMillis(scaledRandomIntBetween(0, 100000));

publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> {
publishListener.onResponse(null);
Expand Down Expand Up @@ -1652,7 +1660,8 @@ public void onAckTimeout() {
}
);

latch.await();
deterministicTaskQueue.runAllTasksInTimeOrder();
safeAwait(latch);
}
}
}
Expand Down