Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Remove plain execute method on TransportAction #30998

Merged
merged 3 commits into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,6 @@ protected TransportAction(Settings settings, String actionName, ThreadPool threa
this.taskManager = taskManager;
}

public final ActionFuture<Response> execute(Request request) {
PlainActionFuture<Response> future = newFuture();
execute(request, future);
return future;
}

/**
* Use this method when the transport action call should result in creation of a new task associated with the call.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
Expand Down Expand Up @@ -65,7 +66,9 @@ public TransportNodesListGatewayMetaState(Settings settings, ThreadPool threadPo
}

public ActionFuture<NodesGatewayMetaState> list(String[] nodesIds, @Nullable TimeValue timeout) {
return execute(new Request(nodesIds).timeout(timeout));
PlainActionFuture<NodesGatewayMetaState> future = PlainActionFuture.newFuture();
execute(new Request(nodesIds).timeout(timeout), future);
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
Expand Down Expand Up @@ -254,8 +255,8 @@ public void onFailure(Exception e) {
request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()));
// And send the cancellation request to a random node
CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request)
.get();
CancelTasksResponse response = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request);

// Awaiting for the main task to finish
responseLatch.await();
Expand Down Expand Up @@ -287,9 +288,9 @@ public void onFailure(Exception e) {
}

// Make sure that tasks are no longer running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().setTaskId(
new TaskId(testNodes[0].getNodeId(), mainTask.getId()))).get();
ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction,
new ListTasksRequest().setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())));
assertEquals(0, listTasksResponse.getTasks().size());

// Make sure that there are no leftover bans, the ban removal is async, so we might return from the cancellation
Expand Down Expand Up @@ -326,8 +327,8 @@ public void onFailure(Exception e) {
request.setReason("Testing Cancellation");
request.setParentTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()));
// And send the cancellation request to a random node
CancelTasksResponse response = testNodes[randomIntBetween(1, testNodes.length - 1)].transportCancelTasksAction.execute(request)
.get();
CancelTasksResponse response = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(1, testNodes.length - 1)].transportCancelTasksAction, request);

// Awaiting for the main task to finish
responseLatch.await();
Expand All @@ -336,16 +337,11 @@ public void onFailure(Exception e) {
assertThat(response.getTasks().size(), equalTo(testNodes.length));

assertBusy(() -> {
try {
// Make sure that main task is no longer running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().setTaskId(
new TaskId(testNodes[0].getNodeId(), mainTask.getId()))).get();
assertEquals(0, listTasksResponse.getTasks().size());

} catch (ExecutionException | InterruptedException ex) {
throw new RuntimeException(ex);
}
ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction,
new ListTasksRequest().setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId())));
assertEquals(0, listTasksResponse.getTasks().size());
});
}

Expand Down Expand Up @@ -378,8 +374,9 @@ public void onFailure(Exception e) {
String mainNode = testNodes[0].getNodeId();

// Make sure that tasks are running
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().setParentTaskId(new TaskId(mainNode, mainTask.getId()))).get();
ListTasksResponse listTasksResponse = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction,
new ListTasksRequest().setParentTaskId(new TaskId(mainNode, mainTask.getId())));
assertThat(listTasksResponse.getTasks().size(), greaterThanOrEqualTo(blockOnNodes.size()));

// Simulate the coordinating node leaving the cluster
Expand All @@ -400,7 +397,7 @@ public void onFailure(Exception e) {
request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].getNodeId(), mainTask.getId()));
// And send the cancellation request to a random node
CancelTasksResponse response = testNodes[0].transportCancelTasksAction.execute(request).get();
CancelTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportCancelTasksAction, request);
logger.info("--> Done simulating issuing cancel request on the node that is about to leave the cluster");
// This node still thinks that's part of the cluster, so cancelling should look successful
if (response.getTasks().size() == 0) {
Expand All @@ -420,15 +417,10 @@ public void onFailure(Exception e) {

assertBusy(() -> {
// Make sure that tasks are no longer running
try {
ListTasksResponse listTasksResponse1 = testNodes[randomIntBetween(1, testNodes.length - 1)]
.transportListTasksAction.execute(new ListTasksRequest().setTaskId(new TaskId(mainNode, mainTask.getId()))).get();
assertEquals(0, listTasksResponse1.getTasks().size());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (ExecutionException ex2) {
fail("shouldn't be here");
}
ListTasksResponse listTasksResponse1 = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(1, testNodes.length - 1)].transportListTasksAction,
new ListTasksRequest().setTaskId(new TaskId(mainNode, mainTask.getId())));
assertEquals(0, listTasksResponse1.getTasks().size());
});

// Wait for clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.action.admin.cluster.node.tasks;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
Expand Down Expand Up @@ -363,7 +365,7 @@ public void onFailure(Exception e) {
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction*"); // pick all test actions
logger.info("Listing currently running tasks using node [{}]", testNodeNum);
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
logger.info("Checking currently running tasks");
assertEquals(testNodes.length, response.getPerNodeTasks().size());

Expand All @@ -382,7 +384,7 @@ public void onFailure(Exception e) {
testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction[n]"); // only pick node actions
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
Expand All @@ -396,7 +398,7 @@ public void onFailure(Exception e) {

// Check task counts using transport with detailed description
listTasksRequest.setDetailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
Expand All @@ -405,7 +407,7 @@ public void onFailure(Exception e) {

// Make sure that the main task on coordinating node is the task that was returned to us by execute()
listTasksRequest.setActions("testAction"); // only pick the main task
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(1, response.getTasks().size());
assertEquals(mainTask.getId(), response.getTasks().get(0).getId());

Expand Down Expand Up @@ -433,15 +435,15 @@ public void testFindChildTasks() throws Exception {
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction");
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(1, response.getTasks().size());
String parentNode = response.getTasks().get(0).getTaskId().getNodeId();
long parentTaskId = response.getTasks().get(0).getId();

// Find tasks with common parent
listTasksRequest = new ListTasksRequest();
listTasksRequest.setParentTaskId(new TaskId(parentNode, parentTaskId));
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getTasks().size());
for (TaskInfo task : response.getTasks()) {
assertEquals("testAction[n]", task.getAction());
Expand All @@ -467,7 +469,7 @@ public void testTaskManagementOptOut() throws Exception {
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction*");
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(0, response.getTasks().size());

// Release all tasks and wait for response
Expand All @@ -488,7 +490,7 @@ public void testTasksDescriptions() throws Exception {
TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction[n]"); // only pick node actions
ListTasksResponse response = testNode.transportListTasksAction.execute(listTasksRequest).get();
ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
Expand All @@ -498,7 +500,7 @@ public void testTasksDescriptions() throws Exception {
// Check task counts using transport with detailed description
long minimalDurationNanos = System.nanoTime() - maximumStartTimeNanos;
listTasksRequest.setDetailed(true); // same request only with detailed description
response = testNode.transportListTasksAction.execute(listTasksRequest).get();
response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
Expand Down Expand Up @@ -536,8 +538,8 @@ public void onFailure(Exception e) {
request.setNodes(testNodes[0].getNodeId());
request.setReason("Testing Cancellation");
request.setActions(actionName);
CancelTasksResponse response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request)
.get();
CancelTasksResponse response = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request);

// Shouldn't match any tasks since testAction doesn't support cancellation
assertEquals(0, response.getTasks().size());
Expand All @@ -549,7 +551,8 @@ public void onFailure(Exception e) {
request = new CancelTasksRequest();
request.setReason("Testing Cancellation");
request.setTaskId(new TaskId(testNodes[0].getNodeId(), task.getId()));
response = testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction.execute(request).get();
response = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportCancelTasksAction, request);

// Shouldn't match any tasks since testAction doesn't support cancellation
assertEquals(0, response.getTasks().size());
Expand All @@ -560,8 +563,8 @@ public void onFailure(Exception e) {
// Make sure that task is still running
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions(actionName);
ListTasksResponse listResponse = testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction.execute
(listTasksRequest).get();
ListTasksResponse listResponse = ActionTestUtils.executeBlocking(
testNodes[randomIntBetween(0, testNodes.length - 1)].transportListTasksAction, listTasksRequest);
assertEquals(1, listResponse.getPerNodeTasks().size());
// Verify that tasks are marked as non-cancellable
for (TaskInfo taskInfo : listResponse.getTasks()) {
Expand Down Expand Up @@ -595,7 +598,7 @@ protected NodeResponse nodeOperation(NodeRequest request) {
assertEquals(0, testNode.transportService.getTaskManager().getTasks().size());
}
NodesRequest request = new NodesRequest("Test Request");
NodesResponse responses = actions[0].execute(request).get();
NodesResponse responses = ActionTestUtils.executeBlocking(actions[0], request);
assertEquals(nodesCount, responses.failureCount());

// Make sure that actions are still registered in the task manager on all nodes
Expand Down Expand Up @@ -660,7 +663,7 @@ protected void taskOperation(TestTasksRequest request, Task task, ActionListener
// should be successful on all nodes except one
TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.setActions("testAction[n]"); // pick all test actions
TestTasksResponse response = tasksActions[0].execute(testTasksRequest).get();
TestTasksResponse response = ActionTestUtils.executeBlocking(tasksActions[0], testTasksRequest);
assertThat(response.getTaskFailures(), hasSize(1)); // one task failed
assertThat(response.getTaskFailures().get(0).getReason(), containsString("Task level failure"));
// Get successful responses from all nodes except one
Expand Down Expand Up @@ -730,7 +733,7 @@ protected void taskOperation(TestTasksRequest request, Task task, ActionListener
// should be successful on all nodes except nodes that we filtered out
TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.setActions("testAction[n]"); // pick all test actions
TestTasksResponse response = tasksActions[randomIntBetween(0, nodesCount - 1)].execute(testTasksRequest).get();
TestTasksResponse response = ActionTestUtils.executeBlocking(tasksActions[randomIntBetween(0, nodesCount - 1)], testTasksRequest);

// Get successful responses from all nodes except nodes that we filtered out
assertEquals(testNodes.length - filterNodes.size(), response.tasks.size());
Expand All @@ -757,7 +760,7 @@ public void testTasksToXContentGrouping() throws Exception {
// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions(ListTasksAction.NAME + "*");
ListTasksResponse response = testNodes[0].transportListTasksAction.execute(listTasksRequest).get();
ListTasksResponse response = ActionTestUtils.executeBlocking(testNodes[0].transportListTasksAction, listTasksRequest);
assertEquals(testNodes.length + 1, response.getTasks().size());

Map<String, Object> byNodes = serialize(response, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -141,7 +142,7 @@ protected void doExecute(SearchRequest request, ActionListener<SearchResponse> l
multiSearchRequest.add(new SearchRequest());
}

MultiSearchResponse response = action.execute(multiSearchRequest).actionGet();
MultiSearchResponse response = ActionTestUtils.executeBlocking(action, multiSearchRequest);
assertThat(response.getResponses().length, equalTo(numSearchRequests));
assertThat(requests.size(), equalTo(numSearchRequests));
assertThat(errorHolder.get(), nullValue());
Expand Down
Loading