Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async search: create internal index only before storing initial response #54619

Merged
merged 5 commits into from
Apr 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
Expand All @@ -34,7 +33,6 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
Expand Down Expand Up @@ -188,7 +186,7 @@ void storeFinalResponse(String docId,
.index(INDEX)
.id(docId)
.doc(source, XContentType.JSON);
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
client.update(request, listener);
}

/**
Expand All @@ -202,31 +200,16 @@ void updateExpirationTime(String docId,
UpdateRequest request = new UpdateRequest().index(INDEX)
.id(docId)
.doc(source, XContentType.JSON);
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
client.update(request, listener);
}

/**
* Deletes the provided <code>searchId</code> from the index if present.
*/
void deleteResponse(AsyncSearchId searchId,
boolean failIfNotFound,
ActionListener<AcknowledgedResponse> listener) {
ActionListener<DeleteResponse> listener) {
DeleteRequest request = new DeleteRequest(INDEX).id(searchId.getDocId());
createIndexIfNecessary(
ActionListener.wrap(v -> client.delete(request,
ActionListener.wrap(
resp -> {
if (resp.status() == RestStatus.NOT_FOUND && failIfNotFound) {
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
} else {
listener.onResponse(new AcknowledgedResponse(true));
}
},
exc -> {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc);
listener.onFailure(exc);
})),
listener::onFailure));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to make this logic more readable. I found the boolean flag hard to reason about especially as it was provided true only once. I moved the listener wrapping to the callers, where each caller needs to do something different.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

client.delete(request, listener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void addCompletionListener(Consumer<AsyncSearchResponse> listener) {
if (hasCompleted) {
executeImmediately = true;
} else {
completionListeners.put(completionId++, listener::accept);
completionListeners.put(completionId++, listener);
}
}
if (executeImmediately) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@
*/
package org.elasticsearch.xpack.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -15,6 +21,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
Expand All @@ -24,6 +31,8 @@
import java.io.IOException;

public class TransportDeleteAsyncSearchAction extends HandledTransportAction<DeleteAsyncSearchAction.Request, AcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(TransportDeleteAsyncSearchAction.class);

private final ClusterService clusterService;
private final TransportService transportService;
private final AsyncSearchIndexService store;
Expand Down Expand Up @@ -58,16 +67,43 @@ protected void doExecute(Task task, DeleteAsyncSearchAction.Request request, Act
}
}

private void cancelTaskAndDeleteResult(AsyncSearchId searchId, ActionListener<AcknowledgedResponse> listener) throws IOException {
void cancelTaskAndDeleteResult(AsyncSearchId searchId, ActionListener<AcknowledgedResponse> listener) throws IOException {
AsyncSearchTask task = store.getTask(taskManager, searchId);
if (task != null) {
task.cancelTask(() -> store.deleteResponse(searchId, false, listener));
//the task was found and gets cancelled. The response may or may not be found, but we will return 200 anyways.
task.cancelTask(() -> store.deleteResponse(searchId,
ActionListener.wrap(
r -> listener.onResponse(new AcknowledgedResponse(true)),
exc -> {
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
//the index may not be there (no initial async search response stored yet?): we still want to return 200
//note that index missing comes back as 200 hence it's handled in the onResponse callback
if (status == RestStatus.NOT_FOUND) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc);
listener.onFailure(exc);
}
})));
} else {
// the task is not running anymore so we throw a not found exception if
// the search id is also not present in the index (already deleted) or if the user
// is not allowed to access it.
// the task was not found (already cancelled, already completed, or invalid id?)
// we fail if the response is not found in the index
ActionListener<DeleteResponse> deleteListener = ActionListener.wrap(
resp -> {
if (resp.status() == RestStatus.NOT_FOUND) {
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
} else {
listener.onResponse(new AcknowledgedResponse(true));
}
},
exc -> {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc);
listener.onFailure(exc);
}
);
//we get before deleting to verify that the user is authorized
store.getResponse(searchId, false,
ActionListener.wrap(res -> store.deleteResponse(searchId, true, listener), listener::onFailure));
ActionListener.wrap(res -> store.deleteResponse(searchId, deleteListener), listener::onFailure));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand All @@ -17,7 +19,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -57,8 +59,12 @@ protected void doExecute(Task task, GetAsyncSearchAction.Request request, Action
ActionListener.wrap(
p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener),
exc -> {
if (exc.getCause() instanceof DocumentMissingException == false) {
logger.error("failed to retrieve " + searchId.getEncoded(), exc);
//don't log when: the async search document or its index is not found. That can happen if an invalid
//search id is provided or no async search initial response has been stored yet.
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
if (status != RestStatus.NOT_FOUND) {
logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]",
searchId.getEncoded()), exc);
}
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -142,7 +142,7 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier =
() -> requestToAggReduceContextBuilder.apply(request.getSearchRequest());
return new AsyncSearchTask(id, type, action, parentTaskId,
() -> submitTask.isCancelled(), keepAlive, originHeaders, taskHeaders, searchId, store.getClient(),
submitTask::isCancelled, keepAlive, originHeaders, taskHeaders, searchId, store.getClient(),
nodeClient.threadPool(), aggReduceContextSupplier);
}
};
Expand Down Expand Up @@ -173,37 +173,34 @@ private void onFinalResponse(CancellableTask submitTask,
AsyncSearchResponse response,
Runnable nextAction) {
if (submitTask.isCancelled() || searchTask.isCancelled()) {
// the user cancelled the submit so we ensure that there is nothing stored in the response index.
store.deleteResponse(searchTask.getSearchId(), false, ActionListener.wrap(() -> {
taskManager.unregister(searchTask);
nextAction.run();
}));
// the task was cancelled so we ensure that there is nothing stored in the response index.
store.deleteResponse(searchTask.getSearchId(), ActionListener.wrap(
resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchTask.getSearchId()), exc);
unregisterTaskAndMoveOn(searchTask, nextAction);
}));
return;
}

try {
store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(), response,
new ActionListener<>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
taskManager.unregister(searchTask);
nextAction.run();
}

@Override
public void onFailure(Exception exc) {
if (exc.getCause() instanceof DocumentMissingException == false) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
searchTask.getSearchId().getEncoded()), exc);
}
taskManager.unregister(searchTask);
nextAction.run();
}
});
store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(),response,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
if (exc.getCause() instanceof DocumentMissingException == false) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
searchTask.getSearchId().getEncoded()), exc);
}
unregisterTaskAndMoveOn(searchTask, nextAction);
}));
} catch (Exception exc) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getSearchId().getEncoded()), exc);
taskManager.unregister(searchTask);
nextAction.run();
unregisterTaskAndMoveOn(searchTask, nextAction);
}
}

private void unregisterTaskAndMoveOn(SearchTask searchTask, Runnable nextAction) {
taskManager.unregister(searchTask);
nextAction.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ public void testNoIndex() throws Exception {
assertThat(exc.getMessage(), containsString("no such index"));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/54180")
public void testCancellation() throws Exception {
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
request.getSearchRequest().source(
Expand Down