Skip to content

Commit

Permalink
Per request phase latency (#10351)
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 authored Oct 13, 2023
1 parent 25f2b76 commit daf1350
Show file tree
Hide file tree
Showing 14 changed files with 394 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))

### Dependencies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ public final void start() {
0,
0,
buildTookInMillis(),
timeProvider.getPhaseTook(),
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down Expand Up @@ -662,6 +663,7 @@ protected final SearchResponse buildSearchResponse(
successfulOps.get(),
skippedOps.get(),
buildTookInMillis(),
timeProvider.getPhaseTook(),
failures,
clusters,
searchContextId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ public static void readMultiLineFormat(
} else if ("cancel_after_time_interval".equals(entry.getKey())
|| "cancelAfterTimeInterval".equals(entry.getKey())) {
searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null));
} else if ("phase_took".equals(entry.getKey())) {
searchRequest.setPhaseTook(nodeBooleanValue(value));
} else {
throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section");
}
Expand Down Expand Up @@ -374,6 +376,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild
if (request.getCancelAfterTimeInterval() != null) {
xContentBuilder.field("cancel_after_time_interval", request.getCancelAfterTimeInterval().getStringRep());
}
if (request.isPhaseTook() != null) {
xContentBuilder.field("phase_took", request.isPhaseTook());
}
xContentBuilder.endObject();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla

private String pipeline;

private Boolean phaseTook = null;

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
Expand Down Expand Up @@ -209,6 +211,7 @@ private SearchRequest(
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
this.phaseTook = searchRequest.phaseTook;
}

/**
Expand Down Expand Up @@ -253,6 +256,9 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
pipeline = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
phaseTook = in.readOptionalBoolean();
}
}

@Override
Expand Down Expand Up @@ -284,6 +290,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeOptionalString(pipeline);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalBoolean(phaseTook);
}
}

@Override
Expand Down Expand Up @@ -615,6 +624,20 @@ public void setPreFilterShardSize(int preFilterShardSize) {
this.preFilterShardSize = preFilterShardSize;
}

/**
* Returns value of user-provided phase_took query parameter for this search request.
*/
public Boolean isPhaseTook() {
return phaseTook;
}

/**
* Sets value of phase_took query param if provided by user. Defaults to <code>null</code>.
*/
public void setPhaseTook(Boolean phaseTook) {
this.phaseTook = phaseTook;
}

/**
* Returns a threshold that enforces a pre-filter roundtrip to pre-filter search shards based on query rewriting if the number of shards
* the search request expands to exceeds the threshold, or <code>null</code> if the threshold is unspecified.
Expand Down Expand Up @@ -719,7 +742,8 @@ public boolean equals(Object o) {
&& absoluteStartMillis == that.absoluteStartMillis
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
&& Objects.equals(pipeline, that.pipeline);
&& Objects.equals(pipeline, that.pipeline)
&& Objects.equals(phaseTook, that.phaseTook);
}

@Override
Expand All @@ -740,7 +764,8 @@ public int hashCode() {
localClusterAlias,
absoluteStartMillis,
ccsMinimizeRoundtrips,
cancelAfterTimeInterval
cancelAfterTimeInterval,
phaseTook
);
}

Expand Down Expand Up @@ -783,6 +808,8 @@ public String toString() {
+ cancelAfterTimeInterval
+ ", pipeline="
+ pipeline
+ ", phaseTook="
+ phaseTook
+ "}";
}
}
132 changes: 131 additions & 1 deletion server/src/main/java/org/opensearch/action/search/SearchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.search;

import org.apache.lucene.search.TotalHits;
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.StatusToXContentObject;
Expand Down Expand Up @@ -63,7 +64,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
Expand Down Expand Up @@ -94,6 +97,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
private final ShardSearchFailure[] shardFailures;
private final Clusters clusters;
private final long tookInMillis;
private final PhaseTook phaseTook;

public SearchResponse(StreamInput in) throws IOException {
super(in);
Expand All @@ -112,6 +116,11 @@ public SearchResponse(StreamInput in) throws IOException {
clusters = new Clusters(in);
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
phaseTook = in.readOptionalWriteable(PhaseTook::new);
} else {
phaseTook = null;
}
skippedShards = in.readVInt();
pointInTimeId = in.readOptionalString();
}
Expand All @@ -126,7 +135,32 @@ public SearchResponse(
ShardSearchFailure[] shardFailures,
Clusters clusters
) {
this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, shardFailures, clusters, null);
this(internalResponse, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, null, shardFailures, clusters, null);
}

public SearchResponse(
SearchResponseSections internalResponse,
String scrollId,
int totalShards,
int successfulShards,
int skippedShards,
long tookInMillis,
ShardSearchFailure[] shardFailures,
Clusters clusters,
String pointInTimeId
) {
this(
internalResponse,
scrollId,
totalShards,
successfulShards,
skippedShards,
tookInMillis,
null,
shardFailures,
clusters,
pointInTimeId
);
}

public SearchResponse(
Expand All @@ -136,6 +170,7 @@ public SearchResponse(
int successfulShards,
int skippedShards,
long tookInMillis,
PhaseTook phaseTook,
ShardSearchFailure[] shardFailures,
Clusters clusters,
String pointInTimeId
Expand All @@ -148,6 +183,7 @@ public SearchResponse(
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.tookInMillis = tookInMillis;
this.phaseTook = phaseTook;
this.shardFailures = shardFailures;
assert skippedShards <= totalShards : "skipped: " + skippedShards + " total: " + totalShards;
assert scrollId == null || pointInTimeId == null : "SearchResponse can't have both scrollId ["
Expand Down Expand Up @@ -210,6 +246,13 @@ public TimeValue getTook() {
return new TimeValue(tookInMillis);
}

/**
* How long the request took in each search phase.
*/
public PhaseTook getPhaseTook() {
return phaseTook;
}

