From cd2cd8740a9de7cb4948f03ade945885a15d3f4f Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 2 Apr 2020 00:37:17 +0200 Subject: [PATCH 1/3] Async search: create internal index if necessary only before storing initial response We currently create the .async-search index if necessary before performing any action (index, update or delete). Truth is that this is needed only before storing the initial response. The other operations are either update or delete, which will anyways not find the document to update/delete even if the index gets created when missing. This also caused `testCancellation` failures as we were trying to delete the document twice from the .async-search index, once from `TransportDeleteAsyncSearchAction` and once as a consequence of the search task being completed. The latter may be called after the test is completed, but before the cluster is shut down and causing problems to the after test checks, for instance if it happens after all the indices have been cleaned up. It is totally fine to try to delete a response that is no longer found, but not quite so if such call will also trigger an index creation. With this commit we remove all the calls to createIndexIfNecessary from the update/delete operation, and we leave one call only from storeInitialResponse which is where the index is expected to be created. Closes #54180 --- .../xpack/search/AsyncSearchIndexService.java | 27 ++--------- .../xpack/search/AsyncSearchTask.java | 2 +- .../TransportDeleteAsyncSearchAction.java | 46 ++++++++++++++++--- .../search/TransportGetAsyncSearchAction.java | 10 +++- .../TransportSubmitAsyncSearchAction.java | 42 ++++++++--------- .../xpack/search/AsyncSearchActionIT.java | 1 - 6 files changed, 74 insertions(+), 54 deletions(-) diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java index b1e08e21f59cb..a52c7fdb93715 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java @@ -7,17 +7,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; @@ -34,7 +33,6 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; @@ -180,7 +178,7 @@ void storeFinalResponse(String docId, .index(INDEX) .id(docId) .doc(source, XContentType.JSON); - createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure)); + client.update(request, listener); } /** @@ -194,31 +192,16 @@ void updateExpirationTime(String docId, UpdateRequest request = new UpdateRequest().index(INDEX) .id(docId) .doc(source, XContentType.JSON); - createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure)); + client.update(request, listener); } /** * Deletes the provided searchId from the index if present. */ void deleteResponse(AsyncSearchId searchId, - boolean failIfNotFound, - ActionListener listener) { + ActionListener listener) { DeleteRequest request = new DeleteRequest(INDEX).id(searchId.getDocId()); - createIndexIfNecessary( - ActionListener.wrap(v -> client.delete(request, - ActionListener.wrap( - resp -> { - if (resp.status() == RestStatus.NOT_FOUND && failIfNotFound) { - listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); - } else { - listener.onResponse(new AcknowledgedResponse(true)); - } - }, - exc -> { - logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); - listener.onFailure(exc); - })), - listener::onFailure)); + client.delete(request, listener); } /** diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index 98e6d099a5ee9..a18572fd7f45a 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -197,7 +197,7 @@ public void addCompletionListener(Consumer listener) { if (hasCompleted) { executeImmediately = true; } else { - completionListeners.put(completionId++, listener::accept); + completionListeners.put(completionId++, listener); } } if (executeImmediately) { diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java index 14bf4eafec517..51f7b2814df9e 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java @@ -5,8 +5,13 @@ */ package org.elasticsearch.xpack.search; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -15,6 +20,8 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; @@ -24,6 +31,8 @@ import java.io.IOException; public class TransportDeleteAsyncSearchAction extends HandledTransportAction { + private static final Logger logger = LogManager.getLogger(TransportDeleteAsyncSearchAction.class); + private final ClusterService clusterService; private final TransportService transportService; private final AsyncSearchIndexService store; @@ -58,15 +67,40 @@ protected void doExecute(Task task, DeleteAsyncSearchAction.Request request, Act } } - private void cancelTaskAndDeleteResult(AsyncSearchId searchId, ActionListener listener) throws IOException { + void cancelTaskAndDeleteResult(AsyncSearchId searchId, ActionListener listener) throws IOException { AsyncSearchTask task = store.getTask(taskManager, searchId); if (task != null) { - task.cancelTask(() -> store.deleteResponse(searchId, false, listener)); + //the task was found and gets cancelled. The response may or may not be found, but we will return 200 anyways. + task.cancelTask(() -> store.deleteResponse(searchId, + ActionListener.wrap( + r -> listener.onResponse(new AcknowledgedResponse(true)), + exc -> { + //the index may not be there (no initial async search response stored yet?): we still want to return 200 + if (exc.getCause() instanceof IndexNotFoundException) { + listener.onResponse(new AcknowledgedResponse(true)); + } else { + logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); + listener.onFailure(exc); + } + }))); } else { - // the task is not running anymore so we throw a not found exception if - // the search id is also not present in the index (already deleted) or if the user - // is not allowed to access it. - store.getResponse(searchId, ActionListener.wrap(res -> store.deleteResponse(searchId, true, listener), listener::onFailure)); + // the task was not found (already cancelled, already completed, or invalid id?) + // we fail if the response is not found in the index + ActionListener deleteListener = ActionListener.wrap( + resp -> { + if (resp.status() == RestStatus.NOT_FOUND) { + listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); + } else { + listener.onResponse(new AcknowledgedResponse(true)); + } + }, + exc -> { + logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); + listener.onFailure(exc); + } + ); + //we get before deleting to verify that the user is authorized + store.getResponse(searchId, ActionListener.wrap(res -> store.deleteResponse(searchId, deleteListener), listener::onFailure)); } } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java index 01c807b19aea1..e23af76a1ed5c 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -17,6 +18,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -57,8 +59,12 @@ protected void doExecute(Task task, GetAsyncSearchAction.Request request, Action ActionListener.wrap( p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), exc -> { - if (exc.getCause() instanceof DocumentMissingException == false) { - logger.error("failed to retrieve " + searchId.getEncoded(), exc); + //don't even log when: the async search document or its index is not found. That can happen if an invalid + //search id is provided and no async search initial response has been stored yet. + if (exc.getCause() instanceof DocumentMissingException == false + && exc.getCause() instanceof IndexNotFoundException == false) { + logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]", + searchId.getEncoded()), exc); } listener.onFailure(new ResourceNotFoundException(searchId.getEncoded())); } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index efdf40da76f73..54a58882567e1 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -12,10 +12,10 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; @@ -139,7 +139,7 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa Supplier aggReduceContextSupplier = () -> requestToAggReduceContextBuilder.apply(request.getSearchRequest()); return new AsyncSearchTask(id, type, action, parentTaskId, - () -> submitTask.isCancelled(), keepAlive, originHeaders, taskHeaders, searchId, store.getClient(), + submitTask::isCancelled, keepAlive, originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier); } }; @@ -170,36 +170,34 @@ private void onFinalResponse(CancellableTask submitTask, AsyncSearchResponse response, Runnable nextAction) { if (submitTask.isCancelled() || searchTask.isCancelled()) { - // the user cancelled the submit so we ensure that there is nothing stored in the response index. - store.deleteResponse(searchTask.getSearchId(), false, ActionListener.wrap(() -> { - taskManager.unregister(searchTask); - nextAction.run(); - })); + // the task was cancelled so we ensure that there is nothing stored in the response index. + store.deleteResponse(searchTask.getSearchId(), ActionListener.wrap( + resp -> unregisterTaskAndMoveOn(searchTask, nextAction), + exc -> { + logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchTask.getSearchId()), exc); + unregisterTaskAndMoveOn(searchTask, nextAction); + })); return; } try { - store.storeFinalResponse(searchTask.getSearchId().getDocId(), response, new ActionListener<>() { - @Override - public void onResponse(UpdateResponse updateResponse) { - taskManager.unregister(searchTask); - nextAction.run(); - } - - @Override - public void onFailure(Exception exc) { + store.storeFinalResponse(searchTask.getSearchId().getDocId(), response, ActionListener.wrap( + resp -> unregisterTaskAndMoveOn(searchTask, nextAction), + exc -> { if (exc.getCause() instanceof DocumentMissingException == false) { logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getSearchId().getEncoded()), exc); } - taskManager.unregister(searchTask); - nextAction.run(); - } - }); + unregisterTaskAndMoveOn(searchTask, nextAction); + })); } catch (Exception exc) { logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getSearchId().getEncoded()), exc); - taskManager.unregister(searchTask); - nextAction.run(); + unregisterTaskAndMoveOn(searchTask, nextAction); } } + + private void unregisterTaskAndMoveOn(SearchTask searchTask, Runnable nextAction) { + taskManager.unregister(searchTask); + nextAction.run(); + } } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index e6712d633108e..aac4d1ff1d3cb 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -254,7 +254,6 @@ public void testNoIndex() throws Exception { assertThat(exc.getMessage(), containsString("no such index")); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/54180") public void testCancellation() throws Exception { SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName); request.getSearchRequest().source( From f739f197e0da833a97801d70d73835431c5e4348 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 10 Apr 2020 14:12:06 +0200 Subject: [PATCH 2/3] iter --- .../xpack/search/TransportDeleteAsyncSearchAction.java | 5 ++++- .../xpack/search/TransportGetAsyncSearchAction.java | 10 ++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java index 87afb6515a031..4fcd4168ae583 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -75,8 +76,10 @@ void cancelTaskAndDeleteResult(AsyncSearchId searchId, ActionListener listener.onResponse(new AcknowledgedResponse(true)), exc -> { + RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); //the index may not be there (no initial async search response stored yet?): we still want to return 200 - if (exc.getCause() instanceof IndexNotFoundException) { + //note that index missing comes back as 200 hence it's handled in the onResponse callback + if (status == RestStatus.NOT_FOUND) { listener.onResponse(new AcknowledgedResponse(true)); } else { logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc); diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java index eeaff21fe19ee..00d0e536a4eb5 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -20,6 +21,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.DocumentMissingException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; @@ -59,10 +61,10 @@ protected void doExecute(Task task, GetAsyncSearchAction.Request request, Action ActionListener.wrap( p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener), exc -> { - //don't even log when: the async search document or its index is not found. That can happen if an invalid - //search id is provided and no async search initial response has been stored yet. - if (exc.getCause() instanceof DocumentMissingException == false - && exc.getCause() instanceof IndexNotFoundException == false) { + //don't log when: the async search document or its index is not found. That can happen if an invalid + //search id is provided or no async search initial response has been stored yet. + RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc)); + if (status != RestStatus.NOT_FOUND) { logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]", searchId.getEncoded()), exc); } From b2f956dea50bc9b936d280ac926d2bba304e01a1 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 10 Apr 2020 14:18:40 +0200 Subject: [PATCH 3/3] checkstyle --- .../xpack/search/TransportDeleteAsyncSearchAction.java | 1 - .../xpack/search/TransportGetAsyncSearchAction.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java index 4fcd4168ae583..62c08ff37d5be 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java @@ -21,7 +21,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java index 00d0e536a4eb5..0cf1b3c081311 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java @@ -19,8 +19,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool;