Skip to content

Commit

Permalink
Wrap enrich execute action in new tracing context (#89021)
Browse files Browse the repository at this point in the history
Part of #84369. Split out from #88443. This PR wraps parts logic in
`InternalExecutePolicyAction` in a new tracing context. This is
necessary so that a tracing implementation can use the thread context
to propagate tracing headers, but without the code attempting to set the
same key twice in the thread context, which is illegal.
  • Loading branch information
pugnascotia authored Aug 2, 2022
1 parent e4214ef commit bc840f9
Showing 1 changed file with 56 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,56 +88,64 @@ protected void doExecute(Task transportTask, Request request, ActionListener<Res
return;
}

// Can't use provided task, because in the case wait_for_completion=false then
// as soon as actionListener#onResponse is invoked then the provided task get unregistered and
// then there no way to see the policy execution in the list tasks or get task APIs.
var task = (ExecuteEnrichPolicyTask) taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() {

@Override
public void setParentTask(TaskId taskId) {
request.setParentTask(taskId);
}

@Override
public TaskId getParentTask() {
return request.getParentTask();
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
String description = "executing enrich policy [" + request.getName() + "]";
return new ExecuteEnrichPolicyTask(id, type, action, description, parentTaskId, headers);
}
});

try {
ActionListener<ExecuteEnrichPolicyStatus> listener;
if (request.isWaitForCompletion()) {
listener = ActionListener.wrap(result -> actionListener.onResponse(new Response(result)), actionListener::onFailure);
} else {
listener = ActionListener.wrap(result -> LOGGER.debug("successfully executed policy [{}]", request.getName()), e -> {
if (e instanceof TaskCancelledException) {
LOGGER.info(e.getMessage());
} else {
LOGGER.error("failed to execute policy [" + request.getName() + "]", e);
}
});
}
policyExecutor.runPolicyLocally(task, request.getName(), ActionListener.wrap(result -> {
try (var ignored = transportService.getThreadPool().getThreadContext().newTraceContext()) {
// Can't use provided task, because in the case wait_for_completion=false then
// as soon as actionListener#onResponse is invoked then the provided task get unregistered and
// then there no way to see the policy execution in the list tasks or get task APIs.
var task = (ExecuteEnrichPolicyTask) taskManager.register("enrich", TASK_ACTION, new TaskAwareRequest() {

@Override
public void setParentTask(TaskId taskId) {
request.setParentTask(taskId);
}

@Override
public TaskId getParentTask() {
return request.getParentTask();
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
String description = "executing enrich policy [" + request.getName() + "]";
return new ExecuteEnrichPolicyTask(id, type, action, description, parentTaskId, headers);
}
});

try {
ActionListener<ExecuteEnrichPolicyStatus> listener;
if (request.isWaitForCompletion()) {
listener = ActionListener.wrap(
result -> actionListener.onResponse(new Response(result)),
actionListener::onFailure
);
} else {
listener = ActionListener.wrap(
result -> LOGGER.debug("successfully executed policy [{}]", request.getName()),
e -> {
if (e instanceof TaskCancelledException) {
LOGGER.info(e.getMessage());
} else {
LOGGER.error("failed to execute policy [" + request.getName() + "]", e);
}
}
);
}
policyExecutor.runPolicyLocally(task, request.getName(), ActionListener.wrap(result -> {
taskManager.unregister(task);
listener.onResponse(result);
}, e -> {
taskManager.unregister(task);
listener.onFailure(e);
}));

if (request.isWaitForCompletion() == false) {
TaskId taskId = new TaskId(clusterState.nodes().getLocalNodeId(), task.getId());
actionListener.onResponse(new Response(taskId));
}
} catch (Exception e) {
taskManager.unregister(task);
listener.onResponse(result);
}, e -> {
taskManager.unregister(task);
listener.onFailure(e);
}));

if (request.isWaitForCompletion() == false) {
TaskId taskId = new TaskId(clusterState.nodes().getLocalNodeId(), task.getId());
actionListener.onResponse(new Response(taskId));
throw e;
}
} catch (Exception e) {
taskManager.unregister(task);
throw e;
}
}

Expand Down

0 comments on commit bc840f9

Please sign in to comment.