diff --git a/CHANGELOG.md b/CHANGELOG.md
index a9e5bb3982708..a053bec6f458d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
+- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
### Dependencies
diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java
index 1c0a1280ad550..14f57218ae1dc 100644
--- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java
+++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java
@@ -215,6 +215,7 @@ public final void start() {
0,
0,
buildTookInMillis(),
+ timeProvider.getPhaseTook(),
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
@@ -662,6 +663,7 @@ protected final SearchResponse buildSearchResponse(
successfulOps.get(),
skippedOps.get(),
buildTookInMillis(),
+ timeProvider.getPhaseTook(),
failures,
clusters,
searchContextId
diff --git a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java
index da8f8f144eaf2..00e0345062d1c 100644
--- a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java
+++ b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java
@@ -277,6 +277,8 @@ public static void readMultiLineFormat(
} else if ("cancel_after_time_interval".equals(entry.getKey())
|| "cancelAfterTimeInterval".equals(entry.getKey())) {
searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null));
+ } else if ("phase_took".equals(entry.getKey())) {
+ searchRequest.setPhaseTook(nodeBooleanValue(value));
} else {
throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section");
}
@@ -374,6 +376,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild
if (request.getCancelAfterTimeInterval() != null) {
xContentBuilder.field("cancel_after_time_interval", request.getCancelAfterTimeInterval().getStringRep());
}
+ if (request.isPhaseTook() != null) {
+ xContentBuilder.field("phase_took", request.isPhaseTook());
+ }
xContentBuilder.endObject();
}
diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java
index 21cf0ed97b9da..9e50213eab5f9 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java
@@ -117,6 +117,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
private String pipeline;
+ private Boolean phaseTook = null;
+
public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
@@ -209,6 +211,7 @@ private SearchRequest(
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
+ this.phaseTook = searchRequest.phaseTook;
}
/**
@@ -253,6 +256,9 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
pipeline = in.readOptionalString();
}
+ if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
+ phaseTook = in.readOptionalBoolean();
+ }
}
@Override
@@ -284,6 +290,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeOptionalString(pipeline);
}
+ if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
+ out.writeOptionalBoolean(phaseTook);
+ }
}
@Override
@@ -615,6 +624,20 @@ public void setPreFilterShardSize(int preFilterShardSize) {
this.preFilterShardSize = preFilterShardSize;
}
+ /**
+ * Returns value of user-provided phase_took query parameter for this search request.
+ */
+ public Boolean isPhaseTook() {
+ return phaseTook;
+ }
+
+ /**
+ * Sets value of phase_took query param if provided by user. Defaults to null
.
+ */
+ public void setPhaseTook(Boolean phaseTook) {
+ this.phaseTook = phaseTook;
+ }
+
/**
* Returns a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold, or null
if the threshold is unspecified.
@@ -719,7 +742,8 @@ public boolean equals(Object o) {
&& absoluteStartMillis == that.absoluteStartMillis
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
- && Objects.equals(pipeline, that.pipeline);
+ && Objects.equals(pipeline, that.pipeline)
+ && Objects.equals(phaseTook, that.phaseTook);
}
@Override
@@ -740,7 +764,8 @@ public int hashCode() {
localClusterAlias,
absoluteStartMillis,
ccsMinimizeRoundtrips,
- cancelAfterTimeInterval
+ cancelAfterTimeInterval,
+ phaseTook
);
}
@@ -783,6 +808,8 @@ public String toString() {
+ cancelAfterTimeInterval
+ ", pipeline="
+ pipeline
+ + ", phaseTook="
+ + phaseTook
+ "}";
}
}
diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponse.java b/server/src/main/java/org/opensearch/action/search/SearchResponse.java
index a546311a1f668..91f0dc0737637 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchResponse.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchResponse.java
@@ -33,6 +33,7 @@
package org.opensearch.action.search;
import org.apache.lucene.search.TotalHits;
+import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.StatusToXContentObject;
@@ -63,7 +64,9 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
@@ -94,6 +97,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
private final ShardSearchFailure[] shardFailures;
private final Clusters clusters;
private final long tookInMillis;
+ private final PhaseTook phaseTook;
public SearchResponse(StreamInput in) throws IOException {
super(in);
@@ -112,6 +116,11 @@ public SearchResponse(StreamInput in) throws IOException {
clusters = new Clusters(in);
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
+ if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
+ phaseTook = in.readOptionalWriteable(PhaseTook::new);
+ } else {
+ phaseTook = null;
+ }
skippedShards = in.readVInt();
pointInTimeId = in.readOptionalString();
}
@@ -126,7 +135,32 @@ public SearchResponse(
ShardSearchFailure[] shardFailures,
Clusters clusters
) {
- this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters, null);
+ this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, null, shardFailures, clusters, null);
+ }
+
+ public SearchResponse(
+ SearchResponseSections internalResponse,
+ String scrollId,
+ int totalShards,
+ int successfulShards,
+ int skippedShards,
+ long tookInMillis,
+ ShardSearchFailure[] shardFailures,
+ Clusters clusters,
+ String pointInTimeId
+ ) {
+ this(
+ internalResponse,
+ scrollId,
+ totalShards,
+ successfulShards,
+ skippedShards,
+ tookInMillis,
+ null,
+ shardFailures,
+ clusters,
+ pointInTimeId
+ );
}
public SearchResponse(
@@ -136,6 +170,7 @@ public SearchResponse(
int successfulShards,
int skippedShards,
long tookInMillis,
+ PhaseTook phaseTook,
ShardSearchFailure[] shardFailures,
Clusters clusters,
String pointInTimeId
@@ -148,6 +183,7 @@ public SearchResponse(
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.tookInMillis = tookInMillis;
+ this.phaseTook = phaseTook;
this.shardFailures = shardFailures;
assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards;
assert scrollId == null || pointInTimeId == null : "SearchResponse can't have both scrollId ["
@@ -210,6 +246,13 @@ public TimeValue getTook() {
return new TimeValue(tookInMillis);
}
+ /**
+ * How long the request took in each search phase.
+ */
+ public PhaseTook getPhaseTook() {
+ return phaseTook;
+ }
+
/**
* The total number of shards the search was executed on.
*/
@@ -298,6 +341,9 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
builder.field(POINT_IN_TIME_ID.getPreferredName(), pointInTimeId);
}
builder.field(TOOK.getPreferredName(), tookInMillis);
+ if (phaseTook != null) {
+ phaseTook.toXContent(builder, params);
+ }
builder.field(TIMED_OUT.getPreferredName(), isTimedOut());
if (isTerminatedEarly() != null) {
builder.field(TERMINATED_EARLY.getPreferredName(), isTerminatedEarly());
@@ -337,6 +383,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
Boolean terminatedEarly = null;
int numReducePhases = 1;
long tookInMillis = -1;
+ PhaseTook phaseTook = null;
int successfulShards = -1;
int totalShards = -1;
int skippedShards = 0; // 0 for BWC
@@ -401,6 +448,24 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
parser.skipChildren();
}
}
+ } else if (PhaseTook.PHASE_TOOK.match(currentFieldName, parser.getDeprecationHandler())) {
+ Map phaseTookMap = new HashMap<>();
+
+ while ((token = parser.nextToken()) != Token.END_OBJECT) {
+ if (token == Token.FIELD_NAME) {
+ currentFieldName = parser.currentName();
+ } else if (token.isValue()) {
+ try {
+ SearchPhaseName.valueOf(currentFieldName.toUpperCase(Locale.ROOT));
+ phaseTookMap.put(currentFieldName, parser.longValue());
+ } catch (final IllegalArgumentException ex) {
+ parser.skipChildren();
+ }
+ } else {
+ parser.skipChildren();
+ }
+ }
+ phaseTook = new PhaseTook(phaseTookMap);
} else if (Clusters._CLUSTERS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
int successful = -1;
int total = -1;
@@ -472,6 +537,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
successfulShards,
skippedShards,
tookInMillis,
+ phaseTook,
failures.toArray(ShardSearchFailure.EMPTY_ARRAY),
clusters,
searchContextId
@@ -491,6 +557,9 @@ public void writeTo(StreamOutput out) throws IOException {
clusters.writeTo(out);
out.writeOptionalString(scrollId);
out.writeVLong(tookInMillis);
+ if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
+ out.writeOptionalWriteable(phaseTook);
+ }
out.writeVInt(skippedShards);
out.writeOptionalString(pointInTimeId);
}
@@ -604,6 +673,67 @@ public String toString() {
}
}
+ /**
+ * Holds info about the clusters that the search was executed on: how many in total, how many of them were successful
+ * and how many of them were skipped.
+ *
+ * @opensearch.internal
+ */
+ public static class PhaseTook implements ToXContentFragment, Writeable {
+ static final ParseField PHASE_TOOK = new ParseField("phase_took");
+ private final Map phaseTookMap;
+
+ public PhaseTook(Map phaseTookMap) {
+ this.phaseTookMap = phaseTookMap;
+ }
+
+ private PhaseTook(StreamInput in) throws IOException {
+ this(in.readMap(StreamInput::readString, StreamInput::readLong));
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeMap(phaseTookMap, StreamOutput::writeString, StreamOutput::writeLong);
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject(PHASE_TOOK.getPreferredName());
+
+ for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
+ if (phaseTookMap.containsKey(searchPhaseName.getName())) {
+ builder.field(searchPhaseName.getName(), phaseTookMap.get(searchPhaseName.getName()));
+ } else {
+ builder.field(searchPhaseName.getName(), 0);
+ }
+ }
+ builder.endObject();
+ return builder;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PhaseTook phaseTook = (PhaseTook) o;
+
+ if (phaseTook.phaseTookMap.equals(phaseTookMap)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(phaseTookMap);
+ }
+ }
+
static SearchResponse empty(Supplier tookInMillisSupplier, Clusters clusters) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(
diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java b/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java
index f90e98106f93f..054bd578cc56c 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchResponseMerger.java
@@ -236,6 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
successfulShards,
skippedShards,
tookInMillis,
+ searchTimeProvider.getPhaseTook(),
shardFailures,
clusters,
null
diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
index cff1005beff27..284f71bd9da62 100644
--- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
+++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
@@ -98,6 +98,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -154,6 +155,14 @@ public class TransportSearchAction extends HandledTransportAction SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting(
+ SEARCH_PHASE_TOOK_ENABLED_KEY,
+ false,
+ Property.Dynamic,
+ Property.NodeScope
+ );
+
private final NodeClient client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
@@ -252,6 +261,8 @@ private Map resolveIndexBoosts(SearchRequest searchRequest, Clust
}
/**
+ * Listener to track request-level tookTime and phase tookTimes from the coordinator.
+ *
* Search operations need two clocks. One clock is to fulfill real clock needs (e.g., resolving
* "now" to an index name). Another clock is needed for measuring how long a search operation
* took. These two uses are at odds with each other. There are many issues with using a real
@@ -261,11 +272,12 @@ private Map resolveIndexBoosts(SearchRequest searchRequest, Clust
*
* @opensearch.internal
*/
- static final class SearchTimeProvider {
+ static final class SearchTimeProvider implements SearchRequestOperationsListener {
private final long absoluteStartMillis;
private final long relativeStartNanos;
private final LongSupplier relativeCurrentNanosProvider;
+ private boolean phaseTook = false;
/**
* Instantiates a new search time provider. The absolute start time is the real clock time
@@ -291,6 +303,47 @@ long getAbsoluteStartMillis() {
long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(relativeCurrentNanosProvider.getAsLong() - relativeStartNanos);
}
+
+ public void setPhaseTook(boolean phaseTook) {
+ this.phaseTook = phaseTook;
+ }
+
+ public boolean isPhaseTook() {
+ return phaseTook;
+ }
+
+ SearchResponse.PhaseTook getPhaseTook() {
+ if (phaseTook) {
+ Map phaseTookMap = new HashMap<>();
+ // Convert Map to Map for SearchResponse()
+ for (SearchPhaseName searchPhaseName : phaseStatsMap.keySet()) {
+ phaseTookMap.put(searchPhaseName.getName(), phaseStatsMap.get(searchPhaseName));
+ }
+ return new SearchResponse.PhaseTook(phaseTookMap);
+ } else {
+ return null;
+ }
+ }
+
+ Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class);
+
+ @Override
+ public void onPhaseStart(SearchPhaseContext context) {}
+
+ @Override
+ public void onPhaseEnd(SearchPhaseContext context) {
+ phaseStatsMap.put(
+ context.getCurrentPhase().getSearchPhaseName(),
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos())
+ );
+ }
+
+ @Override
+ public void onPhaseFailure(SearchPhaseContext context) {}
+
+ public Long getPhaseTookTime(SearchPhaseName searchPhaseName) {
+ return phaseStatsMap.get(searchPhaseName);
+ }
}
@Override
@@ -332,13 +385,6 @@ public void executeRequest(
SinglePhaseSearchAction phaseSearchAction,
ActionListener listener
) {
- final List searchListenersList = createSearchListenerList();
- final SearchRequestOperationsListener searchRequestOperationsListener;
- if (!CollectionUtils.isEmpty(searchListenersList)) {
- searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger);
- } else {
- searchRequestOperationsListener = null;
- }
executeRequest(task, searchRequest, new SearchAsyncActionProvider() {
@Override
public AbstractSearchAsyncAction extends SearchPhaseResult> asyncSearchAction(
@@ -355,7 +401,8 @@ public AbstractSearchAsyncAction extends SearchPhaseResult> asyncSearchAction(
ActionListener listener,
boolean preFilter,
ThreadPool threadPool,
- SearchResponse.Clusters clusters
+ SearchResponse.Clusters clusters,
+ SearchRequestOperationsListener searchRequestOperationsListener
) {
return new AbstractSearchAsyncAction(
actionName,
@@ -419,6 +466,16 @@ private void executeRequest(
relativeStartNanos,
System::nanoTime
);
+
+ final List searchListenersList = createSearchListenerList(originalSearchRequest, timeProvider);
+
+ final SearchRequestOperationsListener searchRequestOperationsListener;
+ if (!CollectionUtils.isEmpty(searchListenersList)) {
+ searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger);
+ } else {
+ searchRequestOperationsListener = null;
+ }
+
PipelinedRequest searchRequest;
ActionListener listener;
try {
@@ -462,7 +519,8 @@ private void executeRequest(
clusterState,
listener,
searchContext,
- searchAsyncActionProvider
+ searchAsyncActionProvider,
+ searchRequestOperationsListener
);
} else {
if (shouldMinimizeRoundtrips(searchRequest)) {
@@ -483,7 +541,8 @@ private void executeRequest(
clusterState,
l,
searchContext,
- searchAsyncActionProvider
+ searchAsyncActionProvider,
+ searchRequestOperationsListener
)
);
} else {
@@ -533,7 +592,8 @@ private void executeRequest(
listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
searchContext,
- searchAsyncActionProvider
+ searchAsyncActionProvider,
+ searchRequestOperationsListener
);
}, listener::onFailure)
);
@@ -622,6 +682,7 @@ public void onResponse(SearchResponse searchResponse) {
searchResponse.getSuccessfulShards(),
searchResponse.getSkippedShards(),
timeProvider.buildTookInMillis(),
+ timeProvider.getPhaseTook(),
searchResponse.getShardFailures(),
new SearchResponse.Clusters(1, 1, 0),
searchResponse.pointInTimeId()
@@ -811,7 +872,8 @@ private void executeLocalSearch(
ClusterState clusterState,
ActionListener listener,
SearchContextId searchContext,
- SearchAsyncActionProvider searchAsyncActionProvider
+ SearchAsyncActionProvider searchAsyncActionProvider,
+ SearchRequestOperationsListener searchRequestOperationsListener
) {
executeSearch(
(SearchTask) task,
@@ -825,7 +887,8 @@ private void executeLocalSearch(
listener,
SearchResponse.Clusters.EMPTY,
searchContext,
- searchAsyncActionProvider
+ searchAsyncActionProvider,
+ searchRequestOperationsListener
);
}
@@ -943,7 +1006,8 @@ private void executeSearch(
ActionListener listener,
SearchResponse.Clusters clusters,
@Nullable SearchContextId searchContext,
- SearchAsyncActionProvider searchAsyncActionProvider
+ SearchAsyncActionProvider searchAsyncActionProvider,
+ SearchRequestOperationsListener searchRequestOperationsListener
) {
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
@@ -1044,7 +1108,8 @@ private void executeSearch(
listener,
preFilterSearchShards,
threadPool,
- clusters
+ clusters,
+ searchRequestOperationsListener
).start();
}
@@ -1127,15 +1192,30 @@ AbstractSearchAsyncAction extends SearchPhaseResult> asyncSearchAction(
ActionListener listener,
boolean preFilter,
ThreadPool threadPool,
- SearchResponse.Clusters clusters
+ SearchResponse.Clusters clusters,
+ SearchRequestOperationsListener searchRequestOperationsListener
);
}
- private List createSearchListenerList() {
+ private List createSearchListenerList(SearchRequest searchRequest, SearchTimeProvider timeProvider) {
final List searchListenersList = new ArrayList<>();
+
if (isRequestStatsEnabled) {
searchListenersList.add(searchRequestStats);
}
+
+ // phase_took is enabled with request param and/or cluster setting
+ Boolean phaseTookRequestParam = searchRequest.isPhaseTook();
+ if (phaseTookRequestParam == null) { // check cluster setting only when request param is undefined
+ if (clusterService.getClusterSettings().get(TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED)) {
+ timeProvider.setPhaseTook(true);
+ searchListenersList.add(timeProvider);
+ }
+ } else if (phaseTookRequestParam == true) {
+ timeProvider.setPhaseTook(true);
+ searchListenersList.add(timeProvider);
+ }
+
return searchListenersList;
}
@@ -1153,15 +1233,9 @@ private AbstractSearchAsyncAction extends SearchPhaseResult> searchAsyncAction
ActionListener listener,
boolean preFilter,
ThreadPool threadPool,
- SearchResponse.Clusters clusters
+ SearchResponse.Clusters clusters,
+ SearchRequestOperationsListener searchRequestOperationsListener
) {
- final List searchListenersList = createSearchListenerList();
- final SearchRequestOperationsListener searchRequestOperationsListener;
- if (!CollectionUtils.isEmpty(searchListenersList)) {
- searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger);
- } else {
- searchRequestOperationsListener = null;
- }
if (preFilter) {
return new CanMatchPreFilterSearchPhase(
logger,
@@ -1192,7 +1266,8 @@ private AbstractSearchAsyncAction extends SearchPhaseResult> searchAsyncAction
listener,
false,
threadPool,
- clusters
+ clusters,
+ searchRequestOperationsListener
);
return new SearchPhase(action.getName()) {
@Override
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 4cd3490cffb4c..ad2b89aa3948d 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -375,6 +375,7 @@ public void apply(Settings value, Settings current, Settings previous) {
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING,
TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED,
+ TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java
index ebfd082d974fd..080366e536da1 100644
--- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java
+++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java
@@ -180,6 +180,12 @@ public static void parseSearchRequest(
searchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null));
}
+ if (request.hasParam("phase_took")) {
+ // only set if we have the parameter passed to override the cluster-level default
+ // else phaseTook = null
+ searchRequest.setPhaseTook(request.paramAsBoolean("phase_took", true));
+ }
+
// do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
// from the REST layer. these modes are an internal optimization and should
// not be specified explicitly by the user.
diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java
index f628bb3201452..edac50813e191 100644
--- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java
+++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java
@@ -688,7 +688,11 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct
);
AtomicReference exception = new AtomicReference<>();
ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set);
-
+ TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
+ 0,
+ System.nanoTime(),
+ System::nanoTime
+ );
return new SearchDfsQueryThenFetchAsyncAction(
logger,
null,
@@ -702,7 +706,7 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct
searchRequest,
listener,
shardsIter,
- null,
+ timeProvider,
null,
task,
SearchResponse.Clusters.EMPTY,
@@ -734,6 +738,11 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction(
);
AtomicReference exception = new AtomicReference<>();
ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set);
+ TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
+ 0,
+ System.nanoTime(),
+ System::nanoTime
+ );
return new SearchQueryThenFetchAsyncAction(
logger,
null,
@@ -747,7 +756,7 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction(
searchRequest,
listener,
shardsIter,
- null,
+ timeProvider,
null,
task,
SearchResponse.Clusters.EMPTY,
diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java
index 25d8c5551880f..cdd0ea863ce37 100644
--- a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java
+++ b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java
@@ -244,6 +244,7 @@ private SearchRequest mutate(SearchRequest searchRequest) {
);
mutators.add(() -> mutation.source(randomValueOtherThan(searchRequest.source(), this::createSearchSourceBuilder)));
mutators.add(() -> mutation.setCcsMinimizeRoundtrips(searchRequest.isCcsMinimizeRoundtrips() == false));
+ mutators.add(() -> mutation.setPhaseTook(searchRequest.isPhaseTook() == false));
mutators.add(
() -> mutation.setCancelAfterTimeInterval(
searchRequest.getCancelAfterTimeInterval() != null
diff --git a/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java b/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java
index 097e922147698..c9e59ab4ea04d 100644
--- a/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java
+++ b/server/src/test/java/org/opensearch/action/search/SearchResponseTests.java
@@ -74,7 +74,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import static java.util.Collections.singletonMap;
@@ -152,6 +154,11 @@ public SearchResponse createTestItem(
Boolean terminatedEarly = randomBoolean() ? null : randomBoolean();
int numReducePhases = randomIntBetween(1, 10);
long tookInMillis = randomNonNegativeLong();
+ Map phaseTookMap = new HashMap<>();
+ for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
+ phaseTookMap.put(searchPhaseName.getName(), randomNonNegativeLong());
+ }
+ SearchResponse.PhaseTook phaseTook = new SearchResponse.PhaseTook(phaseTookMap);
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = randomIntBetween(0, totalShards);
@@ -182,6 +189,7 @@ public SearchResponse createTestItem(
successfulShards,
skippedShards,
tookInMillis,
+ phaseTook,
shardSearchFailures,
randomBoolean() ? randomClusters() : SearchResponse.Clusters.EMPTY,
null
@@ -353,6 +361,14 @@ public void testToXContent() {
assertEquals(1, searchExtBuilders.size());
}
{
+ Map phaseTookMap = new HashMap<>();
+ for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
+ phaseTookMap.put(searchPhaseName.getName(), 0L);
+ }
+ phaseTookMap.put(SearchPhaseName.QUERY.getName(), 50L);
+ phaseTookMap.put(SearchPhaseName.FETCH.getName(), 25L);
+ phaseTookMap.put(SearchPhaseName.EXPAND.getName(), 30L);
+ SearchResponse.PhaseTook phaseTook = new SearchResponse.PhaseTook(phaseTookMap);
SearchResponse response = new SearchResponse(
new InternalSearchResponse(
new SearchHits(hits, new TotalHits(100, TotalHits.Relation.EQUAL_TO), 1.5f),
@@ -368,13 +384,24 @@ public void testToXContent() {
0,
0,
0,
+ phaseTook,
ShardSearchFailure.EMPTY_ARRAY,
- new SearchResponse.Clusters(5, 3, 2)
+ new SearchResponse.Clusters(5, 3, 2),
+ null
);
StringBuilder expectedString = new StringBuilder();
expectedString.append("{");
{
expectedString.append("\"took\":0,");
+ expectedString.append("\"phase_took\":");
+ {
+ expectedString.append("{\"dfs_pre_query\":0,");
+ expectedString.append("\"query\":50,");
+ expectedString.append("\"fetch\":25,");
+ expectedString.append("\"dfs_query\":0,");
+ expectedString.append("\"expand\":30,");
+ expectedString.append("\"can_match\":0},");
+ }
expectedString.append("\"timed_out\":false,");
expectedString.append("\"_shards\":");
{
@@ -477,6 +504,24 @@ public void testToXContentEmptyClusters() throws IOException {
assertEquals(0, builder.toString().length());
}
+ public void testSearchResponsePhaseTookEquals() throws IOException {
+ SearchResponse.PhaseTook phaseTookA = new SearchResponse.PhaseTook(Map.of("foo", 0L, "bar", 1L));
+ SearchResponse.PhaseTook phaseTookB = new SearchResponse.PhaseTook(Map.of("foo", 1L, "bar", 1L));
+ SearchResponse.PhaseTook phaseTookC = new SearchResponse.PhaseTook(Map.of("foo", 0L));
+ SearchResponse.PhaseTook phaseTookD = new SearchResponse.PhaseTook(Map.of());
+
+ assertNotEquals(phaseTookA, phaseTookB);
+ assertNotEquals(phaseTookB, phaseTookA);
+ assertNotEquals(phaseTookA, phaseTookC);
+ assertNotEquals(phaseTookC, phaseTookA);
+ assertNotEquals(phaseTookA, phaseTookD);
+ assertNotEquals(phaseTookD, phaseTookA);
+ assertEquals(phaseTookA, phaseTookA);
+ assertEquals(phaseTookB, phaseTookB);
+ assertEquals(phaseTookC, phaseTookC);
+ assertEquals(phaseTookD, phaseTookD);
+ }
+
static class DummySearchExtBuilder extends SearchExtBuilder {
static ParseField DUMMY_FIELD = new ParseField("dummy");
diff --git a/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java b/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java
new file mode 100644
index 0000000000000..f0f1a43e6c21e
--- /dev/null
+++ b/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java
@@ -0,0 +1,54 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.action.search;
+
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SearchTimeProviderTests extends OpenSearchTestCase {
+
+ public void testSearchTimeProviderPhaseFailure() {
+ TransportSearchAction.SearchTimeProvider testTimeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
+ SearchPhaseContext ctx = mock(SearchPhaseContext.class);
+ SearchPhase mockSearchPhase = mock(SearchPhase.class);
+ when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
+
+ for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
+ when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
+ testTimeProvider.onPhaseStart(ctx);
+ assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName));
+ testTimeProvider.onPhaseFailure(ctx);
+ assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName));
+ }
+ }
+
+ public void testSearchTimeProviderPhaseEnd() {
+ TransportSearchAction.SearchTimeProvider testTimeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
+
+ SearchPhaseContext ctx = mock(SearchPhaseContext.class);
+ SearchPhase mockSearchPhase = mock(SearchPhase.class);
+ when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
+
+ for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
+ when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName);
+ long tookTimeInMillis = randomIntBetween(1, 100);
+ testTimeProvider.onPhaseStart(ctx);
+ long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis);
+ when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime);
+ assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName));
+ testTimeProvider.onPhaseEnd(ctx);
+ assertThat(testTimeProvider.getPhaseTookTime(searchPhaseName), greaterThanOrEqualTo(tookTimeInMillis));
+ }
+ }
+}
diff --git a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java
index b942136e1f1e2..74de1e6d96d93 100644
--- a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java
+++ b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java
@@ -131,6 +131,9 @@ public static SearchRequest randomSearchRequest(Supplier ra
if (randomBoolean()) {
searchRequest.setCancelAfterTimeInterval(TimeValue.parseTimeValue(randomTimeValue(), null, "cancel_after_time_interval"));
}
+ if (randomBoolean()) {
+ searchRequest.setPhaseTook(randomBoolean());
+ }
return searchRequest;
}