Skip to content

Commit

Permalink
Fix tests - we do need to serialize isPartial in exec info
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Dec 27, 2024
1 parent 6b949fe commit 6d41758
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ERROR_TRACE_IN_TRANSPORT_HEADER = def(8_811_00_0);
public static final TransportVersion FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING = def(8_812_00_0);
public static final TransportVersion SIMULATE_IGNORED_FIELDS = def(8_813_00_0);
public static final TransportVersion ESQL_RESPONSE_PARTIAL = def(8_814_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.oneOf;

public class CrossClusterAsyncQueryIT extends AbstractMultiClustersTestCase {

Expand Down Expand Up @@ -322,8 +323,12 @@ public void testStopQuery() throws Exception {
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*"));
assertClusterInfoSuccess(localCluster, localNumShards);
assertThat(localCluster.getIndexExpression(), equalTo("logs-*"));
assertThat(
localCluster.getStatus(),
oneOf(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, EsqlExecutionInfo.Cluster.Status.PARTIAL)
);
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));

assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
private final transient Predicate<String> skipUnavailablePredicate;
private final transient Long relativeStartNanos; // start time for an ESQL query for calculating took times
private transient TimeValue planningTookTime; // time elapsed since start of query to calling ComputeService.execute
private transient boolean isPartial; // Does this request have partial results?
private boolean isPartial; // Does this request have partial results?

public EsqlExecutionInfo(boolean includeCCSMetadata) {
this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true
Expand Down Expand Up @@ -116,6 +116,12 @@ public EsqlExecutionInfo(StreamInput in) throws IOException {
this.includeCCSMetadata = false;
}

if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL)) {
this.isPartial = in.readBoolean();
} else {
this.isPartial = false;
}

this.skipUnavailablePredicate = Predicates.always();
this.relativeStartNanos = null;
}
Expand All @@ -131,6 +137,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
out.writeBoolean(includeCCSMetadata);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL)) {
out.writeBoolean(isPartial);
}
}

public boolean includeCCSMetadata() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener
public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
// set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
task.setExecutionInfo(createEsqlExecutionInfo(request));
// If the request is async, we need to wrap the listener in a SubscribableListener so that we can collect the results from other
// endpoints
// Since the request is async here, we need to wrap the listener in a SubscribableListener so that we can collect the results from
// other endpoints, such as _query/async/stop
var subListener = new SubscribableListener<EsqlQueryResponse>();
String asyncExecutionId = task.getExecutionId().getEncoded();
// TODO: is runBefore correct here?
Expand Down

0 comments on commit 6d41758

Please sign in to comment.