Commit 68f1723: Search shard tracing
Signed-off-by: David Zane <[email protected]>
dzane17 committed Jan 9, 2024
1 parent 4d3794d commit 68f1723
Showing 5 changed files with 73 additions and 38 deletions.
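In short: the node-level Tracer is now threaded from Node into SearchService, where the shard query and fetch phases are wrapped in telemetry spans named "shardQuery" and "shardFetch". Span creation is gated behind the TELEMETRY feature flag, and each span is ended on both the success and failure paths of SearchOperationListenerExecutor. The remaining files update test plumbing (MockNode, MockSearchService, and a test that constructs SearchService directly) to pass a tracer through, using the no-op tracer where telemetry is not under test.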
9 changes: 6 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -1157,7 +1157,8 @@ protected Node(
searchModule.getFetchPhase(),
responseCollectorService,
circuitBreakerService,
- searchModule.getIndexSearcherExecutor(threadPool)
+ searchModule.getIndexSearcherExecutor(threadPool),
+ tracer
);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
@@ -1791,7 +1792,8 @@ protected SearchService newSearchService(
FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService,
- Executor indexSearcherExecutor
+ Executor indexSearcherExecutor,
+ Tracer tracer
) {
return new SearchService(
clusterService,
@@ -1803,7 +1805,8 @@
fetchPhase,
responseCollectorService,
circuitBreakerService,
- indexSearcherExecutor
+ indexSearcherExecutor,
+ tracer
);
}

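The Node.java change is pure plumbing: the node's Tracer is handed to newSearchService, so any subclass that overrides newSearchService (see MockNode below) must accept and forward the extra parameter.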
83 changes: 54 additions & 29 deletions server/src/main/java/org/opensearch/search/SearchService.java
@@ -61,6 +61,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
+ import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.ConcurrentMapLong;
import org.opensearch.common.util.io.IOUtils;
@@ -84,6 +85,7 @@
import org.opensearch.index.query.QueryRewriteContext;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.Rewriteable;
+ import org.opensearch.index.search.stats.ShardSearchStats;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.SearchOperationListener;
@@ -135,6 +137,9 @@
import org.opensearch.search.sort.SortOrder;
import org.opensearch.search.suggest.Suggest;
import org.opensearch.search.suggest.completion.CompletionSuggestion;
+ import org.opensearch.telemetry.tracing.Span;
+ import org.opensearch.telemetry.tracing.SpanBuilder;
+ import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
@@ -158,6 +163,7 @@
import static org.opensearch.common.unit.TimeValue.timeValueHours;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
+ import static org.opensearch.common.util.FeatureFlags.TELEMETRY;

/**
* The main search service
@@ -318,6 +324,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final AtomicInteger openPitContexts = new AtomicInteger();
private final String sessionId = UUIDs.randomBase64UUID();
private final Executor indexSearcherExecutor;
+ private final Tracer tracer;

public SearchService(
ClusterService clusterService,
@@ -329,7 +336,8 @@ public SearchService(
FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService,
- Executor indexSearcherExecutor
+ Executor indexSearcherExecutor,
+ Tracer tracer
) {
Settings settings = clusterService.getSettings();
this.threadPool = threadPool;
@@ -346,6 +354,7 @@
circuitBreakerService.getBreaker(CircuitBreaker.REQUEST)
);
this.indexSearcherExecutor = indexSearcherExecutor;
+ this.tracer = tracer;
TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
setPitKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_PIT_KEEPALIVE_SETTING.get(settings));
@@ -606,7 +615,7 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
SearchContext context = createContext(readerContext, request, task, true)
) {
final long afterQueryTime;
- try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
+ try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, tracer)) {
loadOrExecuteQueryPhase(request, context);
if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
freeReaderContext(readerContext.id());
@@ -637,7 +646,7 @@
}

private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
- try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)) {
+ try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, tracer, true, afterQueryTime)) {
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (reader.singleSession()) {
@@ -666,7 +675,7 @@ public void executeQueryPhase(
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
+ SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer)
) {
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
processScroll(request, readerContext, searchContext);
@@ -690,7 +699,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
readerContext.setAggregatedDfs(request.dfs());
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true);
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
+ SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer)
) {
searchContext.searcher().setAggregatedDfs(request.dfs());
queryPhase.execute(searchContext);
@@ -745,7 +754,7 @@ public void executeFetchPhase(
final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
try (
SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
+ SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, tracer)
) {
searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
@@ -776,7 +785,12 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs()));
searchContext.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
try (
- SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, true, System.nanoTime())
+ SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(
+     searchContext,
+     tracer,
+     true,
+     System.nanoTime()
+ )
) {
fetchPhase.execute(searchContext);
if (readerContext.singleSession()) {
@@ -1168,10 +1182,10 @@ private void checkKeepAliveLimit(long keepAlive) {
if (keepAlive > maxKeepAlive) {
throw new IllegalArgumentException(
"Keep alive for request ("
- + TimeValue.timeValueMillis(keepAlive)
+ + timeValueMillis(keepAlive)
+ ") is too large. "
+ "It must be less than ("
- + TimeValue.timeValueMillis(maxKeepAlive)
+ + timeValueMillis(maxKeepAlive)
+ "). "
+ "This limit can be set by changing the ["
+ MAX_KEEPALIVE_SETTING.getKey()
@@ -1187,10 +1201,10 @@ private void checkPitKeepAliveLimit(long keepAlive) {
if (keepAlive > maxPitKeepAlive) {
throw new IllegalArgumentException(
"Keep alive for request ("
- + TimeValue.timeValueMillis(keepAlive)
+ + timeValueMillis(keepAlive)
+ ") is too large. "
+ "It must be less than ("
- + TimeValue.timeValueMillis(maxPitKeepAlive)
+ + timeValueMillis(maxPitKeepAlive)
+ "). "
+ "This limit can be set by changing the ["
+ MAX_PIT_KEEPALIVE_SETTING.getKey()
@@ -1352,7 +1366,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
+ "] index level setting."
);
}
- for (org.opensearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) {
+ for (SearchSourceBuilder.ScriptField field : source.scriptFields()) {
FieldScript.Factory factory = scriptService.compile(field.script(), FieldScript.CONTEXT);
SearchLookup lookup = context.getQueryShardContext().lookup();
FieldScript.LeafFactory searchScript = factory.newFactory(field.script().getParams(), lookup);
@@ -1657,29 +1671,20 @@ public IndicesService getIndicesService() {
}

/**
- * Returns a builder for {@link InternalAggregation.ReduceContext}. This
+ * Returns a builder for {@link ReduceContext}. This
* builder retains a reference to the provided {@link SearchSourceBuilder}.
*/
public InternalAggregation.ReduceContextBuilder aggReduceContextBuilder(SearchSourceBuilder searchSourceBuilder) {
return new InternalAggregation.ReduceContextBuilder() {
@Override
- public InternalAggregation.ReduceContext forPartialReduction() {
- return InternalAggregation.ReduceContext.forPartialReduction(
- bigArrays,
- scriptService,
- () -> requestToPipelineTree(searchSourceBuilder)
- );
+ public ReduceContext forPartialReduction() {
+ return ReduceContext.forPartialReduction(bigArrays, scriptService, () -> requestToPipelineTree(searchSourceBuilder));
}

@Override
public ReduceContext forFinalReduction() {
PipelineTree pipelineTree = requestToPipelineTree(searchSourceBuilder);
- return InternalAggregation.ReduceContext.forFinalReduction(
- bigArrays,
- scriptService,
- multiBucketConsumerService.create(),
- pipelineTree
- );
+ return ReduceContext.forFinalReduction(bigArrays, scriptService, multiBucketConsumerService.create(), pipelineTree);
}
};
}
@@ -1728,7 +1733,7 @@ public MinAndMax<?> estimatedMinAndMax() {

/**
* This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
- * This is crucial for some implementations like {@link org.opensearch.index.search.stats.ShardSearchStats}.
+ * This is crucial for some implementations like {@link ShardSearchStats}.
*/
private static final class SearchOperationListenerExecutor implements AutoCloseable {
private final SearchOperationListener listener;
@@ -1737,20 +1742,28 @@ private static final class SearchOperationListenerExecutor implements AutoClosea
private final boolean fetch;
private long afterQueryTime = -1;
private boolean closed = false;
+ private Span querySpan = null;
+ private Span fetchSpan = null;

- SearchOperationListenerExecutor(SearchContext context) {
- this(context, false, System.nanoTime());
+ SearchOperationListenerExecutor(SearchContext context, Tracer tracer) {
+ this(context, tracer, false, System.nanoTime());
}

- SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) {
+ SearchOperationListenerExecutor(SearchContext context, Tracer tracer, boolean fetch, long startTime) {
this.listener = context.indexShard().getSearchOperationListener();
this.context = context;
time = startTime;
this.fetch = fetch;
if (fetch) {
listener.onPreFetchPhase(context);
+ if (FeatureFlags.isEnabled(TELEMETRY)) {
+ fetchSpan = tracer.startSpan(SpanBuilder.from("shardFetch", context));
+ }
} else {
listener.onPreQueryPhase(context);
+ if (FeatureFlags.isEnabled(TELEMETRY)) {
+ querySpan = tracer.startSpan(SpanBuilder.from("shardQuery", context));
+ }
}
}

@@ -1766,14 +1779,26 @@ public void close() {
if (afterQueryTime != -1) {
if (fetch) {
listener.onFetchPhase(context, afterQueryTime - time);
+ if (fetchSpan != null) {
+ fetchSpan.endSpan();
+ }
} else {
listener.onQueryPhase(context, afterQueryTime - time);
+ if (querySpan != null) {
+ querySpan.endSpan();
+ }
}
} else {
if (fetch) {
listener.onFailedFetchPhase(context);
+ if (fetchSpan != null) {
+ fetchSpan.endSpan();
+ }
} else {
listener.onFailedQueryPhase(context);
+ if (querySpan != null) {
+ querySpan.endSpan();
+ }
}
}
}
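Taken together, the SearchService changes follow a single pattern: a phase span is started next to the pre-phase listener callback and ended on every exit path. A condensed sketch of that pattern (not the complete class; the timing bookkeeping from the diff above is elided):

    // Condensed sketch of the tracing hooks added to SearchOperationListenerExecutor.
    // Listener calls, span names, and the feature-flag gate mirror the diff above.
    private static final class SearchOperationListenerExecutor implements AutoCloseable {
        private final SearchOperationListener listener;
        private final SearchContext context;
        private final boolean fetch;
        private Span querySpan = null;
        private Span fetchSpan = null;

        SearchOperationListenerExecutor(SearchContext context, Tracer tracer, boolean fetch, long startTime) {
            this.listener = context.indexShard().getSearchOperationListener();
            this.context = context;
            this.fetch = fetch;
            if (fetch) {
                listener.onPreFetchPhase(context);
                if (FeatureFlags.isEnabled(TELEMETRY)) { // spans are opt-in
                    fetchSpan = tracer.startSpan(SpanBuilder.from("shardFetch", context));
                }
            } else {
                listener.onPreQueryPhase(context);
                if (FeatureFlags.isEnabled(TELEMETRY)) {
                    querySpan = tracer.startSpan(SpanBuilder.from("shardQuery", context));
                }
            }
        }

        @Override
        public void close() {
            // the success or failure listener callback runs here, as in the diff above;
            // the span is then ended on both paths, so no span leaks on a failed phase
            Span span = fetch ? fetchSpan : querySpan;
            if (span != null) {
                span.endSpan();
            }
        }
    }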
@@ -2279,7 +2279,8 @@ public void onFailure(final Exception e) {
new FetchPhase(Collections.emptyList()),
responseCollectorService,
new NoneCircuitBreakerService(),
- null
+ null,
+ NoopTracer.INSTANCE
);
SearchPhaseController searchPhaseController = new SearchPhaseController(
writableRegistry(),
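Call sites that build a SearchService by hand now take one extra trailing argument. In tests the no-op tracer fills the slot; a minimal sketch of the constructor tail as it appears above (the leading arguments are unchanged and elided here):

    // Sketch: NoopTracer.INSTANCE satisfies the new Tracer parameter without
    // recording any spans; production code passes the node's real tracer.
    SearchService searchService = new SearchService(
        // ... existing leading arguments (cluster service, indices service, etc.) ...
        new FetchPhase(Collections.emptyList()),
        responseCollectorService,
        new NoneCircuitBreakerService(),
        null,                // indexSearcherExecutor is unused in this test
        NoopTracer.INSTANCE
    );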
@@ -155,7 +155,8 @@ protected SearchService newSearchService(
FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService,
CircuitBreakerService circuitBreakerService,
- Executor indexSearcherExecutor
+ Executor indexSearcherExecutor,
+ Tracer tracer
) {
if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) {
return super.newSearchService(
@@ -168,7 +169,8 @@
fetchPhase,
responseCollectorService,
circuitBreakerService,
- indexSearcherExecutor
+ indexSearcherExecutor,
+ tracer
);
}
return new MockSearchService(
@@ -180,7 +182,8 @@
queryPhase,
fetchPhase,
circuitBreakerService,
- indexSearcherExecutor
+ indexSearcherExecutor,
+ tracer
);
}

@@ -42,6 +42,7 @@
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.internal.ReaderContext;
import org.opensearch.search.query.QueryPhase;
+ import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;

import java.util.HashMap;
@@ -96,7 +97,8 @@ public MockSearchService(
QueryPhase queryPhase,
FetchPhase fetchPhase,
CircuitBreakerService circuitBreakerService,
- Executor indexSearcherExecutor
+ Executor indexSearcherExecutor,
+ Tracer tracer
) {
super(
clusterService,
@@ -108,7 +110,8 @@ public MockSearchService(
fetchPhase,
null,
circuitBreakerService,
- indexSearcherExecutor
+ indexSearcherExecutor,
+ tracer
);
}

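None of the new spans fire unless the telemetry feature flag is enabled. A hypothetical toggle for local experiments, assuming FeatureFlags.TELEMETRY is the flag's settings key and that FeatureFlags.initializeFeatureFlags behaves as elsewhere in the codebase:

    // Hypothetical setup (initializeFeatureFlags(...) is an assumption, not part
    // of this diff): enable the TELEMETRY flag so the shardQuery/shardFetch spans
    // above are actually started.
    Settings settings = Settings.builder().put(FeatureFlags.TELEMETRY, true).build();
    FeatureFlags.initializeFeatureFlags(settings);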
