From 6d41758480d8521826bb344ea24c68490e59d0f9 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 27 Dec 2024 13:23:16 -0700 Subject: [PATCH] Fix tests - we do need to serialize isPartial in exec info --- .../java/org/elasticsearch/TransportVersions.java | 1 + .../xpack/esql/action/CrossClusterAsyncQueryIT.java | 9 +++++++-- .../xpack/esql/action/EsqlExecutionInfo.java | 11 ++++++++++- .../xpack/esql/plugin/TransportEsqlQueryAction.java | 4 ++-- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 3495908da7eeb..fbb26c4197282 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -146,6 +146,7 @@ static TransportVersion def(int id) { public static final TransportVersion ERROR_TRACE_IN_TRANSPORT_HEADER = def(8_811_00_0); public static final TransportVersion FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING = def(8_812_00_0); public static final TransportVersion SIMULATE_IGNORED_FIELDS = def(8_813_00_0); + public static final TransportVersion ESQL_RESPONSE_PARTIAL = def(8_814_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java index 204f0f926269e..f9200ba14afb8 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java @@ -50,6 +50,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.oneOf; public class CrossClusterAsyncQueryIT extends AbstractMultiClustersTestCase { @@ -322,8 +323,12 @@ public void testStopQuery() throws Exception { assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); - assertClusterInfoSuccess(localCluster, localNumShards); + assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); + assertThat( + localCluster.getStatus(), + oneOf(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, EsqlExecutionInfo.Cluster.Status.PARTIAL) + ); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); } finally { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index e8c6c5d0d620c..049c2d64411de 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -72,7 +72,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { private final transient Predicate skipUnavailablePredicate; private final transient Long relativeStartNanos; // start time for an ESQL query for calculating took times private transient TimeValue planningTookTime; // time elapsed since start of query to calling ComputeService.execute - private transient boolean isPartial; // Does this request have partial results? + private boolean isPartial; // Does this request have partial results? public EsqlExecutionInfo(boolean includeCCSMetadata) { this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true @@ -116,6 +116,12 @@ public EsqlExecutionInfo(StreamInput in) throws IOException { this.includeCCSMetadata = false; } + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL)) { + this.isPartial = in.readBoolean(); + } else { + this.isPartial = false; + } + this.skipUnavailablePredicate = Predicates.always(); this.relativeStartNanos = null; } @@ -131,6 +137,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { out.writeBoolean(includeCCSMetadata); } + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL)) { + out.writeBoolean(isPartial); + } } public boolean includeCCSMetadata() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 4f7df38c786db..d679c0478dfa2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -162,8 +162,8 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener listener) { // set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running task.setExecutionInfo(createEsqlExecutionInfo(request)); - // If the request is async, we need to wrap the listener in a SubscribableListener so that we can collect the results from other - // endpoints + // Since the request is async here, we need to wrap the listener in a SubscribableListener so that we can collect the results from + // other endpoints, such as _query/async/stop var subListener = new SubscribableListener(); String asyncExecutionId = task.getExecutionId().getEncoded(); // TODO: is runBefore correct here?