diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index b73a35d3456d1..fb04afe0b268d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -49,7 +49,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.ExecutorSelector; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.plugins.TracingPlugin; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; @@ -77,7 +76,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -131,7 +129,6 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.threadPool = threadPool; @@ -161,7 +157,6 @@ public TransportSearchAction( this.namedWriteableRegistry = namedWriteableRegistry; this.executorSelector = executorSelector; this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(clusterService.getSettings()); - this.tracer = Objects.requireNonNull(tracer); } private Map buildPerIndexOriginalIndices( diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 0b62fb75ffbd6..792d52e6daf2b 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -168,6 +168,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancellationService; import org.elasticsearch.tasks.TaskResultsService; +import org.elasticsearch.tasks.TaskTracer; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; @@ -208,7 +209,6 @@ import static java.util.stream.Collectors.toList; import static org.elasticsearch.core.Types.forciblyCast; -import static org.elasticsearch.plugins.TracingPlugin.NO_TRACING; /** * A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used @@ -728,11 +728,6 @@ protected Node( } new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders); - final TracingPlugin.Tracer tracer = (TracingPlugin.Tracer) pluginComponents.stream() - .filter(c -> c instanceof TracingPlugin.Tracer) - .findFirst() - .orElse(NO_TRACING); - final Transport transport = networkModule.getTransportSupplier().get(); Set taskHeaders = Stream.concat( pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()), @@ -757,6 +752,11 @@ protected Node( final HttpServerTransport httpServerTransport = newHttpTransport(networkModule); final IndexingPressure indexingLimits = new IndexingPressure(settings); + final TaskTracer taskTracer = transportService.getTaskManager().getTaskTracer(); + pluginComponents.stream() + .map(c -> c instanceof TracingPlugin.Tracer ? (TracingPlugin.Tracer) c : null) + .forEach(taskTracer::addTracer); + final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); RepositoriesModule repositoriesModule = new RepositoriesModule( this.environment, @@ -974,7 +974,6 @@ protected Node( b.bind(SystemIndices.class).toInstance(systemIndices); b.bind(PluginShutdownService.class).toInstance(pluginShutdownService); b.bind(ExecutorSelector.class).toInstance(executorSelector); - b.bind(TracingPlugin.Tracer.class).toInstance(tracer); }); injector = modules.createInjector(); diff --git a/server/src/main/java/org/elasticsearch/plugins/TracingPlugin.java b/server/src/main/java/org/elasticsearch/plugins/TracingPlugin.java index 0aff495262c56..cf1860184d57d 100644 --- a/server/src/main/java/org/elasticsearch/plugins/TracingPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/TracingPlugin.java @@ -8,11 +8,13 @@ package org.elasticsearch.plugins; -public interface TracingPlugin { +import org.elasticsearch.tasks.Task; - Tracer NO_TRACING = something -> {}; +public interface TracingPlugin { interface Tracer { - void trace(String something); + void onTaskRegistered(Task task); + + void onTaskUnregistered(Task task); } } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index b4fe32454f03b..df711096a12c2 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -92,6 +92,8 @@ public class TaskManager implements ClusterStateApplier { private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES; + private final TaskTracer taskTracer = new TaskTracer(); + private final ByteSizeValue maxHeaderSize; private final Map channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap(); private final SetOnce cancellationService = new SetOnce<>(); @@ -141,6 +143,7 @@ public Task register(String type, String action, TaskAwareRequest request) { } else { Task previousTask = tasks.put(task.getId(), task); assert previousTask == null; + taskTracer.onTaskRegistered(task); } return task; } @@ -193,6 +196,7 @@ private void registerCancellableTask(Task task) { CancellableTask cancellableTask = (CancellableTask) task; CancellableTaskHolder holder = new CancellableTaskHolder(cancellableTask); cancellableTasks.put(task, holder); + taskTracer.onTaskRegistered(task); // Check if this task was banned before we start it. The empty check is used to avoid // computing the hash code of the parent taskId as most of the time bannedParents is empty. if (task.getParentTaskId().isSet() && bannedParents.isEmpty() == false) { @@ -231,19 +235,23 @@ public void cancel(CancellableTask task, String reason, Runnable listener) { */ public Task unregister(Task task) { logger.trace("unregister task for id: {}", task.getId()); - if (task instanceof CancellableTask) { - CancellableTaskHolder holder = cancellableTasks.remove(task); - if (holder != null) { - holder.finish(); - assert holder.task == task; - return holder.getTask(); + try { + if (task instanceof CancellableTask) { + CancellableTaskHolder holder = cancellableTasks.remove(task); + if (holder != null) { + holder.finish(); + assert holder.task == task; + return holder.getTask(); + } else { + return null; + } } else { - return null; + final Task removedTask = tasks.remove(task.getId()); + assert removedTask == null || removedTask == task; + return removedTask; } - } else { - final Task removedTask = tasks.remove(task.getId()); - assert removedTask == null || removedTask == task; - return removedTask; + } finally { + taskTracer.onTaskUnregistered(task); } } @@ -730,4 +738,8 @@ public void cancelTaskAndDescendants(CancellableTask task, String reason, boolea throw new IllegalStateException("TaskCancellationService is not initialized"); } } + + public TaskTracer getTaskTracer() { + return taskTracer; + } } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskTracer.java b/server/src/main/java/org/elasticsearch/tasks/TaskTracer.java new file mode 100644 index 0000000000000..46b55d62dfccc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/tasks/TaskTracer.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.tasks; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.plugins.TracingPlugin; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public class TaskTracer { + + private static final Logger logger = LogManager.getLogger(); + + private final List tracers = new CopyOnWriteArrayList<>(); + + public void addTracer(TracingPlugin.Tracer tracer) { + if (tracer != null) { + tracers.add(tracer); + } + } + + public void onTaskRegistered(Task task) { + for (TracingPlugin.Tracer tracer : tracers) { + try { + tracer.onTaskRegistered(task); + } catch (Exception e) { + assert false : e; + logger.warn( + new ParameterizedMessage( + "task tracing listener [{}] failed on registration of task [{}][{}]", + tracer, + task.getId(), + task.getAction() + ), + e + ); + } + } + } + + public void onTaskUnregistered(Task task) { + for (TracingPlugin.Tracer tracer : tracers) { + try { + tracer.onTaskUnregistered(task); + } catch (Exception e) { + assert false : e; + logger.warn( + new ParameterizedMessage( + "task tracing listener [{}] failed on unregistration of task [{}][{}]", + tracer, + task.getId(), + task.getAction() + ), + e + ); + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 8ae4c9dee8a9b..dd7a1498e43e7 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -160,7 +160,6 @@ import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.plugins.TracingPlugin; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -1974,8 +1973,7 @@ protected void assertSnapshotOrGenericThread() { actionFilters, indexNameExpressionResolver, namedWriteableRegistry, - EmptySystemIndices.INSTANCE.getExecutorSelector(), - TracingPlugin.NO_TRACING + EmptySystemIndices.INSTANCE.getExecutorSelector() ) ); actions.put( diff --git a/x-pack/plugin/apm-integration/src/internalClusterTest/java/org/elasticsearch/xpack/apm/ApmIT.java b/x-pack/plugin/apm-integration/src/internalClusterTest/java/org/elasticsearch/xpack/apm/ApmIT.java index fcf00203daea9..070b05a6a75c3 100644 --- a/x-pack/plugin/apm-integration/src/internalClusterTest/java/org/elasticsearch/xpack/apm/ApmIT.java +++ b/x-pack/plugin/apm-integration/src/internalClusterTest/java/org/elasticsearch/xpack/apm/ApmIT.java @@ -7,17 +7,24 @@ package org.elasticsearch.xpack.apm; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.trace.data.SpanData; + import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.TracingPlugin; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskTracer; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.TransportService; import java.util.Collection; +import java.util.Collections; import java.util.List; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; public class ApmIT extends ESIntegTestCase { @@ -31,9 +38,24 @@ public void testModule() { List plugins = internalCluster().getMasterNodeInstance(PluginsService.class).filterPlugins(TracingPlugin.class); assertThat(plugins, hasSize(1)); - TracingPlugin.Tracer tracer = internalCluster().getInstance(TracingPlugin.Tracer.class); - assertThat(tracer, notNullValue()); - assertThat(tracer, instanceOf(APMTracer.class)); - tracer.trace("Hello World!"); + TransportService transportService = internalCluster().getInstance(TransportService.class); + final TaskTracer taskTracer = transportService.getTaskManager().getTaskTracer(); + assertThat(taskTracer, notNullValue()); + + final Task testTask = new Task(randomNonNegativeLong(), "test", "action", "", TaskId.EMPTY_TASK_ID, Collections.emptyMap()); + + taskTracer.onTaskRegistered(testTask); + taskTracer.onTaskUnregistered(testTask); + + final List capturedSpans = APMTracer.CAPTURING_SPAN_EXPORTER.getCapturedSpans(); + boolean found = false; + final Long targetId = testTask.getId(); + for (SpanData capturedSpan : capturedSpans) { + if (targetId.equals(capturedSpan.getAttributes().get(AttributeKey.longKey("es.task.id")))) { + found = true; + assertTrue(capturedSpan.hasEnded()); + } + } + assertTrue(found); } } diff --git a/x-pack/plugin/apm-integration/src/main/java/org/elasticsearch/xpack/apm/APMTracer.java b/x-pack/plugin/apm-integration/src/main/java/org/elasticsearch/xpack/apm/APMTracer.java index ae6ea2961b37c..0a055401dcba3 100644 --- a/x-pack/plugin/apm-integration/src/main/java/org/elasticsearch/xpack/apm/APMTracer.java +++ b/x-pack/plugin/apm-integration/src/main/java/org/elasticsearch/xpack/apm/APMTracer.java @@ -12,35 +12,43 @@ import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.context.propagation.ContextPropagators; -import io.opentelemetry.exporter.logging.LoggingSpanExporter; import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.plugins.TracingPlugin; +import org.elasticsearch.tasks.Task; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; public class APMTracer extends AbstractLifecycleComponent implements TracingPlugin.Tracer { - private static final Logger logger = LogManager.getLogger(APMTracer.class); + public static final CapturingSpanExporter CAPTURING_SPAN_EXPORTER = new CapturingSpanExporter(); - private volatile Tracer tracer; + private final Map taskSpans = ConcurrentCollections.newConcurrentMap(); - public APMTracer() {} + private volatile Tracer tracer; @Override protected void doStart() { SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() - .addSpanProcessor(SimpleSpanProcessor.create(new LoggingSpanExporter())) + .addSpanProcessor(SimpleSpanProcessor.create(CAPTURING_SPAN_EXPORTER)) .build(); OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() .setTracerProvider(sdkTracerProvider) .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) .build(); + tracer = openTelemetry.getTracer("elasticsearch", Version.CURRENT.toString()); tracer.spanBuilder("startup").startSpan().end(); } @@ -52,12 +60,51 @@ protected void doStop() {} protected void doClose() {} @Override - public void trace(String something) { + public void onTaskRegistered(Task task) { final Tracer tracer = this.tracer; - if (tracer == null) { - return; + if (tracer != null) { + taskSpans.computeIfAbsent(task.getId(), taskId -> { + final Span span = tracer.spanBuilder(task.getAction()).startSpan(); + span.setAttribute("es.task.id", task.getId()); + return span; + }); + } + } + + @Override + public void onTaskUnregistered(Task task) { + final Span span = taskSpans.remove(task.getId()); + if (span != null) { + span.end(); + } + } + + public static class CapturingSpanExporter implements SpanExporter { + + private List capturedSpans = new ArrayList<>(); + + public void clear() { + capturedSpans.clear(); + } + + public List getCapturedSpans() { + return List.copyOf(capturedSpans); + } + + @Override + public CompletableResultCode export(Collection spans) { + capturedSpans.addAll(spans); + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); } - final Span span = tracer.spanBuilder("something").startSpan(); - span.end(); } }