Skip to content

Commit

Permalink
Print out search request as part of async search task description (#6…
Browse files Browse the repository at this point in the history
…2057)

Currently, the async search task is the task that will be running through the whole execution of an async search. While the submit async search task prints out the search as part of its description, async search task doesn't while it should.

With this commit we address that while also making sure that the description highlights that the task is originated from an async search.

Also, we streamline the way the description is printed out by SearchTask so that it does not get forgotten in the future.
  • Loading branch information
javanna authored Sep 8, 2020
1 parent 1b51acb commit 06c85d3
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -612,16 +612,10 @@ public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder s

@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// generating description in a lazy way since source can be quite big
return new SearchTask(id, type, action, null, parentTaskId, headers) {
@Override
public String getDescription() {
return buildDescription();
}
};
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers);
}

public String buildDescription() {
public final String buildDescription() {
StringBuilder sb = new StringBuilder();
sb.append("indices[");
Strings.arrayToDelimitedString(indices, ",", sb);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public SearchScrollRequest scroll(String keepAlive) {

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
return new SearchTask(id, type, action, this::getDescription, parentTaskId, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,25 @@
import org.elasticsearch.tasks.TaskId;

import java.util.Map;
import java.util.function.Supplier;

/**
* Task storing information about a currently running {@link SearchRequest}.
*/
public class SearchTask extends CancellableTask {
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;

public SearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
super(id, type, action, description, parentTaskId, headers);
public SearchTask(long id, String type, String action, Supplier<String> descriptionSupplier,
TaskId parentTaskId, Map<String, String> headers) {
super(id, type, action, null, parentTaskId, headers);
this.descriptionSupplier = descriptionSupplier;
}

@Override
public final String getDescription() {
return descriptionSupplier.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Logger getLogger() {

@Override
public SearchTask getTask() {
return new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap());
return new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest
searchRequest.allowPartialSearchResults(false);
SearchPhaseController controller = new SearchPhaseController(
writableRegistry(), r -> InternalAggregationTestCase.emptyReduceContextBuilder());
SearchTask task = new SearchTask(0, "n/a", "n/a", "test", null, Collections.emptyMap());
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger,
searchTransportService, (clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,15 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
String type,
String action,
TaskId parentTaskId,
Supplier<String> descriptionSupplier,
TimeValue keepAlive,
Map<String, String> originHeaders,
Map<String, String> taskHeaders,
AsyncExecutionId searchId,
Client client,
ThreadPool threadPool,
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier) {
super(id, type, action, "async_search", parentTaskId, taskHeaders);
super(id, type, action, () -> "async_search{" + descriptionSupplier.get() + "}", parentTaskId, taskHeaders);
this.expirationTimeMillis = getStartTime() + keepAlive.getMillis();
this.originHeaders = originHeaders;
this.searchId = searchId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa
AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id));
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier =
() -> requestToAggReduceContextBuilder.apply(request.getSearchRequest());
return new AsyncSearchTask(id, type, action, parentTaskId, keepAlive,
return new AsyncSearchTask(id, type, action, parentTaskId, this::buildDescription, keepAlive,
originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchShard;
import org.elasticsearch.action.search.ShardSearchFailure;
Expand All @@ -18,6 +19,7 @@
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHits;
Expand All @@ -26,6 +28,7 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -71,13 +74,23 @@ public void afterTest() {
}

private AsyncSearchTask createAsyncSearchTask() {
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1),
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
new NoOpClient(threadPool), threadPool, null);
}

public void testTaskDescription() {
SearchRequest searchRequest = new SearchRequest("index1", "index2").source(
new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value")));
AsyncSearchTask asyncSearchTask = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), searchRequest::buildDescription,
TimeValue.timeValueHours(1), Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
new NoOpClient(threadPool), threadPool, null);
assertEquals("async_search{indices[index1,index2], search_type[QUERY_THEN_FETCH], " +
"source[{\"query\":{\"term\":{\"field\":{\"value\":\"value\",\"boost\":1.0}}}}]}", asyncSearchTask.getDescription());
}

public void testWaitForInit() throws InterruptedException {
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> null, TimeValue.timeValueHours(1),
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
new NoOpClient(threadPool), threadPool, null);
int numShards = randomIntBetween(0, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ public String preference() {
return preference;
}

@Override
public String getDescription() {
return "open search context: indices [" + String.join(",", indices) + "] keep_alive [" + keepAlive + "]";
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, null, parentTaskId, headers) {
@Override
public String getDescription() {
return "open search context: indices [" + String.join(",", indices) + "] keep_alive [" + keepAlive + "]";
}
};
return new SearchTask(id, type, action, this::getDescription, parentTaskId, headers);
}
}

0 comments on commit 06c85d3

Please sign in to comment.