Skip to content

Commit

Permalink
Request level latency tracking
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Sep 12, 2023
1 parent f44fb3c commit 6a933d7
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
if (timeProvider.isPhaseTookEnabled()) {
searchListenersList.add(timeProvider);
}
if (!CollectionUtils.isEmpty(searchListenersList)) {
this.searchListenersList = searchListenersList;
this.searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(this.searchListenersList, logger);
Expand Down Expand Up @@ -333,6 +336,7 @@ public final void start() {
0,
0,
buildTookInMillis(),
timeProvider.getPhaseTook(),
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down Expand Up @@ -791,6 +795,7 @@ protected final SearchResponse buildSearchResponse(
successfulOps.get(),
skippedOps.get(),
buildTookInMillis(),
timeProvider.getPhaseTook(),
failures,
clusters,
searchContextId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla

private String pipeline;

private Boolean phaseTookQueryParamEnabled = null;

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
Expand Down Expand Up @@ -209,6 +211,7 @@ private SearchRequest(
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
this.phaseTookQueryParamEnabled = searchRequest.phaseTookQueryParamEnabled;
}

/**
Expand Down Expand Up @@ -253,6 +256,7 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
pipeline = in.readOptionalString();
}
phaseTookQueryParamEnabled = in.readOptionalBoolean();
}

@Override
Expand Down Expand Up @@ -284,6 +288,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeOptionalString(pipeline);
}
out.writeOptionalBoolean(phaseTookQueryParamEnabled);
}

@Override
Expand Down Expand Up @@ -615,6 +620,33 @@ public void setPreFilterShardSize(int preFilterShardSize) {
this.preFilterShardSize = preFilterShardSize;
}

enum ParamValue {
TRUE,
FALSE,
UNSET
}

/**
* Returns value of user-provided phase_took query parameter for this search request.
* Defaults to <code>false</code>.
*/
public ParamValue isPhaseTookQueryParamEnabled() {
if (phaseTookQueryParamEnabled == null) {
return ParamValue.UNSET;
} else if (phaseTookQueryParamEnabled == true) {
return ParamValue.TRUE;
} else {
return ParamValue.FALSE;
}
}

/**
* Sets value of phase_took query param if provided by user. Defaults to <code>null</code>.
*/
public void setPhaseTookQueryParamEnabled(boolean phaseTookQueryParamEnabled) {
this.phaseTookQueryParamEnabled = phaseTookQueryParamEnabled;
}

/**
* Returns a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold, or <code>null</code> if the threshold is unspecified.
Expand Down Expand Up @@ -719,7 +751,8 @@ public boolean equals(Object o) {
&& absoluteStartMillis == that.absoluteStartMillis
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
&& Objects.equals(pipeline, that.pipeline);
&& Objects.equals(pipeline, that.pipeline)
&& Objects.equals(phaseTookQueryParamEnabled, that.phaseTookQueryParamEnabled);
}

@Override
Expand All @@ -740,7 +773,8 @@ public int hashCode() {
localClusterAlias,
absoluteStartMillis,
ccsMinimizeRoundtrips,
cancelAfterTimeInterval
cancelAfterTimeInterval,
phaseTookQueryParamEnabled
);
}

Expand Down Expand Up @@ -783,6 +817,8 @@ public String toString() {
+ cancelAfterTimeInterval
+ ", pipeline="
+ pipeline
+ ", phaseTookQueryParamEnabled="
+ phaseTookQueryParamEnabled
+ "}";
}
}
176 changes: 175 additions & 1 deletion server/src/main/java/org/opensearch/action/search/SearchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
private final ShardSearchFailure[] shardFailures;
private final Clusters clusters;
private final long tookInMillis;
private final PhaseTook phaseTook;

public SearchResponse(StreamInput in) throws IOException {
super(in);
Expand All @@ -112,6 +113,7 @@ public SearchResponse(StreamInput in) throws IOException {
clusters = new Clusters(in);
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
phaseTook = new PhaseTook(in);
skippedShards = in.readVInt();
pointInTimeId = in.readOptionalString();
}
Expand All @@ -123,10 +125,22 @@ public SearchResponse(
int successfulShards,
int skippedShards,
long tookInMillis,
PhaseTook phaseTook,
ShardSearchFailure[] shardFailures,
Clusters clusters
) {
this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters, null);
this(
internalResponse,
scrollId,
totalShards,
successfulShards,
skippedShards,
tookInMillis,
phaseTook,
shardFailures,
clusters,
null
);
}

public SearchResponse(
Expand All @@ -136,6 +150,7 @@ public SearchResponse(
int successfulShards,
int skippedShards,
long tookInMillis,
PhaseTook phaseTook,
ShardSearchFailure[] shardFailures,
Clusters clusters,
String pointInTimeId
Expand All @@ -148,6 +163,7 @@ public SearchResponse(
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.tookInMillis = tookInMillis;
this.phaseTook = phaseTook;
this.shardFailures = shardFailures;
assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards;
assert scrollId == null || pointInTimeId == null : "SearchResponse can't have both scrollId ["
Expand Down Expand Up @@ -210,6 +226,13 @@ public TimeValue getTook() {
return new TimeValue(tookInMillis);
}

/**
* How long the request took in each search phase.
*/
public PhaseTook getPhaseTook() {
return phaseTook;
}

/**
* The total number of shards the search was executed on.
*/
Expand Down Expand Up @@ -298,6 +321,9 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
builder.field(POINT_IN_TIME_ID.getPreferredName(), pointInTimeId);
}
builder.field(TOOK.getPreferredName(), tookInMillis);
if (phaseTook.equals(PhaseTook.NULL) == false) {
phaseTook.toXContent(builder, params);
}
builder.field(TIMED_OUT.getPreferredName(), isTimedOut());
if (isTerminatedEarly() != null) {
builder.field(TERMINATED_EARLY.getPreferredName(), isTerminatedEarly());
Expand Down Expand Up @@ -337,6 +363,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
Boolean terminatedEarly = null;
int numReducePhases = 1;
long tookInMillis = -1;
PhaseTook phaseTook = PhaseTook.NULL;
int successfulShards = -1;
int totalShards = -1;
int skippedShards = 0; // 0 for BWC
Expand Down Expand Up @@ -401,6 +428,35 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
parser.skipChildren();
}
}
} else if (PhaseTook.PHASE_TOOK.match(currentFieldName, parser.getDeprecationHandler())) {
long dfsPreQueryTotal = -1;
long canMatchTotal = -1;
long queryTotal = -1;
long fetchTotal = -1;
long expandSearchTotal = -1;

while ((token = parser.nextToken()) != Token.END_OBJECT) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (PhaseTook.DFS_PREQUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
dfsPreQueryTotal = parser.longValue(); // we don't need it but need to consume it
} else if (PhaseTook.CAN_MATCH_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
canMatchTotal = parser.longValue();
} else if (PhaseTook.QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
queryTotal = parser.longValue();
} else if (PhaseTook.FETCH_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
fetchTotal = parser.longValue();
} else if (PhaseTook.EXPAND_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
expandSearchTotal = parser.longValue();
} else {
parser.skipChildren();
}
} else {
parser.skipChildren();
}
}
phaseTook = new PhaseTook(dfsPreQueryTotal, canMatchTotal, queryTotal, fetchTotal, expandSearchTotal);
} else if (Clusters._CLUSTERS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
int successful = -1;
int total = -1;
Expand Down Expand Up @@ -472,6 +528,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
successfulShards,
skippedShards,
tookInMillis,
phaseTook,
failures.toArray(ShardSearchFailure.EMPTY_ARRAY),
clusters,
searchContextId
Expand All @@ -491,6 +548,10 @@ public void writeTo(StreamOutput out) throws IOException {
clusters.writeTo(out);
out.writeOptionalString(scrollId);
out.writeVLong(tookInMillis);
phaseTook.writeTo(out);
// if (phaseTook != PhaseTook.NULL) {
// phaseTook.writeTo(out);
// }
out.writeVInt(skippedShards);
out.writeOptionalString(pointInTimeId);
}
Expand Down Expand Up @@ -604,6 +665,118 @@ public String toString() {
}
}

