Skip to content

Commit

Permalink
Fix up tasks integ test
Browse files Browse the repository at this point in the history
I'd made some mistakes that hadn't caused the test to fail but did
slow it down and partially invalidate some of the assertions. This
fixes those mistakes.
  • Loading branch information
nik9000 committed Sep 16, 2016
1 parent 8ec94a4 commit 697adfb
Showing 1 changed file with 33 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
Expand Down Expand Up @@ -333,12 +333,11 @@ public void testTransportBulkTasks() {
* particular status results from indexing. For that, look at {@link TransportReplicationActionTests}. We intentionally don't use the
* task recording mechanism used in other places in this test so we can make sure that the status fetching works properly over the wire.
*/
public void testCanFetchIndexStatus() throws InterruptedException, ExecutionException, IOException {
/* We make sure all indexing tasks wait to start before this lock is *unlocked* so we can fetch their status with both the get and
* list APIs. */
public void testCanFetchIndexStatus() throws Exception {
// First latch waits for the task to start, second on blocks it from finishing.
CountDownLatch taskRegistered = new CountDownLatch(1);
CountDownLatch letTaskFinish = new CountDownLatch(1);
ListenableActionFuture<IndexResponse> indexFuture = null;
Thread index = null;
try {
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() {
Expand All @@ -348,7 +347,7 @@ public void onTaskRegistered(Task task) {
taskRegistered.countDown();
logger.debug("Blocking [{}] starting", task);
try {
letTaskFinish.await(10, TimeUnit.SECONDS);
assertTrue(letTaskFinish.await(10, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -364,8 +363,13 @@ public void waitForTaskCompletion(Task task) {
}
});
}
indexFuture = client().prepareIndex("test", "test").setSource("test", "test").execute();
taskRegistered.await(10, TimeUnit.SECONDS); // waiting for at least one task to be registered
// Need to run the task in a separate thread because node client's .execute() is blocked by our task listener
index = new Thread(() -> {
IndexResponse indexResponse = client().prepareIndex("test", "test").setSource("test", "test").get();
assertArrayEquals(ReplicationResponse.EMPTY, indexResponse.getShardInfo().getFailures());
});
index.start();
assertTrue(taskRegistered.await(10, TimeUnit.SECONDS)); // waiting for at least one task to be registered

ListTasksResponse listResponse = client().admin().cluster().prepareListTasks().setActions("indices:data/write/index*")
.setDetailed(true).get();
Expand All @@ -387,10 +391,13 @@ public void waitForTaskCompletion(Task task) {
}
} finally {
letTaskFinish.countDown();
if (indexFuture != null) {
IndexResponse indexResponse = indexFuture.get();
assertArrayEquals(ReplicationResponse.EMPTY, indexResponse.getShardInfo().getFailures());
if (index != null) {
index.join();
}
assertBusy(() -> {
assertEquals(emptyList(),
client().admin().cluster().prepareListTasks().setActions("indices:data/write/index*").get().getTasks());
});
}
}

Expand Down Expand Up @@ -439,28 +446,35 @@ public void testListTasksWaitForCompletion() throws Exception {
}, response -> {
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getTasks(), hasSize(1));
TaskInfo task = response.getTasks().get(0);
assertEquals(TestTaskPlugin.TestTaskAction.NAME, task.getAction());
});
}

public void testGetTaskWaitForCompletionWithoutStoringResult() throws Exception {
waitForCompletionTestCase(false, id -> {
return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
}, response -> {
assertNotNull(response.getTask().getTask());
assertTrue(response.getTask().isCompleted());
// We didn't store the result so it won't come back when we wait
assertNull(response.getTask().getResponse());
// But the task's details should still be there because we grabbed a reference to the task before waiting for it to complete.
assertNotNull(response.getTask().getTask());
assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
});
}

public void testGetTaskWaitForCompletionWithStoringResult() throws Exception {
waitForCompletionTestCase(true, id -> {
return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
}, response -> {
assertNotNull(response.getTask().getTask());
assertTrue(response.getTask().isCompleted());
// We stored the task so we should get its results
assertEquals(0, response.getTask().getResponseAsMap().get("failure_count"));
// The task's details should also be there
assertNotNull(response.getTask().getTask());
assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
});
}

Expand Down Expand Up @@ -490,6 +504,7 @@ private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId,
((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() {
@Override
public void waitForTaskCompletion(Task task) {
waitForWaitingToStart.countDown();
}

@Override
Expand All @@ -498,15 +513,16 @@ public void onTaskRegistered(Task task) {

@Override
public void onTaskUnregistered(Task task) {
waitForWaitingToStart.countDown();
}
});
}

// Spin up a request to wait for the test task to finish
waitResponseFuture = wait.apply(taskId);

// Wait for the wait to start
/* Wait for the wait to start. This should count down just *before* we wait for completion but after the list/get has got a
* reference to the running task. Because we unblock immediately after this the task may no longer be running for us to wait
* on which is fine. */
waitForWaitingToStart.await();
} finally {
// Unblock the request so the wait for completion request can finish
Expand All @@ -517,7 +533,8 @@ public void onTaskUnregistered(Task task) {
T waitResponse = waitResponseFuture.get();
validator.accept(waitResponse);

future.get();
TestTaskPlugin.NodesResponse response = future.get();
assertEquals(emptyList(), response.failures());
}

public void testListTasksWaitForTimeout() throws Exception {
Expand Down

0 comments on commit 697adfb

Please sign in to comment.