Skip to content

Commit

Permalink
Async search: create internal index if necessary only before storing …
Browse files Browse the repository at this point in the history
…initial response

We currently create the .async-search index if necessary before performing any action (index, update or delete). Truth is that this is needed only before storing the initial response. The other operations are either update or delete, which will anyways not find the document to update/delete even if the index gets created when missing. This also caused `testCancellation` failures as we were trying to delete the document twice from the .async-search index, once from `TransportDeleteAsyncSearchAction` and once as a consequence of the search task being completed. The latter may be called after the test is completed, but before the cluster is shut down and causing problems to the after test checks, for instance if it happens after all the indices have been cleaned up. It is totally fine to try to delete a response that is no longer found, but not quite so if such call will also trigger an index creation.

With this commit we remove all the calls to createIndexIfNecessary from the update/delete operation, and we leave one call only from storeInitialResponse which is where the index is expected to be created.

Closes elastic#54180
  • Loading branch information
javanna committed Apr 1, 2020
1 parent 02772ca commit cd2cd87
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
Expand All @@ -34,7 +33,6 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
Expand Down Expand Up @@ -180,7 +178,7 @@ void storeFinalResponse(String docId,
.index(INDEX)
.id(docId)
.doc(source, XContentType.JSON);
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
client.update(request, listener);
}

/**
Expand All @@ -194,31 +192,16 @@ void updateExpirationTime(String docId,
UpdateRequest request = new UpdateRequest().index(INDEX)
.id(docId)
.doc(source, XContentType.JSON);
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
client.update(request, listener);
}

/**
* Deletes the provided <code>searchId</code> from the index if present.
*/
void deleteResponse(AsyncSearchId searchId,
boolean failIfNotFound,
ActionListener<AcknowledgedResponse> listener) {
ActionListener<DeleteResponse> listener) {
DeleteRequest request = new DeleteRequest(INDEX).id(searchId.getDocId());
createIndexIfNecessary(
ActionListener.wrap(v -> client.delete(request,
ActionListener.wrap(
resp -> {
if (resp.status() == RestStatus.NOT_FOUND && failIfNotFound) {
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
} else {
listener.onResponse(new AcknowledgedResponse(true));
}
},
exc -> {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc);
listener.onFailure(exc);
})),
listener::onFailure));
client.delete(request, listener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void addCompletionListener(Consumer<AsyncSearchResponse> listener) {
if (hasCompleted) {
executeImmediately = true;
} else {
completionListeners.put(completionId++, listener::accept);
completionListeners.put(completionId++, listener);
}
}
if (executeImmediately) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@
*/
package org.elasticsearch.xpack.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -15,6 +20,8 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
Expand All @@ -24,6 +31,8 @@
import java.io.IOException;

public class TransportDeleteAsyncSearchAction extends HandledTransportAction<DeleteAsyncSearchAction.Request, AcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(TransportDeleteAsyncSearchAction.class);

private final ClusterService clusterService;
private final TransportService transportService;
private final AsyncSearchIndexService store;
Expand Down Expand Up @@ -58,15 +67,40 @@ protected void doExecute(Task task, DeleteAsyncSearchAction.Request request, Act
}
}

