diff --git a/CHANGELOG.md b/CHANGELOG.md index 8679735fdefd3..0c7a1bda4e1d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986)) - Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561)) - [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880)) +- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanContext.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanContext.java index f9af611553aff..e5e62c795e5d0 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanContext.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanContext.java @@ -31,4 +31,19 @@ public SpanContext(Span span) { Span getSpan() { return span; } + + /** + * Sets the error for the current span behind this context + * @param cause error + */ + public void setError(final Exception cause) { + span.setError(cause); + } + + /** + * Ends current span + */ + public void endSpan() { + span.endSpan(); + } } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanCreationContext.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanCreationContext.java index cbbcfe7a85d57..6af7c440f8de9 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanCreationContext.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanCreationContext.java @@ -79,8 +79,8 @@ public SpanCreationContext attributes(Attributes attributes) { } /** - * Sets the parent for spann - * @param parent parent + * Sets the parent for span + * @param parent parent span context * @return spanCreationContext */ public SpanCreationContext parent(SpanContext parent) { diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 705273f52a567..9ec8673147c38 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -116,7 +116,7 @@ public void onPhaseStart(SearchPhaseContext context) {} public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - public void onPhaseFailure(SearchPhaseContext context) {} + public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {} @Override public void onRequestStart(SearchRequestContext searchRequestContext) {} diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 3b99ea9cc1cd2..0520a4a7aecec 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -58,6 +58,10 @@ import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.pipeline.PipelinedRequest; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanCreationContext; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.transport.Transport; import java.util.ArrayDeque; @@ -116,6 +120,7 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode = new ConcurrentHashMap<>(); private final boolean throttleConcurrentRequests; private final SearchRequestContext searchRequestContext; + private final Tracer tracer; private SearchPhase currentPhase; private boolean currentPhaseHasLifecycle; @@ -140,7 +145,8 @@ abstract class AbstractSearchAsyncAction exten SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, SearchResponse.Clusters clusters, - SearchRequestContext searchRequestContext + SearchRequestContext searchRequestContext, + Tracer tracer ) { super(name); final List toSkipIterators = new ArrayList<>(); @@ -177,6 +183,7 @@ abstract class AbstractSearchAsyncAction exten this.results = resultConsumer; this.clusters = clusters; this.searchRequestContext = searchRequestContext; + this.tracer = tracer; } @Override @@ -221,6 +228,7 @@ public final void start() { null ) ); + onRequestEnd(searchRequestContext); return; } executePhase(this); @@ -460,7 +468,8 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) { } private void executePhase(SearchPhase phase) { - try { + Span phaseSpan = tracer.startSpan(SpanCreationContext.server().name("[phase/" + phase.getName() + "]")); + try (final SpanScope scope = tracer.withSpanInScope(phaseSpan)) { onPhaseStart(phase); phase.recordAndRun(); } catch (Exception e) { @@ -468,7 +477,15 @@ private void executePhase(SearchPhase phase) { logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e); } + if (currentPhaseHasLifecycle == false) { + phaseSpan.setError(e); + } + onPhaseFailure(phase, "", e); + } finally { + if (currentPhaseHasLifecycle == false) { + phaseSpan.endSpan(); + } } } @@ -733,7 +750,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At @Override public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) { if (currentPhaseHasLifecycle) { - this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this); + this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, cause); } raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures())); } diff --git a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java index c693eea4a2c33..952d83b9e4539 100644 --- a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java @@ -44,6 +44,7 @@ import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.MinAndMax; import org.opensearch.search.sort.SortOrder; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.transport.Transport; import java.util.Comparator; @@ -91,7 +92,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction, SearchPhase> phaseFactory, SearchResponse.Clusters clusters, - SearchRequestContext searchRequestContext + SearchRequestContext searchRequestContext, + Tracer tracer ) { // We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests super( @@ -112,7 +114,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction(shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters, - searchRequestContext + searchRequestContext, + tracer ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; this.searchPhaseController = searchPhaseController; diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java index c26bd5eef8c15..c8ab5fdaf61a1 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -43,6 +43,7 @@ import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.transport.Transport; import java.util.Map; @@ -82,7 +83,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index 74e04d976cb1c..a9a07c6aca7f4 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -140,7 +140,7 @@ protected void onPhaseStart(SearchPhaseContext context) {} protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - protected void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {} @Override protected void onRequestStart(SearchRequestContext searchRequestContext) {} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index ac32b08afb7f6..97ef94055faf7 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -71,7 +71,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc } @Override - protected void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 3d1a25a8aa01f..65cfd35489033 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -88,6 +88,12 @@ import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.listener.TraceableActionListener; +import org.opensearch.telemetry.tracing.listener.TraceableSearchRequestOperationsListener; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteClusterAware; import org.opensearch.transport.RemoteClusterService; @@ -173,6 +179,7 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.client = client; @@ -215,6 +223,7 @@ public TransportSearchAction( this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory; clusterService.getClusterSettings() .addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled); + this.tracer = tracer; } private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) { @@ -384,7 +393,8 @@ public AbstractSearchAsyncAction asyncSearchAction( new ArraySearchPhaseResults<>(shardsIts.size()), searchRequest.getMaxConcurrentShardRequests(), clusters, - searchRequestContext + searchRequestContext, + tracer ) { @Override protected void executePhaseOnShard( @@ -431,49 +441,58 @@ private void executeRequest( if (originalSearchRequest.isPhaseTook() == null) { originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED)); } - SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsCompositeListenerFactory - .buildCompositeListener(originalSearchRequest, logger); - SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest); - searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); - - PipelinedRequest searchRequest; - ActionListener listener; - try { - searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest); - listener = searchRequest.transformResponseListener(originalListener); - } catch (Exception e) { - originalListener.onFailure(e); - return; - } - ActionListener requestTransformListener = ActionListener.wrap(sr -> { - if (searchQueryMetricsEnabled) { - try { - searchQueryCategorizer.categorize(sr.source()); - } catch (Exception e) { - logger.error("Error while trying to categorize the query.", e); - } + final Span requestSpan = tracer.startSpan(SpanBuilder.from(task, actionName)); + try (final SpanScope spanScope = tracer.withSpanInScope(requestSpan)) { + SearchRequestOperationsListener.CompositeListener requestOperationsListeners; + final ActionListener updatedListener = TraceableActionListener.create(originalListener, requestSpan, tracer); + requestOperationsListeners = searchRequestOperationsCompositeListenerFactory.buildCompositeListener( + originalSearchRequest, + logger, + TraceableSearchRequestOperationsListener.create(tracer, requestSpan) + ); + SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest); + searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); + + PipelinedRequest searchRequest; + ActionListener listener; + try { + searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest); + listener = searchRequest.transformResponseListener(updatedListener); + } catch (Exception e) { + updatedListener.onFailure(e); + return; } - ActionListener rewriteListener = buildRewriteListener( - sr, - task, - timeProvider, - searchAsyncActionProvider, - listener, - searchRequestContext - ); - if (sr.source() == null) { - rewriteListener.onResponse(sr.source()); - } else { - Rewriteable.rewriteAndFetch( - sr.source(), - searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis), - rewriteListener + ActionListener requestTransformListener = ActionListener.wrap(sr -> { + if (searchQueryMetricsEnabled) { + try { + searchQueryCategorizer.categorize(sr.source()); + } catch (Exception e) { + logger.error("Error while trying to categorize the query.", e); + } + } + + ActionListener rewriteListener = buildRewriteListener( + sr, + task, + timeProvider, + searchAsyncActionProvider, + listener, + searchRequestContext ); - } - }, listener::onFailure); - searchRequest.transformRequest(requestTransformListener); + if (sr.source() == null) { + rewriteListener.onResponse(sr.source()); + } else { + Rewriteable.rewriteAndFetch( + sr.source(), + searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis), + rewriteListener + ); + } + }, listener::onFailure); + searchRequest.transformRequest(requestTransformListener); + } } private ActionListener buildRewriteListener( @@ -1240,7 +1259,8 @@ private AbstractSearchAsyncAction searchAsyncAction ) ), clusters, - searchRequestContext + searchRequestContext, + tracer ); } else { final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults( @@ -1271,7 +1291,8 @@ private AbstractSearchAsyncAction searchAsyncAction clusterState, task, clusters, - searchRequestContext + searchRequestContext, + tracer ); break; case QUERY_THEN_FETCH: @@ -1292,7 +1313,8 @@ private AbstractSearchAsyncAction searchAsyncAction clusterState, task, clusters, - searchRequestContext + searchRequestContext, + tracer ); break; default: diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java index 6a97914b04ebc..212ef3c713d8e 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java @@ -75,6 +75,16 @@ private AttributeNames() { */ public static final String TRANSPORT_ACTION = "action"; + /** + * Task id + */ + public static final String TASK_ID = "task_id"; + + /** + * Parent task id + */ + public static final String PARENT_TASK_ID = "parent_task_id"; + /** * Index Name */ @@ -99,4 +109,9 @@ private AttributeNames() { * Refresh Policy */ public static final String REFRESH_POLICY = "refresh_policy"; + + /** + * Search Response Total Hits + */ + public static final String TOTAL_HITS = "total_hits"; } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index 70658c5d71bf3..42e64109b72fd 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -15,6 +15,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.http.HttpRequest; import org.opensearch.rest.RestRequest; +import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.attributes.Attributes; import org.opensearch.transport.TcpChannel; import org.opensearch.transport.Transport; @@ -196,4 +197,43 @@ private static Attributes buildSpanAttributes(String nodeId, ReplicatedWriteRequ return attributes; } + /** + * Creates {@link SpanCreationContext} with parent set to specified SpanContext. + * @param spanName name of span. + * @param parentSpan target parent span. + * @return context + */ + public static SpanCreationContext from(String spanName, SpanContext parentSpan) { + return SpanCreationContext.server().name(spanName).parent(parentSpan); + } + + /** + * Creates {@link SpanCreationContext} with parent set to specified SpanContext. + * @param task search task. + * @param actionName action. + * @return context + */ + public static SpanCreationContext from(Task task, String actionName) { + return SpanCreationContext.server().name(createSpanName(task, actionName)).attributes(buildSpanAttributes(task, actionName)); + } + + private static Attributes buildSpanAttributes(Task task, String actionName) { + Attributes attributes = Attributes.create().addAttribute(AttributeNames.TRANSPORT_ACTION, actionName); + if (task != null) { + attributes.addAttribute(AttributeNames.TASK_ID, task.getId()); + if (task.getParentTaskId() != null && task.getParentTaskId().isSet()) { + attributes.addAttribute(AttributeNames.PARENT_TASK_ID, task.getParentTaskId().getId()); + } + } + return attributes; + + } + + private static String createSpanName(Task task, String actionName) { + if (task != null) { + return task.getType() + SEPARATOR + task.getAction(); + } else { + return actionName; + } + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java new file mode 100644 index 0000000000000..71fb59194c447 --- /dev/null +++ b/server/src/main/java/org/opensearch/telemetry/tracing/listener/TraceableSearchRequestOperationsListener.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.listener; + +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchRequestContext; +import org.opensearch.action.search.SearchRequestOperationsListener; +import org.opensearch.telemetry.tracing.AttributeNames; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanContext; +import org.opensearch.telemetry.tracing.Tracer; + +/** + * SearchRequestOperationsListener subscriber for search request tracing + * + * @opensearch.internal + */ +public final class TraceableSearchRequestOperationsListener extends SearchRequestOperationsListener { + private final Tracer tracer; + private final Span requestSpan; + private SpanContext phaseSpanContext; + + public TraceableSearchRequestOperationsListener(final Tracer tracer, final Span requestSpan) { + this.tracer = tracer; + this.requestSpan = requestSpan; + this.phaseSpanContext = null; + } + + public static SearchRequestOperationsListener create(final Tracer tracer, final Span requestSpan) { + if (tracer.isRecording()) { + return new TraceableSearchRequestOperationsListener(tracer, requestSpan); + } else { + return SearchRequestOperationsListener.NOOP; + } + } + + @Override + protected void onPhaseStart(SearchPhaseContext context) { + assert phaseSpanContext == null : "There should be only one search phase active at a time"; + phaseSpanContext = tracer.getCurrentSpan(); + } + + @Override + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + assert phaseSpanContext != null : "There should be a search phase active at that time"; + phaseSpanContext.endSpan(); + phaseSpanContext = null; + } + + @Override + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) { + assert phaseSpanContext != null : "There should be a search phase active at that time"; + phaseSpanContext.setError((Exception) cause); + phaseSpanContext.endSpan(); + phaseSpanContext = null; + } + + @Override + public void onRequestStart(SearchRequestContext searchRequestContext) {} + + @Override + public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + // add response-related attributes on request end + requestSpan.addAttribute( + AttributeNames.TOTAL_HITS, + searchRequestContext.totalHits() == null ? 0 : searchRequestContext.totalHits().value + ); + } +} diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 3af7af114e96d..420289d3ff2e5 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -58,6 +58,7 @@ import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -85,19 +86,16 @@ import java.util.function.BiFunction; import java.util.stream.IntStream; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; -import static org.mockito.Mockito.mock; public class AbstractSearchAsyncActionTests extends OpenSearchTestCase { private final List> resolvedNodes = new ArrayList<>(); private final Set releasedContexts = new CopyOnWriteArraySet<>(); private ExecutorService executor; - private SearchRequestOperationsListener assertingListener; + private SearchRequestOperationsListenerAssertingListener assertingListener; ThreadPool threadPool; @Before @@ -106,27 +104,7 @@ public void setUp() throws Exception { super.setUp(); executor = Executors.newFixedThreadPool(1); threadPool = new TestThreadPool(getClass().getName()); - assertingListener = new SearchRequestOperationsListener() { - private volatile SearchPhase phase; - - @Override - protected void onPhaseStart(SearchPhaseContext context) { - assertThat(phase, is(nullValue())); - phase = context.getCurrentPhase(); - } - - @Override - protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { - assertThat(phase, is(context.getCurrentPhase())); - phase = null; - } - - @Override - protected void onPhaseFailure(SearchPhaseContext context) { - assertThat(phase, is(context.getCurrentPhase())); - phase = null; - } - }; + assertingListener = new SearchRequestOperationsListenerAssertingListener(); } @After @@ -136,6 +114,7 @@ public void tearDown() throws Exception { executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS)); ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS); + assertingListener.assertFinished(); } private AbstractSearchAsyncAction createAction( @@ -207,7 +186,8 @@ private AbstractSearchAsyncAction createAction( new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), request - ) + ), + NoopTracer.INSTANCE ) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, SearchPhaseContext context) { @@ -371,7 +351,7 @@ public void testOnPhaseFailureAndVerifyListeners() { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); SearchRequestStats testListener = new SearchRequestStats(clusterSettings); - final List requestOperationListeners = List.of(testListener); + final List requestOperationListeners = List.of(testListener, assertingListener); SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners); action.start(); assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName())); @@ -403,6 +383,7 @@ public void run() { SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); searchShardIterator.resetAndSkip(); action.skipShard(searchShardIterator); + action.start(); action.executeNextPhase(action, fetchPhase); assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); action.onPhaseFailure(new SearchPhase("test") { @@ -645,7 +626,7 @@ public void testExecutePhaseOnShardFailureAndThrowException() throws Interrupted public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedException { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); SearchRequestStats testListener = new SearchRequestStats(clusterSettings); - final List requestOperationListeners = new ArrayList<>(List.of(testListener)); + final List requestOperationListeners = new ArrayList<>(List.of(testListener, assertingListener)); long delay = (randomIntBetween(1, 5)); delay = delay * 10; @@ -685,8 +666,8 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx assertEquals(1, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName())); action.executeNextPhase(expandPhase, fetchPhase); + action.onPhaseDone(); /* finish phase since we don't have reponse being sent */ - action.sendSearchResponse(mock(InternalSearchResponse.class), mock(String.valueOf(QuerySearchResult.class))); assertThat(testListener.getPhaseMetric(expandPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay)); assertEquals(1, testListener.getPhaseTotal(expandPhase.getSearchPhaseName())); assertEquals(0, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName())); @@ -695,7 +676,7 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx public void testOnPhaseListenersWithDfsType() throws InterruptedException { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); SearchRequestStats testListener = new SearchRequestStats(clusterSettings); - final List requestOperationListeners = new ArrayList<>(List.of(testListener)); + final List requestOperationListeners = new ArrayList<>(List.of(testListener, assertingListener)); SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction( requestOperationListeners @@ -712,6 +693,11 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException { searchDfsQueryThenFetchAsyncAction.skipShard(searchShardIterator); searchDfsQueryThenFetchAsyncAction.executeNextPhase(searchDfsQueryThenFetchAsyncAction, fetchPhase); + searchDfsQueryThenFetchAsyncAction.onPhaseFailure( + fetchPhase, + "Something went wrong", + null + ); /* finalizing the fetch phase since we do adhoc phase lifecycle calls */ assertThat(testListener.getPhaseMetric(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()), greaterThanOrEqualTo(delay)); assertEquals(1, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); @@ -754,7 +740,7 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct null, null, null, - null, + controller, executor, resultConsumer, searchRequest, @@ -767,7 +753,8 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger), searchRequest - ) + ), + NoopTracer.INSTANCE ); } @@ -820,7 +807,8 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction( new SearchRequestContext( new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger), searchRequest - ) + ), + NoopTracer.INSTANCE ) { @Override ShardSearchFailure[] buildShardFailures() { diff --git a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 30fc50f91dabd..1881c705fe6b3 100644 --- a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -55,6 +55,7 @@ import org.opensearch.search.sort.MinAndMax; import org.opensearch.search.sort.SortBuilders; import org.opensearch.search.sort.SortOrder; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; @@ -66,7 +67,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -75,7 +75,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.stream.IntStream; @@ -83,42 +82,22 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.collection.IsEmptyCollection.empty; public class CanMatchPreFilterSearchPhaseTests extends OpenSearchTestCase { - private SearchRequestOperationsListener assertingListener; - private Set phases; + private SearchRequestOperationsListenerAssertingListener assertingListener; @Before public void setUp() throws Exception { super.setUp(); - phases = Collections.newSetFromMap(new IdentityHashMap<>()); - assertingListener = new SearchRequestOperationsListener() { - @Override - protected void onPhaseStart(SearchPhaseContext context) { - assertThat(phases.contains(context.getCurrentPhase()), is(false)); - phases.add(context.getCurrentPhase()); - } - - @Override - protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { - assertThat(phases.contains(context.getCurrentPhase()), is(true)); - phases.remove(context.getCurrentPhase()); - } - - @Override - protected void onPhaseFailure(SearchPhaseContext context) { - assertThat(phases.contains(context.getCurrentPhase()), is(true)); - phases.remove(context.getCurrentPhase()); - } - }; + assertingListener = new SearchRequestOperationsListenerAssertingListener(); } @After public void tearDown() throws Exception { super.tearDown(); - assertBusy(() -> assertThat(phases, empty()), 5, TimeUnit.SECONDS); + + assertingListener.assertFinished(); } public void testFilterShards() throws InterruptedException { @@ -164,6 +143,10 @@ public void sendCanMatch( final SearchRequest searchRequest = new SearchRequest(); searchRequest.allowPartialSearchResults(true); + final SearchRequestOperationsListener searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener( + List.of(assertingListener), + LogManager.getLogger() + ); CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase( logger, searchTransportService, @@ -182,15 +165,13 @@ public void sendCanMatch( @Override public void run() throws IOException { result.set(iter); + searchRequestOperationsListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null); latch.countDown(); - assertingListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null); } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext( - new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), - searchRequest - ) + new SearchRequestContext(searchRequestOperationsListener, searchRequest), + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -260,6 +241,10 @@ public void sendCanMatch( final SearchRequest searchRequest = new SearchRequest(); searchRequest.allowPartialSearchResults(true); + final SearchRequestOperationsListener searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener( + List.of(assertingListener), + LogManager.getLogger() + ); CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase( logger, searchTransportService, @@ -278,15 +263,13 @@ public void sendCanMatch( @Override public void run() throws IOException { result.set(iter); + searchRequestOperationsListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null); latch.countDown(); - assertingListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null); } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext( - new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), - searchRequest - ) + new SearchRequestContext(searchRequestOperationsListener, searchRequest), + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -346,6 +329,10 @@ public void sendCanMatch( (e) -> { throw new AssertionError("unexpected", e); } ); Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); + final SearchRequestOperationsListener searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener( + List.of(assertingListener), + LogManager.getLogger() + ); final CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase( logger, searchTransportService, @@ -360,58 +347,57 @@ public void sendCanMatch( timeProvider, ClusterState.EMPTY_STATE, null, - (iter) -> new AbstractSearchAsyncAction("test", logger, transportService, (cluster, node) -> { - assert cluster == null : "cluster was not null: " + cluster; - return lookup.get(node); - }, - aliasFilters, - Collections.emptyMap(), - Collections.emptyMap(), - executor, - searchRequest, - responseListener, - iter, - new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), - ClusterState.EMPTY_STATE, - null, - new ArraySearchPhaseResults<>(iter.size()), - randomIntBetween(1, 32), - SearchResponse.Clusters.EMPTY, - new SearchRequestContext( - new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), - searchRequest - ) - ) { - - @Override - protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { - return new SearchPhase("test") { + (iter) -> { + return new WrappingSearchAsyncActionPhase( + new AbstractSearchAsyncAction("test", logger, transportService, (cluster, node) -> { + assert cluster == null : "cluster was not null: " + cluster; + return lookup.get(node); + }, + aliasFilters, + Collections.emptyMap(), + Collections.emptyMap(), + executor, + searchRequest, + responseListener, + iter, + new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), + ClusterState.EMPTY_STATE, + null, + new ArraySearchPhaseResults<>(iter.size()), + randomIntBetween(1, 32), + SearchResponse.Clusters.EMPTY, + new SearchRequestContext(searchRequestOperationsListener, searchRequest), + NoopTracer.INSTANCE + ) { @Override - public void run() { - latch.countDown(); + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { + return new WrappingSearchAsyncActionPhase(this) { + @Override + public void run() { + latch.countDown(); + } + }; } - }; - } - @Override - protected void executePhaseOnShard( - final SearchShardIterator shardIt, - final SearchShardTarget shard, - final SearchActionListener listener - ) { - if (randomBoolean()) { - listener.onResponse(new SearchPhaseResult() { - }); - } else { - listener.onFailure(new Exception("failure")); + @Override + protected void executePhaseOnShard( + final SearchShardIterator shardIt, + final SearchShardTarget shard, + final SearchActionListener listener + ) { + if (randomBoolean()) { + listener.onResponse(new SearchPhaseResult() { + }); + } else { + listener.onFailure(new Exception("failure")); + } + } } - } + ); }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext( - new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), - searchRequest - ) + new SearchRequestContext(searchRequestOperationsListener, searchRequest), + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -475,6 +461,10 @@ public void sendCanMatch( searchRequest.source(new SearchSourceBuilder().sort(SortBuilders.fieldSort("timestamp").order(order))); searchRequest.allowPartialSearchResults(true); + final SearchRequestOperationsListener searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener( + List.of(assertingListener), + LogManager.getLogger() + ); CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase( logger, searchTransportService, @@ -493,15 +483,13 @@ public void sendCanMatch( @Override public void run() { result.set(iter); + searchRequestOperationsListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null); latch.countDown(); - assertingListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null); } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext( - new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), - searchRequest - ) + new SearchRequestContext(searchRequestOperationsListener, searchRequest), + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -580,6 +568,10 @@ public void sendCanMatch( searchRequest.source(new SearchSourceBuilder().sort(SortBuilders.fieldSort("timestamp").order(order))); searchRequest.allowPartialSearchResults(true); + final SearchRequestOperationsListener searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener( + List.of(assertingListener), + LogManager.getLogger() + ); CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase( logger, searchTransportService, @@ -598,15 +590,13 @@ public void sendCanMatch( @Override public void run() { result.set(iter); + searchRequestOperationsListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null); latch.countDown(); - assertingListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null); } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext( - new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), - searchRequest - ) + new SearchRequestContext(searchRequestOperationsListener, searchRequest), + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -730,7 +720,8 @@ public void run() { }; }, SearchResponse.Clusters.EMPTY, - searchRequestContext + searchRequestContext, + NoopTracer.INSTANCE ); canMatchPhase.start(); @@ -779,7 +770,8 @@ private static final class SearchDfsQueryAsyncAction extends AbstractSearchAsync new ArraySearchPhaseResults<>(shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters, - searchRequestContext + searchRequestContext, + NoopTracer.INSTANCE ); this.listener = searchRequestContext.getSearchRequestOperationsListener(); } diff --git a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java index af7adc4e58fb8..35e90ff662b19 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java @@ -51,11 +51,14 @@ import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestOptions; +import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.util.ArrayList; @@ -79,6 +82,23 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class SearchAsyncActionTests extends OpenSearchTestCase { + private SearchRequestOperationsListenerAssertingListener assertingListener; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + assertingListener = new SearchRequestOperationsListenerAssertingListener(); + } + + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + + assertingListener.assertFinished(); + } public void testSkipSearchShards() throws InterruptedException { SearchRequest request = new SearchRequest(); @@ -117,6 +137,10 @@ public void testSkipSearchShards() throws InterruptedException { lookup.put(replicaNode.getId(), new MockConnection(replicaNode)); Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); AtomicInteger numRequests = new AtomicInteger(0); + final SearchRequestOperationsListener searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener( + List.of(assertingListener), + LogManager.getLogger() + ); AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction( "test", logger, @@ -138,7 +162,8 @@ public void testSkipSearchShards() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) + new SearchRequestContext(searchRequestOperationsListener, request), + NoopTracer.INSTANCE ) { @Override @@ -169,6 +194,7 @@ protected SearchPhase getNextPhase(SearchPhaseResults res @Override public void run() { assertTrue(searchPhaseDidRun.compareAndSet(false, true)); + searchRequestOperationsListener.onPhaseEnd(new MockSearchPhaseContext(1, request, this), null); } }; } @@ -236,6 +262,10 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); CountDownLatch awaitInitialRequests = new CountDownLatch(1); AtomicInteger numRequests = new AtomicInteger(0); + final SearchRequestOperationsListener searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener( + List.of(assertingListener), + LogManager.getLogger() + ); AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction( "test", logger, @@ -257,7 +287,8 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) + new SearchRequestContext(searchRequestOperationsListener, request), + NoopTracer.INSTANCE ) { @Override @@ -295,6 +326,7 @@ protected SearchPhase getNextPhase(SearchPhaseResults res return new SearchPhase("test") { @Override public void run() { + searchRequestOperationsListener.onPhaseEnd(new MockSearchPhaseContext(1, request, this), null); assertTrue(searchPhaseDidRun.compareAndSet(false, true)); } }; @@ -375,7 +407,11 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), + request + ), + NoopTracer.INSTANCE ) { TestSearchResponse response = new TestSearchResponse(); @@ -411,6 +447,7 @@ public void run() { sendReleaseSearchContext(result.getContextId(), new MockConnection(result.node), OriginalIndices.NONE); } responseListener.onResponse(response); + assertingListener.onPhaseEnd(new MockSearchPhaseContext(1, request, this), null); } }; } @@ -498,7 +535,11 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), + request + ), + NoopTracer.INSTANCE ) { TestSearchResponse response = new TestSearchResponse(); @@ -591,6 +632,10 @@ public void testAllowPartialResults() throws InterruptedException { Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); AtomicInteger numRequests = new AtomicInteger(0); AtomicInteger numFailReplicas = new AtomicInteger(0); + final SearchRequestOperationsListener searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener( + List.of(assertingListener), + LogManager.getLogger() + ); AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction( "test", logger, @@ -612,7 +657,8 @@ public void testAllowPartialResults() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) + new SearchRequestContext(searchRequestOperationsListener, request), + NoopTracer.INSTANCE ) { @Override protected void executePhaseOnShard( @@ -645,6 +691,7 @@ protected SearchPhase getNextPhase(SearchPhaseResults res @Override public void run() { assertTrue(searchPhaseDidRun.compareAndSet(false, true)); + searchRequestOperationsListener.onPhaseEnd(new MockSearchPhaseContext(1, request, this), null); } }; } diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java index faf6f86c69c27..aefbbe80d5fa1 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -59,9 +59,12 @@ import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; import org.opensearch.search.sort.SortBuilders; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; +import org.junit.After; +import org.junit.Before; import java.util.Collections; import java.util.List; @@ -77,6 +80,24 @@ import static org.hamcrest.Matchers.instanceOf; public class SearchQueryThenFetchAsyncActionTests extends OpenSearchTestCase { + private SearchRequestOperationsListenerAssertingListener assertingListener; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + assertingListener = new SearchRequestOperationsListenerAssertingListener(); + } + + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + + assertingListener.assertFinished(); + } + public void testBottomFieldSort() throws Exception { testCase(false, false); } @@ -218,15 +239,17 @@ public void sendExecuteQuery( task, SearchResponse.Clusters.EMPTY, new SearchRequestContext( - new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequestOperationsListener.CompositeListener(List.of(assertingListener), LogManager.getLogger()), searchRequest - ) + ), + NoopTracer.INSTANCE ) { @Override protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { return new SearchPhase("test") { @Override public void run() { + assertingListener.onPhaseEnd(new MockSearchPhaseContext(1, searchRequest, this), null); latch.countDown(); } }; diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java index 1cb336e18b12c..845543fbd9f57 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java @@ -125,7 +125,7 @@ protected void onPhaseStart(SearchPhaseContext context) {} protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - protected void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {} }; } } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerAssertingListener.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerAssertingListener.java new file mode 100644 index 0000000000000..327371ebcaf0b --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerAssertingListener.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +class SearchRequestOperationsListenerAssertingListener extends SearchRequestOperationsListener { + private volatile SearchPhase phase; + + @Override + protected void onPhaseStart(SearchPhaseContext context) { + assertThat(phase, is(nullValue())); + phase = context.getCurrentPhase(); + } + + @Override + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + assertThat(phase, is(context.getCurrentPhase())); + phase = null; + } + + @Override + protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) { + assertThat(phase, is(context.getCurrentPhase())); + phase = null; + } + + public void assertFinished() { + assertThat(phase, is(nullValue())); + } +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java index a53c35a8401b3..990ed95f1aebc 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java @@ -40,7 +40,7 @@ public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRe } @Override - public void onPhaseFailure(SearchPhaseContext context) { + public void onPhaseFailure(SearchPhaseContext context, Throwable cause) { searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } }; diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java index 377ccebbfd418..fb9b26e3f3ad1 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -36,7 +36,7 @@ public void testSearchRequestPhaseFailure() { when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); testRequestStats.onPhaseStart(ctx); assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); - testRequestStats.onPhaseFailure(ctx); + testRequestStats.onPhaseFailure(ctx, new Throwable()); assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); } } @@ -156,7 +156,7 @@ public void testSearchRequestStatsOnPhaseFailureConcurrently() throws Interrupte threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); testRequestStats.onPhaseStart(ctx); - testRequestStats.onPhaseFailure(ctx); + testRequestStats.onPhaseFailure(ctx, new Throwable()); countDownLatch.countDown(); }); threads[i].start(); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 33f3577e15c52..635939e68de71 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2313,7 +2313,8 @@ public void onFailure(final Exception e) { client ), NoopMetricsRegistry.INSTANCE, - searchRequestOperationsCompositeListenerFactory + searchRequestOperationsCompositeListenerFactory, + NoopTracer.INSTANCE ) ); actions.put( diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java index 75fc6761a60ef..4b763e4bd4454 100644 --- a/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java +++ b/server/src/test/java/org/opensearch/telemetry/tracing/SpanBuilderTests.java @@ -21,6 +21,7 @@ import org.opensearch.http.HttpResponse; import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.attributes.Attributes; +import org.opensearch.telemetry.tracing.noop.NoopSpan; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportException; @@ -104,6 +105,16 @@ public void testTransportContext() { assertEquals(connection.getNode().getHostAddress(), attributes.getAttributesMap().get(AttributeNames.TRANSPORT_TARGET_HOST)); } + public void testParentSpan() { + String spanName = "test-name"; + SpanContext parentSpanContext = new SpanContext(NoopSpan.INSTANCE); + SpanCreationContext context = SpanBuilder.from(spanName, parentSpanContext); + Attributes attributes = context.getAttributes(); + assertNull(attributes); + assertEquals(spanName, context.getSpanName()); + assertEquals(parentSpanContext, context.getParent()); + } + private static Transport.Connection createTransportConnection() { return new Transport.Connection() { @Override