Skip to content

Commit

Permalink
[Transform] handle update error correctly (#88619) (#88670)
Browse files Browse the repository at this point in the history
If updating a transform fails a bug prevents reporting the error correctly

relates #88434
  • Loading branch information
Hendrik Muhs authored Jul 21, 2022
1 parent 2e8760e commit d33590f
Showing 1 changed file with 51 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand Down Expand Up @@ -50,6 +51,7 @@
import java.util.List;
import java.util.Map;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable;

public class TransportUpdateTransformAction extends TransportTasksAction<TransformTask, Request, Response, Response> {
Expand Down Expand Up @@ -169,9 +171,35 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
&& ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED
&& clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)) {

ActionListener<Response> taskUpdateListener = ActionListener.wrap(listener::onResponse, e -> {
// benign: A transform might be stopped meanwhile, this is not a problem
if (e instanceof TransformTaskDisappearedDuringUpdateException) {
logger.debug("[{}] transform task disappeared during update, ignoring", request.getId());
listener.onResponse(new Response(updatedConfig));
return;
}

if (e instanceof TransformTaskUpdateException) {
// BWC: only log a warning as response object can not be changed
logger.warn(
() -> format(
"[%s] failed to notify running transform task about update. "
+ "New settings will be applied after next checkpoint.",
request.getId()
),
e
);

listener.onResponse(new Response(updatedConfig));
return;
}

listener.onFailure(e);
});

request.setNodes(transformTask.getExecutorNode());
request.setConfig(updatedConfig);
super.doExecute(task, request, listener);
super.doExecute(task, request, taskUpdateListener);
return;
}
}
Expand Down Expand Up @@ -208,8 +236,29 @@ protected Response newResponse(
List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions
) {
// there should be only 1 response, todo: check
if (tasks.isEmpty()) {
if (taskOperationFailures.isEmpty() == false) {
throw new TransformTaskUpdateException("Failed to update running transform task.", taskOperationFailures.get(0).getCause());
} else if (failedNodeExceptions.isEmpty() == false) {
throw new TransformTaskUpdateException("Failed to update running transform task.", failedNodeExceptions.get(0));
} else {
throw new TransformTaskDisappearedDuringUpdateException("Could not update running transform as it has been stopped.");
}
}

return tasks.get(0);
}

private static class TransformTaskUpdateException extends ElasticsearchException {
TransformTaskUpdateException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}

private static class TransformTaskDisappearedDuringUpdateException extends ElasticsearchException {
TransformTaskDisappearedDuringUpdateException(String msg) {
super(msg);
}
}

}

0 comments on commit d33590f

Please sign in to comment.