Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate tracer with task manager #80721

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.TracingPlugin;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
Expand Down Expand Up @@ -77,7 +76,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -131,7 +129,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final CircuitBreaker circuitBreaker;
private final ExecutorSelector executorSelector;
private final int defaultPreFilterShardSize;
private final TracingPlugin.Tracer tracer;

@Inject
public TransportSearchAction(
Expand All @@ -145,8 +142,7 @@ public TransportSearchAction(
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
NamedWriteableRegistry namedWriteableRegistry,
ExecutorSelector executorSelector,
TracingPlugin.Tracer tracer
ExecutorSelector executorSelector
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.threadPool = threadPool;
Expand All @@ -161,7 +157,6 @@ public TransportSearchAction(
this.namedWriteableRegistry = namedWriteableRegistry;
this.executorSelector = executorSelector;
this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(clusterService.getSettings());
this.tracer = Objects.requireNonNull(tracer);
}

private Map<String, OriginalIndices> buildPerIndexOriginalIndices(
Expand Down
13 changes: 6 additions & 7 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancellationService;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.tasks.TaskTracer;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
Expand Down Expand Up @@ -208,7 +209,6 @@

import static java.util.stream.Collectors.toList;
import static org.elasticsearch.core.Types.forciblyCast;
import static org.elasticsearch.plugins.TracingPlugin.NO_TRACING;

/**
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
Expand Down Expand Up @@ -728,11 +728,6 @@ protected Node(
}
new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders);

final TracingPlugin.Tracer tracer = (TracingPlugin.Tracer) pluginComponents.stream()
.filter(c -> c instanceof TracingPlugin.Tracer)
.findFirst()
.orElse(NO_TRACING);

final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Expand All @@ -757,6 +752,11 @@ protected Node(
final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
final IndexingPressure indexingLimits = new IndexingPressure(settings);

final TaskTracer taskTracer = transportService.getTaskManager().getTaskTracer();
pluginComponents.stream()
.map(c -> c instanceof TracingPlugin.Tracer ? (TracingPlugin.Tracer) c : null)
.forEach(taskTracer::addTracer);

final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
Expand Down Expand Up @@ -974,7 +974,6 @@ protected Node(
b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(PluginShutdownService.class).toInstance(pluginShutdownService);
b.bind(ExecutorSelector.class).toInstance(executorSelector);
b.bind(TracingPlugin.Tracer.class).toInstance(tracer);
});
injector = modules.createInjector();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.elasticsearch.plugins;

public interface TracingPlugin {
import org.elasticsearch.tasks.Task;

Tracer NO_TRACING = something -> {};
public interface TracingPlugin {

interface Tracer {
void trace(String something);
void onTaskRegistered(Task task);

void onTaskUnregistered(Task task);
}
}
34 changes: 23 additions & 11 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class TaskManager implements ClusterStateApplier {

private DiscoveryNodes lastDiscoveryNodes = DiscoveryNodes.EMPTY_NODES;

private final TaskTracer taskTracer = new TaskTracer();

private final ByteSizeValue maxHeaderSize;
private final Map<TcpChannel, ChannelPendingTaskTracker> channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap();
private final SetOnce<TaskCancellationService> cancellationService = new SetOnce<>();
Expand Down Expand Up @@ -141,6 +143,7 @@ public Task register(String type, String action, TaskAwareRequest request) {
} else {
Task previousTask = tasks.put(task.getId(), task);
assert previousTask == null;
taskTracer.onTaskRegistered(task);
}
return task;
}
Expand Down Expand Up @@ -193,6 +196,7 @@ private void registerCancellableTask(Task task) {
CancellableTask cancellableTask = (CancellableTask) task;
CancellableTaskHolder holder = new CancellableTaskHolder(cancellableTask);
cancellableTasks.put(task, holder);
taskTracer.onTaskRegistered(task);
// Check if this task was banned before we start it. The empty check is used to avoid
// computing the hash code of the parent taskId as most of the time bannedParents is empty.
if (task.getParentTaskId().isSet() && bannedParents.isEmpty() == false) {
Expand Down Expand Up @@ -231,19 +235,23 @@ public void cancel(CancellableTask task, String reason, Runnable listener) {
*/
public Task unregister(Task task) {
logger.trace("unregister task for id: {}", task.getId());
if (task instanceof CancellableTask) {
CancellableTaskHolder holder = cancellableTasks.remove(task);
if (holder != null) {
holder.finish();
assert holder.task == task;
return holder.getTask();
try {
if (task instanceof CancellableTask) {
CancellableTaskHolder holder = cancellableTasks.remove(task);
if (holder != null) {
holder.finish();
assert holder.task == task;
return holder.getTask();
} else {
return null;
}
} else {
return null;
final Task removedTask = tasks.remove(task.getId());
assert removedTask == null || removedTask == task;
return removedTask;
}
} else {
final Task removedTask = tasks.remove(task.getId());
assert removedTask == null || removedTask == task;
return removedTask;
} finally {
taskTracer.onTaskUnregistered(task);
}
}

Expand Down Expand Up @@ -730,4 +738,8 @@ public void cancelTaskAndDescendants(CancellableTask task, String reason, boolea
throw new IllegalStateException("TaskCancellationService is not initialized");
}
}

public TaskTracer getTaskTracer() {
return taskTracer;
}
}
68 changes: 68 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/TaskTracer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.tasks;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.plugins.TracingPlugin;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class TaskTracer {

private static final Logger logger = LogManager.getLogger();

private final List<TracingPlugin.Tracer> tracers = new CopyOnWriteArrayList<>();

public void addTracer(TracingPlugin.Tracer tracer) {
if (tracer != null) {
tracers.add(tracer);
}
}

public void onTaskRegistered(Task task) {
for (TracingPlugin.Tracer tracer : tracers) {
try {
tracer.onTaskRegistered(task);
} catch (Exception e) {
assert false : e;
logger.warn(
new ParameterizedMessage(
"task tracing listener [{}] failed on registration of task [{}][{}]",
tracer,
task.getId(),
task.getAction()
),
e
);
}
}
}

public void onTaskUnregistered(Task task) {
for (TracingPlugin.Tracer tracer : tracers) {
try {
tracer.onTaskUnregistered(task);
} catch (Exception e) {
assert false : e;
logger.warn(
new ParameterizedMessage(
"task tracing listener [{}] failed on unregistration of task [{}][{}]",
tracer,
task.getId(),
task.getAction()
),
e
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.TracingPlugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
Expand Down Expand Up @@ -1974,8 +1973,7 @@ protected void assertSnapshotOrGenericThread() {
actionFilters,
indexNameExpressionResolver,
namedWriteableRegistry,
EmptySystemIndices.INSTANCE.getExecutorSelector(),
TracingPlugin.NO_TRACING
EmptySystemIndices.INSTANCE.getExecutorSelector()
)
);
actions.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,24 @@

package org.elasticsearch.xpack.apm;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.sdk.trace.data.SpanData;

import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.TracingPlugin;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskTracer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;

public class ApmIT extends ESIntegTestCase {
Expand All @@ -31,9 +38,24 @@ public void testModule() {
List<TracingPlugin> plugins = internalCluster().getMasterNodeInstance(PluginsService.class).filterPlugins(TracingPlugin.class);
assertThat(plugins, hasSize(1));

TracingPlugin.Tracer tracer = internalCluster().getInstance(TracingPlugin.Tracer.class);
assertThat(tracer, notNullValue());
assertThat(tracer, instanceOf(APMTracer.class));
tracer.trace("Hello World!");
TransportService transportService = internalCluster().getInstance(TransportService.class);
final TaskTracer taskTracer = transportService.getTaskManager().getTaskTracer();
assertThat(taskTracer, notNullValue());

final Task testTask = new Task(randomNonNegativeLong(), "test", "action", "", TaskId.EMPTY_TASK_ID, Collections.emptyMap());

taskTracer.onTaskRegistered(testTask);
taskTracer.onTaskUnregistered(testTask);

final List<SpanData> capturedSpans = APMTracer.CAPTURING_SPAN_EXPORTER.getCapturedSpans();
boolean found = false;
final Long targetId = testTask.getId();
for (SpanData capturedSpan : capturedSpans) {
if (targetId.equals(capturedSpan.getAttributes().get(AttributeKey.longKey("es.task.id")))) {
found = true;
assertTrue(capturedSpan.hasEnded());
}
}
assertTrue(found);
}
}
Loading