From 7e12f36135881d142497b7b63870436d3045c951 Mon Sep 17 00:00:00 2001 From: David Z <38449481+dzane17@users.noreply.github.com> Date: Mon, 23 Oct 2023 10:11:55 -0700 Subject: [PATCH] [Backport 2.x] Per request phase latency (#10351) (#10797) * Per request phase latency (#10351) Signed-off-by: David Zane (cherry picked from commit daf1350888f878868748172f576a0cdb3dc64b33) * Update SearchRequest version check Signed-off-by: David Zane --------- Signed-off-by: David Z <38449481+dzane17@users.noreply.github.com> Signed-off-by: David Zane --- CHANGELOG.md | 1 + .../search/AbstractSearchAsyncAction.java | 2 + .../action/search/MultiSearchRequest.java | 5 + .../action/search/SearchRequest.java | 31 +++- .../action/search/SearchResponse.java | 132 +++++++++++++++++- .../action/search/SearchResponseMerger.java | 1 + .../action/search/TransportSearchAction.java | 129 +++++++++++++---- .../common/settings/ClusterSettings.java | 1 + .../rest/action/search/RestSearchAction.java | 6 + .../AbstractSearchAsyncActionTests.java | 15 +- .../action/search/SearchRequestTests.java | 1 + .../action/search/SearchResponseTests.java | 47 ++++++- .../search/SearchTimeProviderTests.java | 54 +++++++ .../search/RandomSearchRequestGenerator.java | 3 + 14 files changed, 394 insertions(+), 34 deletions(-) create mode 100644 server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 6099069b90deb..be6c45a8b066d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) - [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814)) - Add search query categorizor ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) +- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) ### Dependencies - Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822)) diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 1c0a1280ad550..14f57218ae1dc 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -215,6 +215,7 @@ public final void start() { 0, 0, buildTookInMillis(), + timeProvider.getPhaseTook(), ShardSearchFailure.EMPTY_ARRAY, clusters, null @@ -662,6 +663,7 @@ protected final SearchResponse buildSearchResponse( successfulOps.get(), skippedOps.get(), buildTookInMillis(), + timeProvider.getPhaseTook(), failures, clusters, searchContextId diff --git a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java index da8f8f144eaf2..00e0345062d1c 100644 --- a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java @@ -277,6 +277,8 @@ public static void readMultiLineFormat( } else if ("cancel_after_time_interval".equals(entry.getKey()) || "cancelAfterTimeInterval".equals(entry.getKey())) { searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null)); + } else if ("phase_took".equals(entry.getKey())) { + searchRequest.setPhaseTook(nodeBooleanValue(value)); } else { throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section"); } @@ -374,6 +376,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild if (request.getCancelAfterTimeInterval() != null) { xContentBuilder.field("cancel_after_time_interval", request.getCancelAfterTimeInterval().getStringRep()); } + if (request.isPhaseTook() != null) { + xContentBuilder.field("phase_took", request.isPhaseTook()); + } xContentBuilder.endObject(); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index 5d9f6d57649db..b4a91c287610a 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -118,6 +118,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private String pipeline; + private Boolean phaseTook = null; + public SearchRequest() { this.localClusterAlias = null; this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; @@ -210,6 +212,7 @@ private SearchRequest( this.absoluteStartMillis = absoluteStartMillis; this.finalReduce = finalReduce; this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval; + this.phaseTook = searchRequest.phaseTook; } /** @@ -263,6 +266,9 @@ public SearchRequest(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_2_7_0)) { pipeline = in.readOptionalString(); } + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { + phaseTook = in.readOptionalBoolean(); + } } @Override @@ -303,6 +309,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_7_0)) { out.writeOptionalString(pipeline); } + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { + out.writeOptionalBoolean(phaseTook); + } } @Override @@ -634,6 +643,20 @@ public void setPreFilterShardSize(int preFilterShardSize) { this.preFilterShardSize = preFilterShardSize; } + /** + * Returns value of user-provided phase_took query parameter for this search request. + */ + public Boolean isPhaseTook() { + return phaseTook; + } + + /** + * Sets value of phase_took query param if provided by user. Defaults to null. + */ + public void setPhaseTook(Boolean phaseTook) { + this.phaseTook = phaseTook; + } + /** * Returns a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards * the search request expands to exceeds the threshold, or null if the threshold is unspecified. @@ -738,7 +761,8 @@ public boolean equals(Object o) { && absoluteStartMillis == that.absoluteStartMillis && ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips && Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval) - && Objects.equals(pipeline, that.pipeline); + && Objects.equals(pipeline, that.pipeline) + && Objects.equals(phaseTook, that.phaseTook); } @Override @@ -759,7 +783,8 @@ public int hashCode() { localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, - cancelAfterTimeInterval + cancelAfterTimeInterval, + phaseTook ); } @@ -802,6 +827,8 @@ public String toString() { + cancelAfterTimeInterval + ", pipeline=" + pipeline + + ", phaseTook=" + + phaseTook + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponse.java b/server/src/main/java/org/opensearch/action/search/SearchResponse.java index 3eb698f2c140d..63c99b7d026b8 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponse.java @@ -34,6 +34,7 @@ import org.apache.lucene.search.TotalHits; import org.opensearch.LegacyESVersion; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.StatusToXContentObject; @@ -64,7 +65,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.function.Supplier; @@ -96,6 +99,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb private final ShardSearchFailure[] shardFailures; private final Clusters clusters; private final long tookInMillis; + private final PhaseTook phaseTook; private List searchExtBuilders = new ArrayList<>(); @@ -116,6 +120,11 @@ public SearchResponse(StreamInput in) throws IOException { clusters = new Clusters(in); scrollId = in.readOptionalString(); tookInMillis = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { + phaseTook = in.readOptionalWriteable(PhaseTook::new); + } else { + phaseTook = null; + } skippedShards = in.readVInt(); if (in.getVersion().onOrAfter(LegacyESVersion.V_7_10_0)) { pointInTimeId = in.readOptionalString(); @@ -134,7 +143,32 @@ public SearchResponse( ShardSearchFailure[] shardFailures, Clusters clusters ) { - this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters, null); + this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, null, shardFailures, clusters, null); + } + + public SearchResponse( + SearchResponseSections internalResponse, + String scrollId, + int totalShards, + int successfulShards, + int skippedShards, + long tookInMillis, + ShardSearchFailure[] shardFailures, + Clusters clusters, + String pointInTimeId + ) { + this( + internalResponse, + scrollId, + totalShards, + successfulShards, + skippedShards, + tookInMillis, + null, + shardFailures, + clusters, + pointInTimeId + ); } public SearchResponse( @@ -144,6 +178,7 @@ public SearchResponse( int successfulShards, int skippedShards, long tookInMillis, + PhaseTook phaseTook, ShardSearchFailure[] shardFailures, Clusters clusters, String pointInTimeId @@ -156,6 +191,7 @@ public SearchResponse( this.successfulShards = successfulShards; this.skippedShards = skippedShards; this.tookInMillis = tookInMillis; + this.phaseTook = phaseTook; this.shardFailures = shardFailures; assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards; assert scrollId == null || pointInTimeId == null : "SearchResponse can't have both scrollId [" @@ -218,6 +254,13 @@ public TimeValue getTook() { return new TimeValue(tookInMillis); } + /** + * How long the request took in each search phase. + */ + public PhaseTook getPhaseTook() { + return phaseTook; + } + /** * The total number of shards the search was executed on. */ @@ -306,6 +349,9 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t builder.field(POINT_IN_TIME_ID.getPreferredName(), pointInTimeId); } builder.field(TOOK.getPreferredName(), tookInMillis); + if (phaseTook != null) { + phaseTook.toXContent(builder, params); + } builder.field(TIMED_OUT.getPreferredName(), isTimedOut()); if (isTerminatedEarly() != null) { builder.field(TERMINATED_EARLY.getPreferredName(), isTerminatedEarly()); @@ -345,6 +391,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE Boolean terminatedEarly = null; int numReducePhases = 1; long tookInMillis = -1; + PhaseTook phaseTook = null; int successfulShards = -1; int totalShards = -1; int skippedShards = 0; // 0 for BWC @@ -409,6 +456,24 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE parser.skipChildren(); } } + } else if (PhaseTook.PHASE_TOOK.match(currentFieldName, parser.getDeprecationHandler())) { + Map phaseTookMap = new HashMap<>(); + + while ((token = parser.nextToken()) != Token.END_OBJECT) { + if (token == Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + try { + SearchPhaseName.valueOf(currentFieldName.toUpperCase(Locale.ROOT)); + phaseTookMap.put(currentFieldName, parser.longValue()); + } catch (final IllegalArgumentException ex) { + parser.skipChildren(); + } + } else { + parser.skipChildren(); + } + } + phaseTook = new PhaseTook(phaseTookMap); } else if (Clusters._CLUSTERS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { int successful = -1; int total = -1; @@ -480,6 +545,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE successfulShards, skippedShards, tookInMillis, + phaseTook, failures.toArray(ShardSearchFailure.EMPTY_ARRAY), clusters, searchContextId @@ -499,6 +565,9 @@ public void writeTo(StreamOutput out) throws IOException { clusters.writeTo(out); out.writeOptionalString(scrollId); out.writeVLong(tookInMillis); + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { + out.writeOptionalWriteable(phaseTook); + } out.writeVInt(skippedShards); if (out.getVersion().onOrAfter(LegacyESVersion.V_7_10_0)) { out.writeOptionalString(pointInTimeId); @@ -618,6 +687,67 @@ public String toString() { } } + /** + * Holds info about the clusters that the search was executed on: how many in total, how many of them were successful + * and how many of them were skipped. + * + * @opensearch.internal + */ + public static class PhaseTook implements ToXContentFragment, Writeable { + static final ParseField PHASE_TOOK = new ParseField("phase_took"); + private final Map phaseTookMap; + + public PhaseTook(Map phaseTookMap) { + this.phaseTookMap = phaseTookMap; + } + + private PhaseTook(StreamInput in) throws IOException { + this(in.readMap(StreamInput::readString, StreamInput::readLong)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(phaseTookMap, StreamOutput::writeString, StreamOutput::writeLong); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(PHASE_TOOK.getPreferredName()); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + if (phaseTookMap.containsKey(searchPhaseName.getName())) { + builder.field(searchPhaseName.getName(), phaseTookMap.get(searchPhaseName.getName())); + } else { + builder.field(searchPhaseName.getName(), 0); + } + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PhaseTook phaseTook = (PhaseTook) o; + + if (phaseTook.phaseTookMap.equals(phaseTookMap)) { + return true; + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(phaseTookMap); + } + } + static SearchResponse empty(Supplier tookInMillisSupplier, Clusters clusters) { SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN); InternalSearchResponse internalSearchResponse = new InternalSearchResponse( diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java index f90e98106f93f..054bd578cc56c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java @@ -236,6 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) { successfulShards, skippedShards, tookInMillis, + searchTimeProvider.getPhaseTook(), shardFailures, clusters, null diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 3fd0034ae5634..a6fb8453af4ff 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -99,6 +99,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -162,6 +163,14 @@ public class TransportSearchAction extends HandledTransportAction SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting( + SEARCH_PHASE_TOOK_ENABLED_KEY, + false, + Property.Dynamic, + Property.NodeScope + ); + private final NodeClient client; private final ThreadPool threadPool; private final ClusterService clusterService; @@ -278,6 +287,8 @@ private Map resolveIndexBoosts(SearchRequest searchRequest, Clust } /** + * Listener to track request-level tookTime and phase tookTimes from the coordinator. + * * Search operations need two clocks. One clock is to fulfill real clock needs (e.g., resolving * "now" to an index name). Another clock is needed for measuring how long a search operation * took. These two uses are at odds with each other. There are many issues with using a real @@ -287,11 +298,12 @@ private Map resolveIndexBoosts(SearchRequest searchRequest, Clust * * @opensearch.internal */ - static final class SearchTimeProvider { + static final class SearchTimeProvider implements SearchRequestOperationsListener { private final long absoluteStartMillis; private final long relativeStartNanos; private final LongSupplier relativeCurrentNanosProvider; + private boolean phaseTook = false; /** * Instantiates a new search time provider. The absolute start time is the real clock time @@ -317,6 +329,47 @@ long getAbsoluteStartMillis() { long buildTookInMillis() { return TimeUnit.NANOSECONDS.toMillis(relativeCurrentNanosProvider.getAsLong() - relativeStartNanos); } + + public void setPhaseTook(boolean phaseTook) { + this.phaseTook = phaseTook; + } + + public boolean isPhaseTook() { + return phaseTook; + } + + SearchResponse.PhaseTook getPhaseTook() { + if (phaseTook) { + Map phaseTookMap = new HashMap<>(); + // Convert Map to Map for SearchResponse() + for (SearchPhaseName searchPhaseName : phaseStatsMap.keySet()) { + phaseTookMap.put(searchPhaseName.getName(), phaseStatsMap.get(searchPhaseName)); + } + return new SearchResponse.PhaseTook(phaseTookMap); + } else { + return null; + } + } + + Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class); + + @Override + public void onPhaseStart(SearchPhaseContext context) {} + + @Override + public void onPhaseEnd(SearchPhaseContext context) { + phaseStatsMap.put( + context.getCurrentPhase().getSearchPhaseName(), + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos()) + ); + } + + @Override + public void onPhaseFailure(SearchPhaseContext context) {} + + public Long getPhaseTookTime(SearchPhaseName searchPhaseName) { + return phaseStatsMap.get(searchPhaseName); + } } @Override @@ -358,13 +411,6 @@ public void executeRequest( SinglePhaseSearchAction phaseSearchAction, ActionListener listener ) { - final List searchListenersList = createSearchListenerList(); - final SearchRequestOperationsListener searchRequestOperationsListener; - if (!CollectionUtils.isEmpty(searchListenersList)) { - searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); - } else { - searchRequestOperationsListener = null; - } executeRequest(task, searchRequest, new SearchAsyncActionProvider() { @Override public AbstractSearchAsyncAction asyncSearchAction( @@ -381,7 +427,8 @@ public AbstractSearchAsyncAction asyncSearchAction( ActionListener listener, boolean preFilter, ThreadPool threadPool, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + SearchRequestOperationsListener searchRequestOperationsListener ) { return new AbstractSearchAsyncAction( actionName, @@ -445,6 +492,16 @@ private void executeRequest( relativeStartNanos, System::nanoTime ); + + final List searchListenersList = createSearchListenerList(originalSearchRequest, timeProvider); + + final SearchRequestOperationsListener searchRequestOperationsListener; + if (!CollectionUtils.isEmpty(searchListenersList)) { + searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); + } else { + searchRequestOperationsListener = null; + } + PipelinedRequest searchRequest; ActionListener listener; try { @@ -496,7 +553,8 @@ private void executeRequest( clusterState, listener, searchContext, - searchAsyncActionProvider + searchAsyncActionProvider, + searchRequestOperationsListener ); } else { if (shouldMinimizeRoundtrips(searchRequest)) { @@ -517,7 +575,8 @@ private void executeRequest( clusterState, l, searchContext, - searchAsyncActionProvider + searchAsyncActionProvider, + searchRequestOperationsListener ) ); } else { @@ -567,7 +626,8 @@ private void executeRequest( listener, new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()), searchContext, - searchAsyncActionProvider + searchAsyncActionProvider, + searchRequestOperationsListener ); }, listener::onFailure) ); @@ -656,6 +716,7 @@ public void onResponse(SearchResponse searchResponse) { searchResponse.getSuccessfulShards(), searchResponse.getSkippedShards(), timeProvider.buildTookInMillis(), + timeProvider.getPhaseTook(), searchResponse.getShardFailures(), new SearchResponse.Clusters(1, 1, 0), searchResponse.pointInTimeId() @@ -845,7 +906,8 @@ private void executeLocalSearch( ClusterState clusterState, ActionListener listener, SearchContextId searchContext, - SearchAsyncActionProvider searchAsyncActionProvider + SearchAsyncActionProvider searchAsyncActionProvider, + SearchRequestOperationsListener searchRequestOperationsListener ) { executeSearch( (SearchTask) task, @@ -859,7 +921,8 @@ private void executeLocalSearch( listener, SearchResponse.Clusters.EMPTY, searchContext, - searchAsyncActionProvider + searchAsyncActionProvider, + searchRequestOperationsListener ); } @@ -977,7 +1040,8 @@ private void executeSearch( ActionListener listener, SearchResponse.Clusters clusters, @Nullable SearchContextId searchContext, - SearchAsyncActionProvider searchAsyncActionProvider + SearchAsyncActionProvider searchAsyncActionProvider, + SearchRequestOperationsListener searchRequestOperationsListener ) { clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name @@ -1078,7 +1142,8 @@ private void executeSearch( listener, preFilterSearchShards, threadPool, - clusters + clusters, + searchRequestOperationsListener ).start(); } @@ -1161,15 +1226,30 @@ AbstractSearchAsyncAction asyncSearchAction( ActionListener listener, boolean preFilter, ThreadPool threadPool, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + SearchRequestOperationsListener searchRequestOperationsListener ); } - private List createSearchListenerList() { + private List createSearchListenerList(SearchRequest searchRequest, SearchTimeProvider timeProvider) { final List searchListenersList = new ArrayList<>(); + if (isRequestStatsEnabled) { searchListenersList.add(searchRequestStats); } + + // phase_took is enabled with request param and/or cluster setting + Boolean phaseTookRequestParam = searchRequest.isPhaseTook(); + if (phaseTookRequestParam == null) { // check cluster setting only when request param is undefined + if (clusterService.getClusterSettings().get(TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED)) { + timeProvider.setPhaseTook(true); + searchListenersList.add(timeProvider); + } + } else if (phaseTookRequestParam == true) { + timeProvider.setPhaseTook(true); + searchListenersList.add(timeProvider); + } + return searchListenersList; } @@ -1187,15 +1267,9 @@ private AbstractSearchAsyncAction searchAsyncAction ActionListener listener, boolean preFilter, ThreadPool threadPool, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + SearchRequestOperationsListener searchRequestOperationsListener ) { - final List searchListenersList = createSearchListenerList(); - final SearchRequestOperationsListener searchRequestOperationsListener; - if (!CollectionUtils.isEmpty(searchListenersList)) { - searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); - } else { - searchRequestOperationsListener = null; - } if (preFilter) { return new CanMatchPreFilterSearchPhase( logger, @@ -1226,7 +1300,8 @@ private AbstractSearchAsyncAction searchAsyncAction listener, false, threadPool, - clusters + clusters, + searchRequestOperationsListener ); return new SearchPhase(action.getName()) { @Override diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index a37c2ccde0876..6c1ffb58783aa 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -377,6 +377,7 @@ public void apply(Settings value, Settings current, Settings previous) { TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING, TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED, + TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED, TransportSearchAction.SEARCH_QUERY_METRICS_ENABLED_SETTING, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index ebfd082d974fd..080366e536da1 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -180,6 +180,12 @@ public static void parseSearchRequest( searchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null)); } + if (request.hasParam("phase_took")) { + // only set if we have the parameter passed to override the cluster-level default + // else phaseTook = null + searchRequest.setPhaseTook(request.paramAsBoolean("phase_took", true)); + } + // do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types // from the REST layer. these modes are an internal optimization and should // not be specified explicitly by the user. diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index f628bb3201452..edac50813e191 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -688,7 +688,11 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct ); AtomicReference exception = new AtomicReference<>(); ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); - + TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( + 0, + System.nanoTime(), + System::nanoTime + ); return new SearchDfsQueryThenFetchAsyncAction( logger, null, @@ -702,7 +706,7 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct searchRequest, listener, shardsIter, - null, + timeProvider, null, task, SearchResponse.Clusters.EMPTY, @@ -734,6 +738,11 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction( ); AtomicReference exception = new AtomicReference<>(); ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); + TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( + 0, + System.nanoTime(), + System::nanoTime + ); return new SearchQueryThenFetchAsyncAction( logger, null, @@ -747,7 +756,7 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction( searchRequest, listener, shardsIter, - null, + timeProvider, null, task, SearchResponse.Clusters.EMPTY, diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java index 7e6dfcc9ff645..90e154e6e66fc 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java @@ -254,6 +254,7 @@ private SearchRequest mutate(SearchRequest searchRequest) { ); mutators.add(() -> mutation.source(randomValueOtherThan(searchRequest.source(), this::createSearchSourceBuilder))); mutators.add(() -> mutation.setCcsMinimizeRoundtrips(searchRequest.isCcsMinimizeRoundtrips() == false)); + mutators.add(() -> mutation.setPhaseTook(searchRequest.isPhaseTook() == false)); mutators.add( () -> mutation.setCancelAfterTimeInterval( searchRequest.getCancelAfterTimeInterval() != null diff --git a/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java b/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java index 097e922147698..c9e59ab4ea04d 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java @@ -74,7 +74,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import static java.util.Collections.singletonMap; @@ -152,6 +154,11 @@ public SearchResponse createTestItem( Boolean terminatedEarly = randomBoolean() ? null : randomBoolean(); int numReducePhases = randomIntBetween(1, 10); long tookInMillis = randomNonNegativeLong(); + Map phaseTookMap = new HashMap<>(); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + phaseTookMap.put(searchPhaseName.getName(), randomNonNegativeLong()); + } + SearchResponse.PhaseTook phaseTook = new SearchResponse.PhaseTook(phaseTookMap); int totalShards = randomIntBetween(1, Integer.MAX_VALUE); int successfulShards = randomIntBetween(0, totalShards); int skippedShards = randomIntBetween(0, totalShards); @@ -182,6 +189,7 @@ public SearchResponse createTestItem( successfulShards, skippedShards, tookInMillis, + phaseTook, shardSearchFailures, randomBoolean() ? randomClusters() : SearchResponse.Clusters.EMPTY, null @@ -353,6 +361,14 @@ public void testToXContent() { assertEquals(1, searchExtBuilders.size()); } { + Map phaseTookMap = new HashMap<>(); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + phaseTookMap.put(searchPhaseName.getName(), 0L); + } + phaseTookMap.put(SearchPhaseName.QUERY.getName(), 50L); + phaseTookMap.put(SearchPhaseName.FETCH.getName(), 25L); + phaseTookMap.put(SearchPhaseName.EXPAND.getName(), 30L); + SearchResponse.PhaseTook phaseTook = new SearchResponse.PhaseTook(phaseTookMap); SearchResponse response = new SearchResponse( new InternalSearchResponse( new SearchHits(hits, new TotalHits(100, TotalHits.Relation.EQUAL_TO), 1.5f), @@ -368,13 +384,24 @@ public void testToXContent() { 0, 0, 0, + phaseTook, ShardSearchFailure.EMPTY_ARRAY, - new SearchResponse.Clusters(5, 3, 2) + new SearchResponse.Clusters(5, 3, 2), + null ); StringBuilder expectedString = new StringBuilder(); expectedString.append("{"); { expectedString.append("\"took\":0,"); + expectedString.append("\"phase_took\":"); + { + expectedString.append("{\"dfs_pre_query\":0,"); + expectedString.append("\"query\":50,"); + expectedString.append("\"fetch\":25,"); + expectedString.append("\"dfs_query\":0,"); + expectedString.append("\"expand\":30,"); + expectedString.append("\"can_match\":0},"); + } expectedString.append("\"timed_out\":false,"); expectedString.append("\"_shards\":"); { @@ -477,6 +504,24 @@ public void testToXContentEmptyClusters() throws IOException { assertEquals(0, builder.toString().length()); } + public void testSearchResponsePhaseTookEquals() throws IOException { + SearchResponse.PhaseTook phaseTookA = new SearchResponse.PhaseTook(Map.of("foo", 0L, "bar", 1L)); + SearchResponse.PhaseTook phaseTookB = new SearchResponse.PhaseTook(Map.of("foo", 1L, "bar", 1L)); + SearchResponse.PhaseTook phaseTookC = new SearchResponse.PhaseTook(Map.of("foo", 0L)); + SearchResponse.PhaseTook phaseTookD = new SearchResponse.PhaseTook(Map.of()); + + assertNotEquals(phaseTookA, phaseTookB); + assertNotEquals(phaseTookB, phaseTookA); + assertNotEquals(phaseTookA, phaseTookC); + assertNotEquals(phaseTookC, phaseTookA); + assertNotEquals(phaseTookA, phaseTookD); + assertNotEquals(phaseTookD, phaseTookA); + assertEquals(phaseTookA, phaseTookA); + assertEquals(phaseTookB, phaseTookB); + assertEquals(phaseTookC, phaseTookC); + assertEquals(phaseTookD, phaseTookD); + } + static class DummySearchExtBuilder extends SearchExtBuilder { static ParseField DUMMY_FIELD = new ParseField("dummy"); diff --git a/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java b/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java new file mode 100644 index 0000000000000..f0f1a43e6c21e --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SearchTimeProviderTests extends OpenSearchTestCase { + + public void testSearchTimeProviderPhaseFailure() { + TransportSearchAction.SearchTimeProvider testTimeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + testTimeProvider.onPhaseStart(ctx); + assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); + testTimeProvider.onPhaseFailure(ctx); + assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); + } + } + + public void testSearchTimeProviderPhaseEnd() { + TransportSearchAction.SearchTimeProvider testTimeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); + + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + long tookTimeInMillis = randomIntBetween(1, 100); + testTimeProvider.onPhaseStart(ctx); + long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); + when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); + assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); + testTimeProvider.onPhaseEnd(ctx); + assertThat(testTimeProvider.getPhaseTookTime(searchPhaseName), greaterThanOrEqualTo(tookTimeInMillis)); + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java index b942136e1f1e2..74de1e6d96d93 100644 --- a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java +++ b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java @@ -131,6 +131,9 @@ public static SearchRequest randomSearchRequest(Supplier ra if (randomBoolean()) { searchRequest.setCancelAfterTimeInterval(TimeValue.parseTimeValue(randomTimeValue(), null, "cancel_after_time_interval")); } + if (randomBoolean()) { + searchRequest.setPhaseTook(randomBoolean()); + } return searchRequest; }