diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java index 480acc185f016..5eee0cc296573 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/InternalExecutePolicyAction.java @@ -88,56 +88,64 @@ protected void doExecute(Task transportTask, Request request, ActionListener headers) { - String description = "executing enrich policy [" + request.getName() + "]"; - return new ExecuteEnrichPolicyTask(id, type, action, description, parentTaskId, headers); - } - }); - - try { - ActionListener listener; - if (request.isWaitForCompletion()) { - listener = ActionListener.wrap(result -> actionListener.onResponse(new Response(result)), actionListener::onFailure); - } else { - listener = ActionListener.wrap(result -> LOGGER.debug("successfully executed policy [{}]", request.getName()), e -> { - if (e instanceof TaskCancelledException) { - LOGGER.info(e.getMessage()); - } else { - LOGGER.error("failed to execute policy [" + request.getName() + "]", e); - } - }); - } - policyExecutor.runPolicyLocally(task, request.getName(), ActionListener.wrap(result -> { + try (var ignored = transportService.getThreadPool().getThreadContext().newTraceContext()) { + // Can't use provided task, because in the case wait_for_completion=false then + // as soon as actionListener#onResponse is invoked then the provided task get unregistered and + // then there no way to see the policy execution in the list tasks or get task APIs. + var task = (ExecuteEnrichPolicyTask) taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() { + + @Override + public void setParentTask(TaskId taskId) { + request.setParentTask(taskId); + } + + @Override + public TaskId getParentTask() { + return request.getParentTask(); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + String description = "executing enrich policy [" + request.getName() + "]"; + return new ExecuteEnrichPolicyTask(id, type, action, description, parentTaskId, headers); + } + }); + + try { + ActionListener listener; + if (request.isWaitForCompletion()) { + listener = ActionListener.wrap( + result -> actionListener.onResponse(new Response(result)), + actionListener::onFailure + ); + } else { + listener = ActionListener.wrap( + result -> LOGGER.debug("successfully executed policy [{}]", request.getName()), + e -> { + if (e instanceof TaskCancelledException) { + LOGGER.info(e.getMessage()); + } else { + LOGGER.error("failed to execute policy [" + request.getName() + "]", e); + } + } + ); + } + policyExecutor.runPolicyLocally(task, request.getName(), ActionListener.wrap(result -> { + taskManager.unregister(task); + listener.onResponse(result); + }, e -> { + taskManager.unregister(task); + listener.onFailure(e); + })); + + if (request.isWaitForCompletion() == false) { + TaskId taskId = new TaskId(clusterState.nodes().getLocalNodeId(), task.getId()); + actionListener.onResponse(new Response(taskId)); + } + } catch (Exception e) { taskManager.unregister(task); - listener.onResponse(result); - }, e -> { - taskManager.unregister(task); - listener.onFailure(e); - })); - - if (request.isWaitForCompletion() == false) { - TaskId taskId = new TaskId(clusterState.nodes().getLocalNodeId(), task.getId()); - actionListener.onResponse(new Response(taskId)); + throw e; } - } catch (Exception e) { - taskManager.unregister(task); - throw e; } }