/**
* Holds info about the clusters that the search was executed on: how many in total, how many of them were successful
* and how many of them were skipped.
*
* @opensearch.internal
*/
public static class PhaseTook implements ToXContentFragment, Writeable {
public static final PhaseTook NULL = new PhaseTook(-1, -1, -1, -1, -1);

static final ParseField PHASE_TOOK = new ParseField("phase_took");
static final ParseField DFS_PREQUERY_FIELD = new ParseField("dfs_prequery");
static final ParseField CAN_MATCH_FIELD = new ParseField("can_match");
static final ParseField QUERY_FIELD = new ParseField("query");
static final ParseField FETCH_FIELD = new ParseField("fetch");
static final ParseField EXPAND_FIELD = new ParseField("expand_search");

private final long dfsPreQueryTotal;
private final long canMatchTotal;
private final long queryTotal;
private final long fetchTotal;
private final long expandSearchTotal;

public PhaseTook(long dfsPreQueryTotal, long canMatchTotal, long queryTotal, long fetchTotal, long expandSearchTotal) {
this.dfsPreQueryTotal = dfsPreQueryTotal;
this.canMatchTotal = canMatchTotal;
this.queryTotal = queryTotal;
this.fetchTotal = fetchTotal;
this.expandSearchTotal = expandSearchTotal;
}

private PhaseTook(StreamInput in) throws IOException {
this(in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(dfsPreQueryTotal);
out.writeLong(canMatchTotal);
out.writeLong(queryTotal);
out.writeLong(fetchTotal);
out.writeLong(expandSearchTotal);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(PHASE_TOOK.getPreferredName());
builder.field(DFS_PREQUERY_FIELD.getPreferredName(), dfsPreQueryTotal);
builder.field(CAN_MATCH_FIELD.getPreferredName(), canMatchTotal);
builder.field(QUERY_FIELD.getPreferredName(), queryTotal);
builder.field(FETCH_FIELD.getPreferredName(), fetchTotal);
builder.field(EXPAND_FIELD.getPreferredName(), expandSearchTotal);
builder.endObject();
return builder;
}

/**
* Returns time spent in DFS Prequery phase during the execution of the search request
*/
public long getDfsPreQueryTotal() {
return dfsPreQueryTotal;
}

/**
* Returns time spent in canMatch phase during the execution of the search request
*/
public long getCanMatchTotal() {
return canMatchTotal;
}

/**
* Returns time spent in query phase during the execution of the search request
*/
public long getQueryTotal() {
return queryTotal;
}

/**
* Returns time spent in fetch phase during the execution of the search request
*/
public long getFetchTotal() {
return fetchTotal;
}

/**
* Returns time spent in expand phase during the execution of the search request
*/
public long getExpandSearchTotal() {
return expandSearchTotal;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PhaseTook phaseTook = (PhaseTook) o;
return dfsPreQueryTotal == phaseTook.dfsPreQueryTotal
&& queryTotal == phaseTook.queryTotal
&& canMatchTotal == phaseTook.canMatchTotal
&& fetchTotal == phaseTook.fetchTotal
&& expandSearchTotal == phaseTook.expandSearchTotal;
}

@Override
public int hashCode() {
return Objects.hash(dfsPreQueryTotal, queryTotal, canMatchTotal, fetchTotal, expandSearchTotal);
}
}

static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters clusters) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(
Expand All @@ -622,6 +795,7 @@ static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters cluste
0,
0,
tookInMillisSupplier.get(),
PhaseTook.NULL,
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
successfulShards,
skippedShards,
tookInMillis,
searchTimeProvider.getPhaseTook(),
shardFailures,
clusters,
null
Expand Down
Loading

0 comments on commit 6a933d7

Please sign in to comment.