diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 8c81cb54f7901..ab89a3e8b4fc2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -612,16 +612,10 @@ public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder s @Override public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - // generating description in a lazy way since source can be quite big - return new SearchTask(id, type, action, null, parentTaskId, headers) { - @Override - public String getDescription() { - return buildDescription(); - } - }; + return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers); } - public String buildDescription() { + public final String buildDescription() { StringBuilder sb = new StringBuilder(); sb.append("indices["); Strings.arrayToDelimitedString(indices, ",", sb); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java index 5815df7cb96fa..7a320d81f2d01 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java @@ -114,7 +114,7 @@ public SearchScrollRequest scroll(String keepAlive) { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchTask(id, type, action, this::getDescription, parentTaskId, headers); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java index c5a918c06f1bb..45f1fe95860a5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTask.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTask.java @@ -23,15 +23,25 @@ import org.elasticsearch.tasks.TaskId; import java.util.Map; +import java.util.function.Supplier; /** * Task storing information about a currently running {@link SearchRequest}. */ public class SearchTask extends CancellableTask { + // generating description in a lazy way since source can be quite big + private final Supplier descriptionSupplier; private SearchProgressListener progressListener = SearchProgressListener.NOOP; - public SearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { - super(id, type, action, description, parentTaskId, headers); + public SearchTask(long id, String type, String action, Supplier descriptionSupplier, + TaskId parentTaskId, Map headers) { + super(id, type, action, null, parentTaskId, headers); + this.descriptionSupplier = descriptionSupplier; + } + + @Override + public final String getDescription() { + return descriptionSupplier.get(); } /** diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 5b145994dd738..cf1a96dde422c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -77,7 +77,7 @@ public Logger getLogger() { @Override public SearchTask getTask() { - return new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap()); + return new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 90555c819bca0..cbdc5b56c85b8 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -146,7 +146,7 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest searchRequest.allowPartialSearchResults(false); SearchPhaseController controller = new SearchPhaseController( writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder()); - SearchTask task = new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap()); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 3e846d3541a5a..6eea2c21afdf1 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -84,6 +84,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { String type, String action, TaskId parentTaskId, + Supplier descriptionSupplier, TimeValue keepAlive, Map originHeaders, Map taskHeaders, @@ -91,7 +92,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { Client client, ThreadPool threadPool, Supplier aggReduceContextSupplier) { - super(id, type, action, "async_search", parentTaskId, taskHeaders); + super(id, type, action, () -> "async_search{" + descriptionSupplier.get() + "}", parentTaskId, taskHeaders); this.expirationTimeMillis = getStartTime() + keepAlive.getMillis(); this.originHeaders = originHeaders; this.searchId = searchId; diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index f9e60c3b2acd1..7965774f4869e 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -143,7 +143,7 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id)); Supplier aggReduceContextSupplier = () -> requestToAggReduceContextBuilder.apply(request.getSearchRequest()); - return new AsyncSearchTask(id, type, action, parentTaskId, keepAlive, + return new AsyncSearchTask(id, type, action, parentTaskId, this::buildDescription, keepAlive, originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier); } }; diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java index 0bf3338d823a7..53f6850e183c4 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchShard; import org.elasticsearch.action.search.ShardSearchFailure; @@ -18,6 +19,7 @@ import org.elasticsearch.common.io.stream.DelayableWriteable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHits; @@ -26,6 +28,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; @@ -71,13 +74,23 @@ public void afterTest() { } private AsyncSearchTask createAsyncSearchTask() { - return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1), + return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)), new NoOpClient(threadPool), threadPool, null); } + public void testTaskDescription() { + SearchRequest searchRequest = new SearchRequest("index1", "index2").source( + new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value"))); + AsyncSearchTask asyncSearchTask = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), searchRequest::buildDescription, + TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)), + new NoOpClient(threadPool), threadPool, null); + assertEquals("async_search{indices[index1,index2], search_type[QUERY_THEN_FETCH], " + + "source[{\"query\":{\"term\":{\"field\":{\"value\":\"value\",\"boost\":1.0}}}}]}", asyncSearchTask.getDescription()); + } + public void testWaitForInit() throws InterruptedException { - AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1), + AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)), new NoOpClient(threadPool), threadPool, null); int numShards = randomIntBetween(0, 10); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeRequest.java index 89094b5f1a8c9..fd75f48a5ea74 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeRequest.java @@ -104,13 +104,13 @@ public String preference() { return preference; } + @Override + public String getDescription() { + return "open search context: indices [" + String.join(",", indices) + "] keep_alive [" + keepAlive + "]"; + } + @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, null, parentTaskId, headers) { - @Override - public String getDescription() { - return "open search context: indices [" + String.join(",", indices) + "] keep_alive [" + keepAlive + "]"; - } - }; + return new SearchTask(id, type, action, this::getDescription, parentTaskId, headers); } }