Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Busy wait until all tasks are removed #95494

Merged
merged 3 commits into from
Apr 25, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -251,24 +250,22 @@ protected TestTasksResponse newResponse(

}

private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch checkLatch) throws InterruptedException {
private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch checkLatch) throws Exception {
return startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request"));
}

private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch checkLatch, NodesRequest request)
throws InterruptedException {
private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch checkLatch, NodesRequest request) throws Exception {
PlainActionFuture<NodesResponse> future = newFuture();
startBlockingTestNodesAction(checkLatch, request, future);
return future;
}

private Task startBlockingTestNodesAction(CountDownLatch checkLatch, ActionListener<NodesResponse> listener)
throws InterruptedException {
private Task startBlockingTestNodesAction(CountDownLatch checkLatch, ActionListener<NodesResponse> listener) throws Exception {
return startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request"), listener);
}

private Task startBlockingTestNodesAction(CountDownLatch checkLatch, NodesRequest request, ActionListener<NodesResponse> listener)
throws InterruptedException {
throws Exception {
CountDownLatch actionLatch = new CountDownLatch(nodesCount);
TestNodesAction[] actions = new TestNodesAction[nodesCount];
for (int i = 0; i < testNodes.length; i++) {
Expand All @@ -295,7 +292,7 @@ protected NodeResponse nodeOperation(NodeRequest request, Task task) {
}
// Make sure no tasks are running
for (TestNode node : testNodes) {
assertEquals(0, node.transportService.getTaskManager().getTasks().size());
assertBusy(() -> assertEquals(0, node.transportService.getTaskManager().getTasks().size()));
}
Task task = testNodes[0].transportService.getTaskManager()
.registerAndExecute("transport", actions[0], request, testNodes[0].transportService.getLocalNodeConnection(), listener);
Expand All @@ -305,7 +302,6 @@ protected NodeResponse nodeOperation(NodeRequest request, Task task) {
return task;
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/95425")
public void testRunningTasksCount() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
Expand Down Expand Up @@ -408,7 +404,7 @@ public void onFailure(Exception e) {

// Make sure that we don't have any lingering tasks
for (TestNode node : testNodes) {
assertEquals(0, node.transportService.getTaskManager().getTasks().size());
assertBusy(() -> assertEquals(0, node.transportService.getTaskManager().getTasks().size()));
}
}

Expand Down Expand Up @@ -607,7 +603,7 @@ private List<String> getAllTaskDescriptions() {
return taskDescriptions;
}

public void testActionParentCancellationPropagates() throws ExecutionException, InterruptedException {
public void testActionParentCancellationPropagates() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -681,7 +677,7 @@ protected void taskOperation(
assertEquals(0, responses.failureCount());
}

public void testTaskLevelActionFailures() throws ExecutionException, InterruptedException {
public void testTaskLevelActionFailures() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -749,7 +745,7 @@ protected void taskOperation(
* it executes a tasks action that targets these blocked node actions. The test verifies that task actions are only
* getting executed on nodes that are not listed in the node filter.
*/
public void testTaskNodeFiltering() throws ExecutionException, InterruptedException {
public void testTaskNodeFiltering() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
Expand Down