diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java index 60ecbf354faf1..08df7e5769b59 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java @@ -7,17 +7,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; @@ -34,7 +33,6 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; @@ -188,7 +186,7 @@ void storeFinalResponse(String docId, .index(INDEX) .id(docId) .doc(source, XContentType.JSON); - createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure)); + client.update(request, listener); } /** @@ -202,31 +200,16 @@ void updateExpirationTime(String docId, UpdateRequest request = new UpdateRequest().index(INDEX) .id(docId) .doc(source, XContentType.JSON); - createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure)); + client.update(request, listener); } /** * Deletes the provided searchId from the index if present. */ void deleteResponse(AsyncSearchId searchId, - boolean failIfNotFound, - ActionListener listener) { + ActionListener listener) { DeleteRequest request = new DeleteRequest(INDEX).id(searchId.getDocId()); - createIndexIfNecessary( - ActionListener.wrap(v -> client.delete(request, - ActionListener.wrap( - resp -> { - if (resp.status() == RestStatus.NOT_FOUND && failIfNotFound) { - listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); - } else { - listener.onResponse(new AcknowledgedResponse(true)); - } - }, - exc -> { - logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); - listener.onFailure(exc); - })), - listener::onFailure)); + client.delete(request, listener); } /** diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index f06a21726a5ea..76d7f3fe91eab 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -196,7 +196,7 @@ public void addCompletionListener(Consumer listener) { if (hasCompleted) { executeImmediately = true; } else { - completionListeners.put(completionId++, listener::accept); + completionListeners.put(completionId++, listener); } } if (executeImmediately) { diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java index 4c55536e26213..62c08ff37d5be 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java @@ -5,8 +5,14 @@ */ package org.elasticsearch.xpack.search; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -15,6 +21,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; @@ -24,6 +31,8 @@ import java.io.IOException; public class TransportDeleteAsyncSearchAction extends HandledTransportAction { + private static final Logger logger = LogManager.getLogger(TransportDeleteAsyncSearchAction.class); + private final ClusterService clusterService; private final TransportService transportService; private final AsyncSearchIndexService store; @@ -58,16 +67,43 @@ protected void doExecute(Task task, DeleteAsyncSearchAction.Request request, Act } } - private void cancelTaskAndDeleteResult(AsyncSearchId searchId, ActionListener listener) throws IOException { + void cancelTaskAndDeleteResult(AsyncSearchId searchId, ActionListener listener) throws IOException { AsyncSearchTask task = store.getTask(taskManager, searchId); if (task != null) { - task.cancelTask(() -> store.deleteResponse(searchId, false, listener)); + //the task was found and gets cancelled. The response may or may not be found, but we will return 200 anyways. + task.cancelTask(() -> store.deleteResponse(searchId, + ActionListener.wrap( + r -> listener.onResponse(new AcknowledgedResponse(true)), + exc -> { + RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); + //the index may not be there (no initial async search response stored yet?): we still want to return 200 + //note that index missing comes back as 200 hence it's handled in the onResponse callback + if (status == RestStatus.NOT_FOUND) { + listener.onResponse(new AcknowledgedResponse(true)); + } else { + logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); + listener.onFailure(exc); + } + }))); } else { - // the task is not running anymore so we throw a not found exception if - // the search id is also not present in the index (already deleted) or if the user - // is not allowed to access it. + // the task was not found (already cancelled, already completed, or invalid id?) + // we fail if the response is not found in the index + ActionListener deleteListener = ActionListener.wrap( + resp -> { + if (resp.status() == RestStatus.NOT_FOUND) { + listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); + } else { + listener.onResponse(new AcknowledgedResponse(true)); + } + }, + exc -> { + logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); + listener.onFailure(exc); + } + ); + //we get before deleting to verify that the user is authorized store.getResponse(searchId, false, - ActionListener.wrap(res -> store.deleteResponse(searchId, true, listener), listener::onFailure)); + ActionListener.wrap(res -> store.deleteResponse(searchId, deleteListener), listener::onFailure)); } } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java index f95e6a9a21982..0cf1b3c081311 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java @@ -7,6 +7,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -17,7 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.index.engine.DocumentMissingException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; @@ -57,8 +59,12 @@ protected void doExecute(Task task, GetAsyncSearchAction.Request request, Action ActionListener.wrap( p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), exc -> { - if (exc.getCause() instanceof DocumentMissingException == false) { - logger.error("failed to retrieve " + searchId.getEncoded(), exc); + //don't log when: the async search document or its index is not found. That can happen if an invalid + //search id is provided or no async search initial response has been stored yet. + RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); + if (status != RestStatus.NOT_FOUND) { + logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]", + searchId.getEncoded()), exc); } listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index ee2859a69d72b..684f915e2ce69 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -12,10 +12,10 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; @@ -142,7 +142,7 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa Supplier aggReduceContextSupplier = () -> requestToAggReduceContextBuilder.apply(request.getSearchRequest()); return new AsyncSearchTask(id, type, action, parentTaskId, - () -> submitTask.isCancelled(), keepAlive, originHeaders, taskHeaders, searchId, store.getClient(), + submitTask::isCancelled, keepAlive, originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier); } }; @@ -173,37 +173,34 @@ private void onFinalResponse(CancellableTask submitTask, AsyncSearchResponse response, Runnable nextAction) { if (submitTask.isCancelled() || searchTask.isCancelled()) { - // the user cancelled the submit so we ensure that there is nothing stored in the response index. - store.deleteResponse(searchTask.getSearchId(), false, ActionListener.wrap(() -> { - taskManager.unregister(searchTask); - nextAction.run(); - })); + // the task was cancelled so we ensure that there is nothing stored in the response index. + store.deleteResponse(searchTask.getSearchId(), ActionListener.wrap( + resp -> unregisterTaskAndMoveOn(searchTask, nextAction), + exc -> { + logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchTask.getSearchId()), exc); + unregisterTaskAndMoveOn(searchTask, nextAction); + })); return; } try { - store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(), response, - new ActionListener<>() { - @Override - public void onResponse(UpdateResponse updateResponse) { - taskManager.unregister(searchTask); - nextAction.run(); - } - - @Override - public void onFailure(Exception exc) { - if (exc.getCause() instanceof DocumentMissingException == false) { - logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", - searchTask.getSearchId().getEncoded()), exc); - } - taskManager.unregister(searchTask); - nextAction.run(); - } - }); + store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(),response, + ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction), + exc -> { + if (exc.getCause() instanceof DocumentMissingException == false) { + logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", + searchTask.getSearchId().getEncoded()), exc); + } + unregisterTaskAndMoveOn(searchTask, nextAction); + })); } catch (Exception exc) { logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getSearchId().getEncoded()), exc); - taskManager.unregister(searchTask); - nextAction.run(); + unregisterTaskAndMoveOn(searchTask, nextAction); } } + + private void unregisterTaskAndMoveOn(SearchTask searchTask, Runnable nextAction) { + taskManager.unregister(searchTask); + nextAction.run(); + } } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index e6712d633108e..aac4d1ff1d3cb 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -254,7 +254,6 @@ public void testNoIndex() throws Exception { assertThat(exc.getMessage(), containsString("no such index")); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/54180") public void testCancellation() throws Exception { SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName); request.getSearchRequest().source(