diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index f18bbb8a1cc13..193e4139b8f67 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -220,6 +220,7 @@ public final void start() { null ) ); + searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchRequestContext); return; } executePhase(this); @@ -439,18 +440,18 @@ private void onPhaseEnd(SearchRequestContext searchRequestContext) { this.searchRequestContext.getSearchRequestOperationsListener().onPhaseEnd(this, searchRequestContext); } - private void onPhaseStart(SearchPhase phase) { + private void onPhaseStart(SearchPhase phase, SearchRequestContext searchRequestContext) { setCurrentPhase(phase); - this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this); + this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this, searchRequestContext); } private void onRequestEnd(SearchRequestContext searchRequestContext) { - this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(this, searchRequestContext); + this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchRequestContext); } private void executePhase(SearchPhase phase) { try { - onPhaseStart(phase); + onPhaseStart(phase, searchRequestContext); phase.recordAndRun(); } catch (Exception e) { if (logger.isDebugEnabled()) { @@ -703,6 +704,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At searchContextId = null; } } + searchRequestContext.setSearchTask(getTask()); searchRequestContext.setTotalHits(internalSearchResponse.hits().getTotalHits()); searchRequestContext.setShardStats(results.getNumShards(), successfulOps.get(), skippedOps.get(), failures.length); 
onPhaseEnd(searchRequestContext); @@ -714,7 +716,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At @Override public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) { - this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this); + this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, searchRequestContext); raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures())); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index 674363600b251..ae18255df29b9 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.lucene.search.TotalHits; import org.opensearch.common.annotation.InternalApi; +import org.opensearch.telemetry.tracing.Span; import java.util.EnumMap; import java.util.HashMap; @@ -25,26 +26,50 @@ */ @InternalApi class SearchRequestContext { + private final SearchRequest searchRequest; + private SearchTask searchTask; private final SearchRequestOperationsListener searchRequestOperationsListener; private long absoluteStartNanos; private final Map phaseTookMap; private TotalHits totalHits; private final EnumMap shardStats; + private Span requestSpan; + private Span phaseSpan; /** * This constructor is for testing only */ SearchRequestContext() { - this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger())); + this(new SearchRequest()); } - SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) { + /** + * This constructor is for testing only + */ + SearchRequestContext(SearchRequest searchRequest) { + this(searchRequest, new 
SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger())); + } + + SearchRequestContext(SearchRequest searchRequest, SearchRequestOperationsListener searchRequestOperationsListener) { + this.searchRequest = searchRequest; this.searchRequestOperationsListener = searchRequestOperationsListener; this.absoluteStartNanos = System.nanoTime(); this.phaseTookMap = new HashMap<>(); this.shardStats = new EnumMap<>(ShardStatsFieldNames.class); } + public SearchRequest getSearchRequest() { + return searchRequest; + } + + public void setSearchTask(SearchTask searchTask) { + this.searchTask = searchTask; + } + + public SearchTask getSearchTask() { + return searchTask; + } + SearchRequestOperationsListener getSearchRequestOperationsListener() { return searchRequestOperationsListener; } @@ -76,7 +101,7 @@ void setTotalHits(TotalHits totalHits) { this.totalHits = totalHits; } - TotalHits totalHits() { + public TotalHits totalHits() { return totalHits; } @@ -87,7 +112,7 @@ void setShardStats(int total, int successful, int skipped, int failed) { this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED, failed); } - String formattedShardStats() { + public String formattedShardStats() { if (shardStats.isEmpty()) { return ""; } else { @@ -105,6 +130,22 @@ String formattedShardStats() { ); } } + + public void setRequestSpan(Span requestSpan) { + this.requestSpan = requestSpan; + } + + public Span getRequestSpan() { + return requestSpan; + } + + public void setPhaseSpan(Span phaseSpan) { + this.phaseSpan = phaseSpan; + } + + public Span getPhaseSpan() { + return phaseSpan; + } } enum ShardStatsFieldNames { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestCoordinatorTrace.java b/server/src/main/java/org/opensearch/action/search/SearchRequestCoordinatorTrace.java new file mode 100644 index 0000000000000..22b71b6c622f2 --- /dev/null +++ 
b/server/src/main/java/org/opensearch/action/search/SearchRequestCoordinatorTrace.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.telemetry.tracing.AttributeNames; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanContext; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; + +import static org.opensearch.core.common.Strings.capitalize; + +/** + * Listener for search request tracing on the coordinator node; starts one span per search phase under the request span. + * NOTE(review): the SpanScope opened in onPhaseStart is never closed, and the phase-end hooks assume a non-null phase span. + * @opensearch.internal + */ +public final class SearchRequestCoordinatorTrace extends SearchRequestOperationsListener { + private final Tracer tracer; + + public SearchRequestCoordinatorTrace(Tracer tracer) { + this.tracer = tracer; + } + + @Override + void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + if (searchRequestContext.getPhaseSpan() != null) { + searchRequestContext.getPhaseSpan().endSpan(); + } + searchRequestContext.setPhaseSpan( + tracer.startSpan( + SpanBuilder.from( + "coordinator" + capitalize(context.getCurrentPhase().getName()), + new SpanContext(searchRequestContext.getRequestSpan()) + ) + ) + ); + SpanScope spanScope = tracer.withSpanInScope(searchRequestContext.getPhaseSpan()); + } + + @Override + void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + searchRequestContext.getPhaseSpan().endSpan(); + searchRequestContext.setPhaseSpan(null); + } + + @Override + void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + searchRequestContext.getPhaseSpan().endSpan(); + searchRequestContext.setPhaseSpan(null); + } + + @Override + void 
onRequestEnd(SearchRequestContext searchRequestContext) { + Span requestSpan = searchRequestContext.getRequestSpan(); + + // add response-related attributes on request end + requestSpan.addAttribute( + AttributeNames.TOTAL_HITS, + searchRequestContext.totalHits() == null ? "0" : searchRequestContext.totalHits().toString() + ); + requestSpan.addAttribute( + AttributeNames.SHARDS, + searchRequestContext.formattedShardStats().isEmpty() ? "no data" : searchRequestContext.formattedShardStats() + ); + requestSpan.addAttribute( + AttributeNames.SOURCE, + searchRequestContext.getSearchRequest().source() == null + ? "no source" + : searchRequestContext.getSearchRequest().source().toString() + ); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 19ce0beb3c493..ff054e568b69a 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -22,15 +22,15 @@ @InternalApi abstract class SearchRequestOperationsListener { - abstract void onPhaseStart(SearchPhaseContext context); + abstract void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext); abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); - abstract void onPhaseFailure(SearchPhaseContext context); + abstract void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext); void onRequestStart(SearchRequestContext searchRequestContext) {} - void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + void onRequestEnd(SearchRequestContext searchRequestContext) {} /** * Holder of Composite Listeners @@ -48,10 +48,10 @@ static final class CompositeListener extends SearchRequestOperationsListener { } @Override - 
void onPhaseStart(SearchPhaseContext context) { + void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { - listener.onPhaseStart(context); + listener.onPhaseStart(context, searchRequestContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e); } @@ -70,10 +70,10 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo } @Override - void onPhaseFailure(SearchPhaseContext context) { + void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { - listener.onPhaseFailure(context); + listener.onPhaseFailure(context, searchRequestContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e); } @@ -92,10 +92,10 @@ void onRequestStart(SearchRequestContext searchRequestContext) { } @Override - public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + public void onRequestEnd(SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { - listener.onRequestEnd(context, searchRequestContext); + listener.onRequestEnd(searchRequestContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onRequestEnd listener [{}] failed", listener), e); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index a55cfd463a78f..3d084690c9993 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -134,29 +134,29 @@ public SearchRequestSlowLog(ClusterService clusterService) { } @Override - void 
onPhaseStart(SearchPhaseContext context) {} + void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - void onPhaseFailure(SearchPhaseContext context) {} + void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override void onRequestStart(SearchRequestContext searchRequestContext) {} @Override - void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + void onRequestEnd(SearchRequestContext searchRequestContext) { long tookInNanos = System.nanoTime() - searchRequestContext.getAbsoluteStartNanos(); if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) { - logger.warn(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext)); + logger.warn(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext)); } else if (infoThreshold >= 0 && tookInNanos > infoThreshold && level.isLevelEnabledFor(SlowLogLevel.INFO)) { - logger.info(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext)); + logger.info(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext)); } else if (debugThreshold >= 0 && tookInNanos > debugThreshold && level.isLevelEnabledFor(SlowLogLevel.DEBUG)) { - logger.debug(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext)); + logger.debug(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext)); } else if (traceThreshold >= 0 && tookInNanos > traceThreshold && level.isLevelEnabledFor(SlowLogLevel.TRACE)) { - logger.trace(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext)); + logger.trace(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext)); } } @@ -167,15 +167,11 @@ void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequest */ static final class 
SearchRequestSlowLogMessage extends OpenSearchLogMessage { - SearchRequestSlowLogMessage(SearchPhaseContext context, long tookInNanos, SearchRequestContext searchRequestContext) { - super(prepareMap(context, tookInNanos, searchRequestContext), message(context, tookInNanos, searchRequestContext)); + SearchRequestSlowLogMessage(long tookInNanos, SearchRequestContext searchRequestContext) { + super(prepareMap(tookInNanos, searchRequestContext), message(tookInNanos, searchRequestContext)); } - private static Map prepareMap( - SearchPhaseContext context, - long tookInNanos, - SearchRequestContext searchRequestContext - ) { + private static Map prepareMap(long tookInNanos, SearchRequestContext searchRequestContext) { final Map messageFields = new HashMap<>(); messageFields.put("took", TimeValue.timeValueNanos(tookInNanos)); messageFields.put("took_millis", TimeUnit.NANOSECONDS.toMillis(tookInNanos)); @@ -185,22 +181,24 @@ private static Map prepareMap( } else { messageFields.put("total_hits", "-1"); } - messageFields.put("search_type", context.getRequest().searchType()); + messageFields.put("search_type", searchRequestContext.getSearchRequest().searchType()); messageFields.put("shards", searchRequestContext.formattedShardStats()); - - if (context.getRequest().source() != null) { - String source = escapeJson(context.getRequest().source().toString(FORMAT_PARAMS)); + if (searchRequestContext.getSearchRequest().source() != null) { + String source = escapeJson(searchRequestContext.getSearchRequest().source().toString(FORMAT_PARAMS)); messageFields.put("source", source); } else { messageFields.put("source", "{}"); } - - messageFields.put("id", context.getTask().getHeader(Task.X_OPAQUE_ID)); + if (searchRequestContext.getSearchTask() != null) { + messageFields.put("id", searchRequestContext.getSearchTask().getHeader(Task.X_OPAQUE_ID)); + } else { + messageFields.put("id", ""); + } return messageFields; } // Message will be used in plaintext logs - private static String 
message(SearchPhaseContext context, long tookInNanos, SearchRequestContext searchRequestContext) { + private static String message(long tookInNanos, SearchRequestContext searchRequestContext) { final StringBuilder sb = new StringBuilder(); sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos)).append("], "); sb.append("took_millis[").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos)).append("], "); @@ -210,15 +208,15 @@ private static String message(SearchPhaseContext context, long tookInNanos, Sear } else { sb.append("total_hits[-1]"); } - sb.append("search_type[").append(context.getRequest().searchType()).append("], "); + sb.append("search_type[").append(searchRequestContext.getSearchRequest().searchType()).append("], "); sb.append("shards[").append(searchRequestContext.formattedShardStats()).append("], "); - if (context.getRequest().source() != null) { - sb.append("source[").append(context.getRequest().source().toString(FORMAT_PARAMS)).append("], "); + if (searchRequestContext.getSearchRequest().source() != null) { + sb.append("source[").append(searchRequestContext.getSearchRequest().source().toString(FORMAT_PARAMS)).append("], "); } else { sb.append("source[], "); } - if (context.getTask().getHeader(Task.X_OPAQUE_ID) != null) { - sb.append("id[").append(context.getTask().getHeader(Task.X_OPAQUE_ID)).append("]"); + if (searchRequestContext.getSearchTask() != null) { + sb.append("id[").append(searchRequestContext.getSearchTask().getHeader(Task.X_OPAQUE_ID)).append("]"); } else { sb.append("id[]"); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index 262750849eaa9..954cd66e0f45f 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -46,7 +46,7 @@ public long getPhaseMetric(SearchPhaseName searchPhaseName) { } @Override - 
void onPhaseStart(SearchPhaseContext context) { + void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); } @@ -59,7 +59,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo } @Override - void onPhaseFailure(SearchPhaseContext context) { + void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 05f4308df74fa..56bbeadad157b 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -60,6 +60,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.CountDown; import org.opensearch.core.action.ActionListener; @@ -88,6 +89,10 @@ import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.listener.TraceableActionListener; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteClusterAware; import org.opensearch.transport.RemoteClusterService; @@ -119,6 +124,7 @@ import static org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; import static 
org.opensearch.action.search.SearchType.DFS_QUERY_THEN_FETCH; import static org.opensearch.action.search.SearchType.QUERY_THEN_FETCH; +import static org.opensearch.common.util.FeatureFlags.TELEMETRY; import static org.opensearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort; /** @@ -181,6 +187,7 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.client = client; @@ -232,6 +240,7 @@ public TransportSearchAction( this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING); clusterService.getClusterSettings() .addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled); + this.tracer = tracer; } private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) { @@ -352,7 +361,7 @@ SearchResponse.PhaseTook getPhaseTook() { Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class); @Override - void onPhaseStart(SearchPhaseContext context) {} + void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { @@ -363,7 +372,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo } @Override - void onPhaseFailure(SearchPhaseContext context) {} + void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} public Long getPhaseTookTime(SearchPhaseName searchPhaseName) { return phaseStatsMap.get(searchPhaseName); @@ -493,12 +502,21 @@ private void executeRequest( final List searchListenersList = createSearchListenerList(originalSearchRequest, timeProvider); SearchRequestContext searchRequestContext = new SearchRequestContext( + originalSearchRequest, new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) ); searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); 
PipelinedRequest searchRequest; ActionListener listener; + Span requestSpan; + + if (FeatureFlags.isEnabled(TELEMETRY)) { + requestSpan = tracer.startSpan(SpanBuilder.from("coordinatorRequest")); + originalListener = TraceableActionListener.create(originalListener, requestSpan, tracer); + searchRequestContext.setRequestSpan(requestSpan); + } + try { searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest); listener = searchRequest.transformResponseListener(originalListener); @@ -1270,6 +1288,10 @@ private List createSearchListenerList(SearchReq searchListenersList.add(searchRequestSlowLog); } + if (FeatureFlags.isEnabled(TELEMETRY)) { + searchListenersList.add(new SearchRequestCoordinatorTrace(tracer)); + } + return searchListenersList; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 4cbf8dc191a9d..6c09d3c32f286 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -46,6 +46,7 @@ import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; +import org.opensearch.action.search.SearchRequestCoordinatorTrace; import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTransportService; @@ -785,6 +786,7 @@ protected Node( final SearchRequestStats searchRequestStats = new SearchRequestStats(); final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + final SearchRequestCoordinatorTrace searchRequestCoordinatorTrace = new SearchRequestCoordinatorTrace(tracer); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); final IndicesService indicesService = new IndicesService( @@ -1271,6 
+1273,7 @@ protected Node( b.bind(Tracer.class).toInstance(tracer); b.bind(SearchRequestStats.class).toInstance(searchRequestStats); b.bind(SearchRequestSlowLog.class).toInstance(searchRequestSlowLog); + b.bind(SearchRequestCoordinatorTrace.class).toInstance(searchRequestCoordinatorTrace); b.bind(MetricsRegistry.class).toInstance(metricsRegistry); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java index b6b2cf360d1c5..024b74ab6d2d0 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java @@ -94,4 +94,19 @@ private AttributeNames() { * Refresh Policy */ public static final String REFRESH_POLICY = "refresh_policy"; + + /** + * Search Request Source + */ + public static final String SOURCE = "source"; + + /** + * Search Response Shard Stats + */ + public static final String SHARDS = "shards"; + + /** + * Search Response Total Hits + */ + public static final String TOTAL_HITS = "total_hits"; } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index 1dce422943b7a..5cca1c4f21c46 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -14,6 +14,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.http.HttpRequest; import org.opensearch.rest.RestRequest; +import org.opensearch.search.internal.SearchContext; import org.opensearch.telemetry.tracing.attributes.Attributes; import org.opensearch.transport.TcpChannel; import org.opensearch.transport.Transport; @@ -42,6 +43,15 @@ 
private SpanBuilder() { } + /** + * Creates {@link SpanCreationContext} with the given span name + * @param spanName name of span. + * @return context. + */ + public static SpanCreationContext from(String spanName) { + return SpanCreationContext.server().name(spanName); + } + /** * Creates {@link SpanCreationContext} from the {@link HttpRequest} * @param request Http request. @@ -170,4 +180,31 @@ private static Attributes buildSpanAttributes(String nodeId, ReplicatedWriteRequ return attributes; } + /** + * Creates {@link SpanCreationContext} with parent set to specified SpanContext. + * @param spanName name of span. + * @param parentSpan target parent span. + * @return context + */ + public static SpanCreationContext from(String spanName, SpanContext parentSpan) { + return SpanCreationContext.server().name(spanName).parent(parentSpan); + } + + /** + * Creates {@link SpanCreationContext} from the {@link SearchContext}, tagged with its shard id and index name. + * @param spanName name of span. + * @param searchContext search context whose request's shard id and index name are added as span attributes. + * @return context + */ + public static SpanCreationContext from(String spanName, SearchContext searchContext) { + return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(searchContext)); + } + + private static Attributes buildSpanAttributes(SearchContext searchContext) { + Attributes attributes = Attributes.create() + .addAttribute(AttributeNames.SHARD_ID, searchContext.request().shardId().getId()) + .addAttribute(AttributeNames.INDEX, searchContext.request().shardId().getIndexName()); + return attributes; + } + } diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index e17fbab32a12e..2444a5ae1b6e0 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -710,7 +710,10 @@ private 
SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct null, task, SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger)) + new SearchRequestContext( + new SearchRequest(), + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger) + ) ); } @@ -760,7 +763,10 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction( null, task, SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger)) + new SearchRequestContext( + new SearchRequest(), + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger) + ) ) { @Override ShardSearchFailure[] buildShardFailures() { diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java index 58a4c4a4e555d..467c6a5cc8a63 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java @@ -13,7 +13,7 @@ */ public interface SearchRequestOperationsListenerSupport { default void onPhaseStart(SearchRequestOperationsListener listener, SearchPhaseContext context) { - listener.onPhaseStart(context); + listener.onPhaseStart(context, new SearchRequestContext()); } default void onPhaseEnd(SearchRequestOperationsListener listener, SearchPhaseContext context) { diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java index a53c35a8401b3..b825fa43039bf 100644 --- 
a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java @@ -29,7 +29,7 @@ public void testListenersAreExecuted() { SearchRequestOperationsListener testListener = new SearchRequestOperationsListener() { @Override - public void onPhaseStart(SearchPhaseContext context) { + public void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) { searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); } @@ -40,7 +40,7 @@ public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRe } @Override - public void onPhaseFailure(SearchPhaseContext context) { + public void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) { searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } }; @@ -62,7 +62,7 @@ public void onPhaseFailure(SearchPhaseContext context) { for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { when(ctx.getCurrentPhase()).thenReturn(searchPhase); when(searchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); - compositeListener.onPhaseStart(ctx); + compositeListener.onPhaseStart(ctx, new SearchRequestContext()); assertEquals(totalListeners, searchPhaseMap.get(searchPhaseName).current.count()); } } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java index e23f08c9415eb..c1a4b8da9d682 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java @@ -113,7 +113,6 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { public void testOnRequestEnd() throws InterruptedException { final Logger logger = mock(Logger.class); final 
SearchRequestContext searchRequestContext = mock(SearchRequestContext.class); - final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); final SearchRequest searchRequest = mock(SearchRequest.class); final SearchTask searchTask = mock(SearchTask.class); @@ -131,11 +130,11 @@ public void testOnRequestEnd() throws InterruptedException { new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) ); when(searchRequestContext.getAbsoluteStartNanos()).thenReturn(System.nanoTime() - 1L); - when(searchPhaseContext.getRequest()).thenReturn(searchRequest); - when(searchPhaseContext.getTask()).thenReturn(searchTask); + when(searchRequestContext.getSearchRequest()).thenReturn(searchRequest); + when(searchRequestContext.getSearchTask()).thenReturn(searchTask); when(searchRequest.searchType()).thenReturn(SearchType.QUERY_THEN_FETCH); - searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchPhaseContext, searchRequestContext); + searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchRequestContext); verify(logger, never()).warn(any(SearchRequestSlowLog.SearchRequestSlowLogMessage.class)); verify(logger, times(1)).info(any(SearchRequestSlowLog.SearchRequestSlowLogMessage.class)); @@ -145,7 +144,6 @@ public void testOnRequestEnd() throws InterruptedException { public void testConcurrentOnRequestEnd() throws InterruptedException { final Logger logger = mock(Logger.class); - final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); final SearchRequest searchRequest = mock(SearchRequest.class); final SearchTask searchTask = mock(SearchTask.class); @@ -160,8 +158,6 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, logger); final List searchListenersList = new ArrayList<>(List.of(searchRequestSlowLog)); - when(searchPhaseContext.getRequest()).thenReturn(searchRequest); - 
when(searchPhaseContext.getTask()).thenReturn(searchTask); when(searchRequest.searchType()).thenReturn(SearchType.QUERY_THEN_FETCH); int numRequests = 50; @@ -175,8 +171,10 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { ArrayList searchRequestContexts = new ArrayList<>(); for (int i = 0; i < numRequests; i++) { SearchRequestContext searchRequestContext = new SearchRequestContext( + searchRequest, new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) ); + searchRequestContext.setSearchTask(searchTask); searchRequestContext.setAbsoluteStartNanos((i < numRequestsLogged) ? 0 : System.nanoTime()); searchRequestContexts.add(searchRequestContext); } @@ -186,7 +184,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); SearchRequestContext thisContext = searchRequestContexts.get(finalI); - thisContext.getSearchRequestOperationsListener().onRequestEnd(searchPhaseContext, thisContext); + thisContext.getSearchRequestOperationsListener().onRequestEnd(thisContext); countDownLatch.countDown(); }); threads[i].start(); @@ -203,13 +201,8 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { public void testSearchRequestSlowLogHasJsonFields_EmptySearchRequestContext() throws IOException { SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); - SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); - SearchRequestSlowLog.SearchRequestSlowLogMessage p = new SearchRequestSlowLog.SearchRequestSlowLogMessage( - searchPhaseContext, - 10, - searchRequestContext - ); + SearchRequestContext searchRequestContext = new SearchRequestContext(searchRequest); + SearchRequestSlowLog.SearchRequestSlowLogMessage p = new 
SearchRequestSlowLog.SearchRequestSlowLogMessage(10, searchRequestContext); assertThat(p.getValueFor("took"), equalTo("10nanos")); assertThat(p.getValueFor("took_millis"), equalTo("0")); @@ -218,24 +211,19 @@ public void testSearchRequestSlowLogHasJsonFields_EmptySearchRequestContext() th assertThat(p.getValueFor("search_type"), equalTo("QUERY_THEN_FETCH")); assertThat(p.getValueFor("shards"), equalTo("")); assertThat(p.getValueFor("source"), equalTo("{\\\"query\\\":{\\\"match_all\\\":{\\\"boost\\\":1.0}}}")); - assertThat(p.getValueFor("id"), equalTo(null)); + assertThat(p.getValueFor("id"), equalTo("")); } public void testSearchRequestSlowLogHasJsonFields_NotEmptySearchRequestContext() throws IOException { SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); - SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestContext searchRequestContext = new SearchRequestContext(searchRequest); searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); searchRequestContext.setTotalHits(new TotalHits(3L, TotalHits.Relation.EQUAL_TO)); searchRequestContext.setShardStats(10, 8, 1, 1); - SearchRequestSlowLog.SearchRequestSlowLogMessage p = new SearchRequestSlowLog.SearchRequestSlowLogMessage( - searchPhaseContext, - 10, - searchRequestContext - ); + SearchRequestSlowLog.SearchRequestSlowLogMessage p = new SearchRequestSlowLog.SearchRequestSlowLogMessage(10, searchRequestContext); assertThat(p.getValueFor("took"), equalTo("10nanos")); assertThat(p.getValueFor("took_millis"), equalTo("0")); @@ -244,21 +232,19 @@ public void 
testSearchRequestSlowLogHasJsonFields_NotEmptySearchRequestContext() assertThat(p.getValueFor("search_type"), equalTo("QUERY_THEN_FETCH")); assertThat(p.getValueFor("shards"), equalTo("{total:10, successful:8, skipped:1, failed:1}")); assertThat(p.getValueFor("source"), equalTo("{\\\"query\\\":{\\\"match_all\\\":{\\\"boost\\\":1.0}}}")); - assertThat(p.getValueFor("id"), equalTo(null)); + assertThat(p.getValueFor("id"), equalTo("")); } public void testSearchRequestSlowLogHasJsonFields_PartialContext() throws IOException { SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); - SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestContext searchRequestContext = new SearchRequestContext(searchRequest); searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); searchRequestContext.setTotalHits(new TotalHits(3L, TotalHits.Relation.EQUAL_TO)); searchRequestContext.setShardStats(5, 3, 1, 1); SearchRequestSlowLog.SearchRequestSlowLogMessage p = new SearchRequestSlowLog.SearchRequestSlowLogMessage( - searchPhaseContext, 10000000000L, searchRequestContext ); @@ -270,21 +256,19 @@ public void testSearchRequestSlowLogHasJsonFields_PartialContext() throws IOExce assertThat(p.getValueFor("search_type"), equalTo("QUERY_THEN_FETCH")); assertThat(p.getValueFor("shards"), equalTo("{total:5, successful:3, skipped:1, failed:1}")); assertThat(p.getValueFor("source"), equalTo("{\\\"query\\\":{\\\"match_all\\\":{\\\"boost\\\":1.0}}}")); - assertThat(p.getValueFor("id"), equalTo(null)); + assertThat(p.getValueFor("id"), equalTo("")); } public void 
testSearchRequestSlowLogSearchContextPrinterToLog() throws IOException { SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); - SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestContext searchRequestContext = new SearchRequestContext(searchRequest); searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); searchRequestContext.setTotalHits(new TotalHits(3L, TotalHits.Relation.EQUAL_TO)); searchRequestContext.setShardStats(10, 8, 1, 1); SearchRequestSlowLog.SearchRequestSlowLogMessage p = new SearchRequestSlowLog.SearchRequestSlowLogMessage( - searchPhaseContext, 100000, searchRequestContext ); diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java index 93cf77933fdd5..e15669be57dc2 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -29,9 +29,9 @@ public void testSearchRequestPhaseFailure() { for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); - testRequestStats.onPhaseStart(ctx); + testRequestStats.onPhaseStart(ctx, new SearchRequestContext()); assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); - testRequestStats.onPhaseFailure(ctx); + testRequestStats.onPhaseFailure(ctx, new SearchRequestContext()); assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); } } @@ -46,7 +46,7 @@ public void 
testSearchRequestStats() { for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); long tookTimeInMillis = randomIntBetween(1, 10); - testRequestStats.onPhaseStart(ctx); + testRequestStats.onPhaseStart(ctx, new SearchRequestContext()); long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); @@ -71,7 +71,7 @@ public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedE for (int i = 0; i < numTasks; i++) { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); - testRequestStats.onPhaseStart(ctx); + testRequestStats.onPhaseStart(ctx, new SearchRequestContext()); countDownLatch.countDown(); }); threads[i].start(); @@ -134,8 +134,8 @@ public void testSearchRequestStatsOnPhaseFailureConcurrently() throws Interrupte for (int i = 0; i < numTasks; i++) { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); - testRequestStats.onPhaseStart(ctx); - testRequestStats.onPhaseFailure(ctx); + testRequestStats.onPhaseStart(ctx, new SearchRequestContext()); + testRequestStats.onPhaseFailure(ctx, new SearchRequestContext()); countDownLatch.countDown(); }); threads[i].start(); diff --git a/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java b/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java index 4d8a44417a3ee..bb8bbb2266ca7 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java @@ -26,9 +26,9 @@ public void testSearchTimeProviderPhaseFailure() { for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); - testTimeProvider.onPhaseStart(ctx); + 
testTimeProvider.onPhaseStart(ctx, new SearchRequestContext()); assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); - testTimeProvider.onPhaseFailure(ctx); + testTimeProvider.onPhaseFailure(ctx, new SearchRequestContext()); assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); } } @@ -43,7 +43,7 @@ public void testSearchTimeProviderPhaseEnd() { for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); long tookTimeInMillis = randomIntBetween(1, 100); - testTimeProvider.onPhaseStart(ctx); + testTimeProvider.onPhaseStart(ctx, new SearchRequestContext()); long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 9fe1f8294fc74..cce051087eafa 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2312,7 +2312,8 @@ public void onFailure(final Exception e) { ), null, new SearchRequestSlowLog(clusterService), - NoopMetricsRegistry.INSTANCE + NoopMetricsRegistry.INSTANCE, + NoopTracer.INSTANCE ) ); actions.put(