/**
* The total number of shards the search was executed on.
*/
Expand Down Expand Up @@ -298,6 +341,9 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
builder.field(POINT_IN_TIME_ID.getPreferredName(), pointInTimeId);
}
builder.field(TOOK.getPreferredName(), tookInMillis);
if (phaseTook != null) {
phaseTook.toXContent(builder, params);
}
builder.field(TIMED_OUT.getPreferredName(), isTimedOut());
if (isTerminatedEarly() != null) {
builder.field(TERMINATED_EARLY.getPreferredName(), isTerminatedEarly());
Expand Down Expand Up @@ -337,6 +383,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
Boolean terminatedEarly = null;
int numReducePhases = 1;
long tookInMillis = -1;
PhaseTook phaseTook = null;
int successfulShards = -1;
int totalShards = -1;
int skippedShards = 0; // 0 for BWC
Expand Down Expand Up @@ -401,6 +448,24 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
parser.skipChildren();
}
}
} else if (PhaseTook.PHASE_TOOK.match(currentFieldName, parser.getDeprecationHandler())) {
Map<String, Long> phaseTookMap = new HashMap<>();

while ((token = parser.nextToken()) != Token.END_OBJECT) {
if (token == Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
try {
SearchPhaseName.valueOf(currentFieldName.toUpperCase(Locale.ROOT));
phaseTookMap.put(currentFieldName, parser.longValue());
} catch (final IllegalArgumentException ex) {
parser.skipChildren();
}
} else {
parser.skipChildren();
}
}
phaseTook = new PhaseTook(phaseTookMap);
} else if (Clusters._CLUSTERS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
int successful = -1;
int total = -1;
Expand Down Expand Up @@ -472,6 +537,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
successfulShards,
skippedShards,
tookInMillis,
phaseTook,
failures.toArray(ShardSearchFailure.EMPTY_ARRAY),
clusters,
searchContextId
Expand All @@ -491,6 +557,9 @@ public void writeTo(StreamOutput out) throws IOException {
clusters.writeTo(out);
out.writeOptionalString(scrollId);
out.writeVLong(tookInMillis);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(phaseTook);
}
out.writeVInt(skippedShards);
out.writeOptionalString(pointInTimeId);
}
Expand Down Expand Up @@ -604,6 +673,67 @@ public String toString() {
}
}

/**
* Holds info about the clusters that the search was executed on: how many in total, how many of them were successful
* and how many of them were skipped.
*
* @opensearch.internal
*/
public static class PhaseTook implements ToXContentFragment, Writeable {
static final ParseField PHASE_TOOK = new ParseField("phase_took");
private final Map<String, Long> phaseTookMap;

public PhaseTook(Map<String, Long> phaseTookMap) {
this.phaseTookMap = phaseTookMap;
}

private PhaseTook(StreamInput in) throws IOException {
this(in.readMap(StreamInput::readString, StreamInput::readLong));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(phaseTookMap, StreamOutput::writeString, StreamOutput::writeLong);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(PHASE_TOOK.getPreferredName());

for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
if (phaseTookMap.containsKey(searchPhaseName.getName())) {
builder.field(searchPhaseName.getName(), phaseTookMap.get(searchPhaseName.getName()));
} else {
builder.field(searchPhaseName.getName(), 0);
}
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PhaseTook phaseTook = (PhaseTook) o;

if (phaseTook.phaseTookMap.equals(phaseTookMap)) {
return true;
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hash(phaseTookMap);
}
}

static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters clusters) {
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
successfulShards,
skippedShards,
tookInMillis,
searchTimeProvider.getPhaseTook(),
shardFailures,
clusters,
null
Expand Down
Loading

0 comments on commit daf1350

Please sign in to comment.