private void cancelTaskAndDeleteResult(AsyncSearchId searchId, ActionListener<AcknowledgedResponse> listener) throws IOException {
void cancelTaskAndDeleteResult(AsyncSearchId searchId, ActionListener<AcknowledgedResponse> listener) throws IOException {
AsyncSearchTask task = store.getTask(taskManager, searchId);
if (task != null) {
task.cancelTask(() -> store.deleteResponse(searchId, false, listener));
//the task was found and gets cancelled. The response may or may not be found, but we will return 200 anyways.
task.cancelTask(() -> store.deleteResponse(searchId,
ActionListener.wrap(
r -> listener.onResponse(new AcknowledgedResponse(true)),
exc -> {
//the index may not be there (no initial async search response stored yet?): we still want to return 200
if (exc.getCause() instanceof IndexNotFoundException) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc);
listener.onFailure(exc);
}
})));
} else {
// the task is not running anymore so we throw a not found exception if
// the search id is also not present in the index (already deleted) or if the user
// is not allowed to access it.
store.getResponse(searchId, ActionListener.wrap(res -> store.deleteResponse(searchId, true, listener), listener::onFailure));
// the task was not found (already cancelled, already completed, or invalid id?)
// we fail if the response is not found in the index
ActionListener<DeleteResponse> deleteListener = ActionListener.wrap(
resp -> {
if (resp.status() == RestStatus.NOT_FOUND) {
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
} else {
listener.onResponse(new AcknowledgedResponse(true));
}
},
exc -> {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc);
listener.onFailure(exc);
}
);
//we get before deleting to verify that the user is authorized
store.getResponse(searchId, ActionListener.wrap(res -> store.deleteResponse(searchId, deleteListener), listener::onFailure));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -57,8 +59,12 @@ protected void doExecute(Task task, GetAsyncSearchAction.Request request, Action
ActionListener.wrap(
p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener),
exc -> {
if (exc.getCause() instanceof DocumentMissingException == false) {
logger.error("failed to retrieve " + searchId.getEncoded(), exc);
//don't even log when: the async search document or its index is not found. That can happen if an invalid
//search id is provided and no async search initial response has been stored yet.
if (exc.getCause() instanceof DocumentMissingException == false
&& exc.getCause() instanceof IndexNotFoundException == false) {
logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]",
searchId.getEncoded()), exc);
}
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -139,7 +139,7 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier =
() -> requestToAggReduceContextBuilder.apply(request.getSearchRequest());
return new AsyncSearchTask(id, type, action, parentTaskId,
() -> submitTask.isCancelled(), keepAlive, originHeaders, taskHeaders, searchId, store.getClient(),
submitTask::isCancelled, keepAlive, originHeaders, taskHeaders, searchId, store.getClient(),
nodeClient.threadPool(), aggReduceContextSupplier);
}
};
Expand Down Expand Up @@ -170,36 +170,34 @@ private void onFinalResponse(CancellableTask submitTask,
AsyncSearchResponse response,
Runnable nextAction) {
if (submitTask.isCancelled() || searchTask.isCancelled()) {
// the user cancelled the submit so we ensure that there is nothing stored in the response index.
store.deleteResponse(searchTask.getSearchId(), false, ActionListener.wrap(() -> {
taskManager.unregister(searchTask);
nextAction.run();
}));
// the task was cancelled so we ensure that there is nothing stored in the response index.
store.deleteResponse(searchTask.getSearchId(), ActionListener.wrap(
resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchTask.getSearchId()), exc);
unregisterTaskAndMoveOn(searchTask, nextAction);
}));
return;
}

try {
store.storeFinalResponse(searchTask.getSearchId().getDocId(), response, new ActionListener<>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
taskManager.unregister(searchTask);
nextAction.run();
}

@Override
public void onFailure(Exception exc) {
store.storeFinalResponse(searchTask.getSearchId().getDocId(), response, ActionListener.wrap(
resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
if (exc.getCause() instanceof DocumentMissingException == false) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
searchTask.getSearchId().getEncoded()), exc);
}
taskManager.unregister(searchTask);
nextAction.run();
}
});
unregisterTaskAndMoveOn(searchTask, nextAction);
}));
} catch (Exception exc) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getSearchId().getEncoded()), exc);
taskManager.unregister(searchTask);
nextAction.run();
unregisterTaskAndMoveOn(searchTask, nextAction);
}
}

private void unregisterTaskAndMoveOn(SearchTask searchTask, Runnable nextAction) {
taskManager.unregister(searchTask);
nextAction.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ public void testNoIndex() throws Exception {
assertThat(exc.getMessage(), containsString("no such index"));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/54180")
public void testCancellation() throws Exception {
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
request.getSearchRequest().source(
Expand Down

0 comments on commit cd2cd87

Please sign in to comment.