Skip to content

Commit

Permalink
Tracing instrumentation at deep search path
Browse files Browse the repository at this point in the history
  • Loading branch information
dzane17 committed Dec 12, 2023
1 parent a8da66c commit 66a2583
Show file tree
Hide file tree
Showing 23 changed files with 301 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,9 @@ private void onPhaseEnd(SearchRequestContext searchRequestContext) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseEnd(this, searchRequestContext);
}

private void onPhaseStart(SearchPhase phase) {
private void onPhaseStart(SearchPhase phase, SearchRequestContext searchRequestContext) {
setCurrentPhase(phase);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this, searchRequestContext);
}

private void onRequestEnd(SearchRequestContext searchRequestContext) {
Expand All @@ -450,7 +450,7 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) {

private void executePhase(SearchPhase phase) {
try {
onPhaseStart(phase);
onPhaseStart(phase, searchRequestContext);
phase.recordAndRun();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -714,7 +714,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At

@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, searchRequestContext);
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.telemetry.tracing.Span;

import java.util.EnumMap;
import java.util.HashMap;
Expand All @@ -24,12 +25,14 @@
* @opensearch.internal
*/
@InternalApi
class SearchRequestContext {
public class SearchRequestContext {
private final SearchRequestOperationsListener searchRequestOperationsListener;
private long absoluteStartNanos;
private final Map<String, Long> phaseTookMap;
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;
private Span requestSpan;
private Span phaseSpan;

/**
* This constructor is for testing only
Expand Down Expand Up @@ -76,7 +79,7 @@ void setTotalHits(TotalHits totalHits) {
this.totalHits = totalHits;
}

TotalHits totalHits() {
public TotalHits totalHits() {
return totalHits;
}

Expand All @@ -87,7 +90,7 @@ void setShardStats(int total, int successful, int skipped, int failed) {
this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED, failed);
}

String formattedShardStats() {
public String formattedShardStats() {
if (shardStats.isEmpty()) {
return "";
} else {
Expand All @@ -105,6 +108,22 @@ String formattedShardStats() {
);
}
}

/**
 * Stores the given span in this context's {@code requestSpan} field, replacing any previous value.
 *
 * @param requestSpan the span to keep on this context; no null check is performed here
 */
public void setRequestSpan(Span requestSpan) {
this.requestSpan = requestSpan;
}

/**
 * Returns the span last stored via {@code setRequestSpan}.
 *
 * @return the stored request span, or {@code null} if none has been set
 */
public Span getRequestSpan() {
return requestSpan;
}

/**
 * Stores the given span in this context's {@code phaseSpan} field, replacing any previous value.
 *
 * @param phaseSpan the span to keep on this context; no null check is performed here
 */
public void setPhaseSpan(Span phaseSpan) {
this.phaseSpan = phaseSpan;
}

/**
 * Returns the span last stored via {@code setPhaseSpan}.
 *
 * @return the stored phase span, or {@code null} if none has been set
 */
public Span getPhaseSpan() {
return phaseSpan;
}
}

enum ShardStatsFieldNames {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.telemetry.tracing.AttributeNames;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanContext;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;

import static org.opensearch.core.common.Strings.capitalize;

/**
* Listener for search request tracing on the coordinator node
*
* @opensearch.internal
*/
public final class SearchRequestCoordinatorTrace extends SearchRequestOperationsListener {
    private final Tracer tracer;

    /**
     * Creates a listener that reports request and phase spans through the given tracer.
     *
     * @param tracer tracer used to start spans and to put the phase span in scope
     */
    public SearchRequestCoordinatorTrace(Tracer tracer) {
        this.tracer = tracer;
    }

    /**
     * Starts a span named {@code "coordinator" + capitalize(phaseName)} whose parent is the
     * request span held by {@code searchRequestContext}, stores it as the phase span and puts
     * it in scope on the tracer.
     *
     * @param context              phase context supplying the current phase name
     * @param searchRequestContext holder of the request span (read) and phase span (written)
     */
    @Override
    void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
        final String spanName = "coordinator" + capitalize(context.getCurrentPhase().getName());
        searchRequestContext.setPhaseSpan(
            tracer.startSpan(SpanBuilder.from(spanName, new SpanContext(searchRequestContext.getRequestSpan())))
        );
        // NOTE(review): the returned scope is never closed. Presumably it has to stay open while
        // the phase executes so nested work picks up the phase span; confirm, and close it once
        // the phase is finished if the tracer requires scopes to be released.
        SpanScope spanScope = tracer.withSpanInScope(searchRequestContext.getPhaseSpan());
    }

    /**
     * Ends the current phase span, if one was started.
     *
     * @param context              phase context (unused)
     * @param searchRequestContext holder of the phase span
     */
    @Override
    void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
        endPhaseSpan(searchRequestContext);
    }

    /**
     * Ends the current phase span, if one was started.
     *
     * @param context              phase context (unused)
     * @param searchRequestContext holder of the phase span
     */
    @Override
    void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
        endPhaseSpan(searchRequestContext);
    }

    /**
     * Starts the {@code "coordinatorRequest"} span and stores it as the request span.
     *
     * @param searchRequestContext holder that receives the new request span
     */
    @Override
    void onRequestStart(SearchRequestContext searchRequestContext) {
        searchRequestContext.setRequestSpan(tracer.startSpan(SpanBuilder.from("coordinatorRequest")));
    }

    /**
     * Adds response-related attributes (total hits, shard stats, request source) to the request
     * span when they are available, then ends the span. The span is ended even if adding an
     * attribute throws.
     *
     * @param context              phase context supplying the original search request
     * @param searchRequestContext holder of the request span, total hits and shard stats
     */
    @Override
    void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
        // No span means onRequestStart never ran (or failed) for this listener; nothing to report.
        if (searchRequestContext.getRequestSpan() == null) {
            return;
        }
        try {
            if (searchRequestContext.totalHits() != null) {
                searchRequestContext.getRequestSpan()
                    .addAttribute(AttributeNames.TOTAL_HITS, searchRequestContext.totalHits().toString());
            }
            final String shardStats = searchRequestContext.formattedShardStats();
            if (!shardStats.isEmpty()) {
                searchRequestContext.getRequestSpan().addAttribute(AttributeNames.SHARDS, shardStats);
            }
            // A search request may carry no source body; skip the attribute instead of throwing.
            if (context.getRequest() != null && context.getRequest().source() != null) {
                searchRequestContext.getRequestSpan()
                    .addAttribute(AttributeNames.SOURCE, context.getRequest().source().toString());
            }
        } finally {
            searchRequestContext.getRequestSpan().endSpan();
        }
    }

    /** Ends the phase span held by the context; does nothing when no phase span was stored. */
    private static void endPhaseSpan(SearchRequestContext searchRequestContext) {
        if (searchRequestContext.getPhaseSpan() != null) {
            searchRequestContext.getPhaseSpan().endSpan();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
@InternalApi
abstract class SearchRequestOperationsListener {

abstract void onPhaseStart(SearchPhaseContext context);
abstract void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext);

abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);

abstract void onPhaseFailure(SearchPhaseContext context);
abstract void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext);

void onRequestStart(SearchRequestContext searchRequestContext) {}

Expand All @@ -48,10 +48,10 @@ static final class CompositeListener extends SearchRequestOperationsListener {
}

@Override
void onPhaseStart(SearchPhaseContext context) {
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseStart(context);
listener.onPhaseStart(context, searchRequestContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e);
}
Expand All @@ -70,10 +70,10 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo
}

