Skip to content

Commit

Permalink
Cancel multisearch when http connection closed (#61399)
Browse files Browse the repository at this point in the history
Relates #61337
  • Loading branch information
dnhatn authored Aug 21, 2020
1 parent 2e4394c commit 066e83c
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,25 @@
*/
package org.elasticsearch.http;

import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.apache.logging.log4j.LogManager;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.MultiSearchAction;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.script.MockScriptPlugin;
Expand All @@ -45,6 +51,7 @@
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.transport.TransportService;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -75,15 +82,29 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

public void testAutomaticCancellationDuringQueryPhase() throws Exception {
Map<String, String> nodeIdToName = readNodesInfo();

List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();

Request searchRequest = new Request("GET", "/test/_search");
SearchSourceBuilder searchSource = new SearchSourceBuilder().query(scriptQuery(
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())));
searchRequest.setJsonEntity(Strings.toString(searchSource));
verifyCancellationDuringQueryPhase(SearchAction.NAME, searchRequest);
}

public void testAutomaticCancellationMultiSearchDuringQueryPhase() throws Exception {
XContentType contentType = XContentType.JSON;
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest("test")
.source(new SearchSourceBuilder().scriptField("test_field",
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))));
Request restRequest = new Request("POST", "/_msearch");
byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
restRequest.setEntity(new NByteArrayEntity(requestBody, createContentType(contentType)));
verifyCancellationDuringQueryPhase(MultiSearchAction.NAME, restRequest);
}

void verifyCancellationDuringQueryPhase(String searchAction, Request searchRequest) throws Exception {
Map<String, String> nodeIdToName = readNodesInfo();

List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
Expand All @@ -102,23 +123,37 @@ public void onFailure(Exception exception) {

awaitForBlock(plugins);
cancellable.cancel();
ensureSearchTaskIsCancelled(nodeIdToName::get);
ensureSearchTaskIsCancelled(searchAction, nodeIdToName::get);

disableBlocks(plugins);
latch.await();
assertThat(error.get(), instanceOf(CancellationException.class));
}

public void testAutomaticCancellationDuringFetchPhase() throws Exception {
Map<String, String> nodeIdToName = readNodesInfo();

List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();

Request searchRequest = new Request("GET", "/test/_search");
SearchSourceBuilder searchSource = new SearchSourceBuilder().scriptField("test_field",
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()));
searchRequest.setJsonEntity(Strings.toString(searchSource));
verifyCancellationDuringFetchPhase(SearchAction.NAME, searchRequest);
}

public void testAutomaticCancellationMultiSearchDuringFetchPhase() throws Exception {
XContentType contentType = XContentType.JSON;
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest("test")
.source(new SearchSourceBuilder().scriptField("test_field",
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))));
Request restRequest = new Request("POST", "/_msearch");
byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
restRequest.setEntity(new NByteArrayEntity(requestBody, createContentType(contentType)));
verifyCancellationDuringFetchPhase(MultiSearchAction.NAME, restRequest);
}

void verifyCancellationDuringFetchPhase(String searchAction, Request searchRequest) throws Exception {
Map<String, String> nodeIdToName = readNodesInfo();

List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
Expand All @@ -137,7 +172,7 @@ public void onFailure(Exception exception) {

awaitForBlock(plugins);
cancellable.cancel();
ensureSearchTaskIsCancelled(nodeIdToName::get);
ensureSearchTaskIsCancelled(searchAction, nodeIdToName::get);

disableBlocks(plugins);
latch.await();
Expand All @@ -154,11 +189,11 @@ private static Map<String, String> readNodesInfo() {
return nodeIdToName;
}

private static void ensureSearchTaskIsCancelled(Function<String, String> nodeIdToName) throws Exception {
private static void ensureSearchTaskIsCancelled(String transportAction, Function<String, String> nodeIdToName) throws Exception {
SetOnce<TaskInfo> searchTask = new SetOnce<>();
ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().get();
for (TaskInfo task : listTasksResponse.getTasks()) {
if (task.getAction().equals(SearchAction.NAME)) {
if (task.getAction().equals(transportAction)) {
searchTask.set(task);
}
}
Expand Down Expand Up @@ -248,4 +283,8 @@ public Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
});
}
}

private static ContentType createContentType(final XContentType xContentType) {
return ContentType.create(xContentType.mediaTypeWithoutParameters(), (Charset) null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.rest.action.search;

import org.elasticsearch.action.search.MultiSearchAction;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.search.builder.SearchSourceBuilder;

Expand Down Expand Up @@ -79,8 +81,11 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
MultiSearchRequest multiSearchRequest = parseRequest(request, allowExplicitIndex);
return channel -> client.multiSearch(multiSearchRequest, new RestToXContentListener<>(channel));
final MultiSearchRequest multiSearchRequest = parseRequest(request, allowExplicitIndex);
return channel -> {
final RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel());
cancellableClient.execute(MultiSearchAction.INSTANCE, multiSearchRequest, new RestToXContentListener<>(channel));
};
}

/**
Expand Down

0 comments on commit 066e83c

Please sign in to comment.