From 241f1bc2be274093879b8fb66dec1ccb13614ac1 Mon Sep 17 00:00:00 2001 From: Rory Hunter Date: Tue, 2 Aug 2022 11:33:46 +0100 Subject: [PATCH] Wrap async ql task execution in new tracing context (#89029) Part of #84369. Split out from #88443. This PR wraps parts logic in `AsyncTaskManagementService` in a new tracing context. This is necessary so that a tracing implementation can use the thread context to propagate tracing headers, but without the code attempting to set the same key twice in the thread context, which is illegal. --- .../ql/async/AsyncTaskManagementService.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java index eacb25b10d622..d42f2619a166a 100644 --- a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java +++ b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java @@ -170,20 +170,22 @@ public void asyncExecute( ActionListener listener ) { String nodeId = clusterService.localNode().getId(); - @SuppressWarnings("unchecked") - T searchTask = (T) taskManager.register("transport", action + "[a]", new AsyncRequestWrapper(request, nodeId)); - boolean operationStarted = false; - try { - operation.execute( - request, - searchTask, - wrapStoringListener(searchTask, waitForCompletionTimeout, keepAlive, keepOnCompletion, listener) - ); - operationStarted = true; - } finally { - // If we didn't start operation for any reason, we need to clean up the task that we have created - if (operationStarted == false) { - taskManager.unregister(searchTask); + try (var ignored = threadPool.getThreadContext().newTraceContext()) { + @SuppressWarnings("unchecked") + T searchTask = (T) taskManager.register("transport", action + "[a]", new AsyncRequestWrapper(request, nodeId)); + boolean operationStarted = false; + try { + operation.execute( + request, + searchTask, + wrapStoringListener(searchTask, waitForCompletionTimeout, keepAlive, keepOnCompletion, listener) + ); + operationStarted = true; + } finally { + // If we didn't start operation for any reason, we need to clean up the task that we have created + if (operationStarted == false) { + taskManager.unregister(searchTask); + } } } }