@Override
void onPhaseFailure(SearchPhaseContext context) {
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseFailure(context);
listener.onPhaseFailure(context, searchRequestContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ public SearchRequestSlowLog(ClusterService clusterService) {
}

@Override
void onPhaseStart(SearchPhaseContext context) {}
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
void onPhaseFailure(SearchPhaseContext context) {}
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
void onRequestStart(SearchRequestContext searchRequestContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public long getPhaseMetric(SearchPhaseName searchPhaseName) {
}

@Override
void onPhaseStart(SearchPhaseContext context) {
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc();
}

Expand All @@ -59,7 +59,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo
}

@Override
void onPhaseFailure(SearchPhaseContext context) {
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.common.util.concurrent.CountDown;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -88,6 +89,7 @@
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteClusterAware;
import org.opensearch.transport.RemoteClusterService;
Expand Down Expand Up @@ -119,6 +121,7 @@
import static org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.opensearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
import static org.opensearch.action.search.SearchType.QUERY_THEN_FETCH;
import static org.opensearch.common.util.FeatureFlags.TELEMETRY;
import static org.opensearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort;

/**
Expand Down Expand Up @@ -181,6 +184,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final NamedWriteableRegistry namedWriteableRegistry;
private final CircuitBreaker circuitBreaker;
private final SearchPipelineService searchPipelineService;
private final Tracer tracer;

private volatile boolean isRequestStatsEnabled;

Expand Down Expand Up @@ -209,7 +213,8 @@ public TransportSearchAction(
SearchPipelineService searchPipelineService,
SearchRequestStats searchRequestStats,
SearchRequestSlowLog searchRequestSlowLog,
MetricsRegistry metricsRegistry
MetricsRegistry metricsRegistry,
Tracer tracer
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.client = client;
Expand All @@ -232,6 +237,7 @@ public TransportSearchAction(
this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
this.tracer = tracer;
}

private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) {
Expand Down Expand Up @@ -352,7 +358,7 @@ SearchResponse.PhaseTook getPhaseTook() {
Map<SearchPhaseName, Long> phaseStatsMap = new EnumMap<>(SearchPhaseName.class);

@Override
void onPhaseStart(SearchPhaseContext context) {}
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
Expand All @@ -363,7 +369,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo
}

@Override
void onPhaseFailure(SearchPhaseContext context) {}
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

public Long getPhaseTookTime(SearchPhaseName searchPhaseName) {
return phaseStatsMap.get(searchPhaseName);
Expand Down Expand Up @@ -1270,6 +1276,10 @@ private List<SearchRequestOperationsListener> createSearchListenerList(SearchReq
searchListenersList.add(searchRequestSlowLog);
}

if (FeatureFlags.isEnabled(TELEMETRY)) {
searchListenersList.add(new SearchRequestCoordinatorTrace(tracer));
}

return searchListenersList;
}

Expand Down
9 changes: 7 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -296,6 +297,7 @@ public Iterator<Setting<?>> settings() {
private final AtomicBoolean frozen = new AtomicBoolean(false);
private final BooleanSupplier allowExpensiveQueries;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final Tracer tracer;

/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
Expand All @@ -314,7 +316,8 @@ public IndexModule(
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
final Tracer tracer
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
Expand All @@ -326,6 +329,7 @@ public IndexModule(
this.allowExpensiveQueries = allowExpensiveQueries;
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
this.tracer = tracer;
}

/**
Expand Down Expand Up @@ -664,7 +668,8 @@ public IndexService newIndexService(
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
clusterRemoteTranslogBufferIntervalSupplier,
recoverySettings
recoverySettings,
tracer
);
success = true;
return indexService;
Expand Down
Loading

0 comments on commit 66a2583

Please sign in to comment.