Skip to content

Commit

Permalink
Took time and cluster details get updated for coordinator only query …
Browse files Browse the repository at this point in the history
…operations (elastic#114075)

* Took time and cluster details get updated for coordinator only query operations

The ComputeService.runCompute pathway for coordinator only operations (such as 
`FROM foo | LIMIT 0` or a ROW command) get updated with overall took time.
This also includes support for cross-cluster coordinator only operations, which come
about with queries like `FROM foo,remote:foo | LIMIT 0`. The _clusters metadata is
now properly updated for those cases as well.

Fixes elastic#114014
  • Loading branch information
quux00 authored Oct 4, 2024
1 parent 95ea135 commit e3705a1
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public void testDoNotLogWithInfo() throws IOException {
setLoggingLevel("INFO");
RequestObjectBuilder builder = requestObjectBuilder().query("ROW DO_NOT_LOG_ME = 1");
Map<String, Object> result = runEsql(builder);
assertEquals(2, result.size());
assertEquals(3, result.size());
assertThat(((Integer) result.get("took")).intValue(), greaterThanOrEqualTo(0));
Map<String, String> colA = Map.of("name", "DO_NOT_LOG_ME", "type", "integer");
assertEquals(List.of(colA), result.get("columns"));
assertEquals(List.of(List.of(1)), result.get("values"));
Expand All @@ -136,7 +137,8 @@ public void testDoLogWithDebug() throws IOException {
setLoggingLevel("DEBUG");
RequestObjectBuilder builder = requestObjectBuilder().query("ROW DO_LOG_ME = 1");
Map<String, Object> result = runEsql(builder);
assertEquals(2, result.size());
assertEquals(3, result.size());
assertThat(((Integer) result.get("took")).intValue(), greaterThanOrEqualTo(0));
Map<String, String> colA = Map.of("name", "DO_LOG_ME", "type", "integer");
assertEquals(List.of(colA), result.get("columns"));
assertEquals(List.of(List.of(1)), result.get("values"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ public static RequestObjectBuilder jsonBuilder() throws IOException {

public void testGetAnswer() throws IOException {
Map<String, Object> answer = runEsql(requestObjectBuilder().query("row a = 1, b = 2"));
assertEquals(2, answer.size());
assertEquals(3, answer.size());
assertThat(((Integer) answer.get("took")).intValue(), greaterThanOrEqualTo(0));
Map<String, String> colA = Map.of("name", "a", "type", "integer");
Map<String, String> colB = Map.of("name", "b", "type", "integer");
assertEquals(List.of(colA, colB), answer.get("columns"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
private static final String REMOTE_CLUSTER = "cluster-a";
Expand Down Expand Up @@ -339,6 +340,108 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() {
}
}

/**
* Searches with LIMIT 0 are used by Kibana to get a list of columns. After the initial planning
* (which involves cross-cluster field-caps calls), it is a coordinator only operation at query time
* which uses a different pathway compared to queries that require data node (and remote data node) operations
* at query time.
*/
public void testCCSExecutionOnSearchesWithLimit0() {
setupTwoClusters();

// Ensure non-cross cluster queries have overall took time
try (EsqlQueryResponse resp = runQuery("FROM logs* | LIMIT 0")) {
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(false));
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
}

// ensure cross-cluster searches have overall took time and correct per-cluster details in EsqlExecutionInfo
try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:* | LIMIT 0")) {
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
assertThat(remoteCluster.getIndexExpression(), equalTo("*"));
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertNull(remoteCluster.getTotalShards());
assertNull(remoteCluster.getSuccessfulShards());
assertNull(remoteCluster.getSkippedShards());
assertNull(remoteCluster.getFailedShards());

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertNull(localCluster.getTotalShards());
assertNull(localCluster.getSuccessfulShards());
assertNull(localCluster.getSkippedShards());
assertNull(localCluster.getFailedShards());
}

try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:nomatch* | LIMIT 0")) {
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
assertThat(remoteCluster.getIndexExpression(), equalTo("nomatch*"));
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
assertThat(remoteCluster.getTook().millis(), equalTo(0L));
assertThat(remoteCluster.getTotalShards(), equalTo(0));
assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
assertThat(remoteCluster.getSkippedShards(), equalTo(0));
assertThat(remoteCluster.getFailedShards(), equalTo(0));

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertNull(localCluster.getTotalShards());
assertNull(localCluster.getSuccessfulShards());
assertNull(localCluster.getSkippedShards());
assertNull(localCluster.getFailedShards());
}

try (EsqlQueryResponse resp = runQuery("FROM nomatch*,cluster-a:* | LIMIT 0")) {
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertNotNull(executionInfo);
assertThat(executionInfo.isCrossClusterSearch(), is(true));
long overallTookMillis = executionInfo.overallTook().millis();
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));

EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
assertThat(remoteCluster.getIndexExpression(), equalTo("*"));
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertNull(remoteCluster.getTotalShards());
assertNull(remoteCluster.getSuccessfulShards());
assertNull(remoteCluster.getSkippedShards());
assertNull(remoteCluster.getFailedShards());

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("nomatch*"));
// TODO: in https://github.com/elastic/elasticsearch/issues/112886, this will be changed to be SKIPPED
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
}
}

public void testMetadataIndex() {
Map<String, Object> testClusterInfo = setupTwoClusters();
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,21 @@ public void execute(
null,
null
);
String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
try (
var computeListener = ComputeListener.create(
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
local,
transportService,
rootTask,
execInfo,
configuration.getQueryStartTimeNanos(),
listener.map(r -> new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo))
listener.map(r -> {
updateExecutionInfoAfterCoordinatorOnlyQuery(configuration.getQueryStartTimeNanos(), execInfo);
return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo);
})
)
) {
runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute());
runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute(local));
return;
}
} else {
Expand Down Expand Up @@ -247,6 +251,27 @@ public void execute(
}
}

private static void updateExecutionInfoAfterCoordinatorOnlyQuery(long queryStartNanos, EsqlExecutionInfo execInfo) {
long tookTimeNanos = System.nanoTime() - queryStartNanos;
execInfo.overallTook(new TimeValue(tookTimeNanos, TimeUnit.NANOSECONDS));
if (execInfo.isCrossClusterSearch()) {
for (String clusterAlias : execInfo.clusterAliases()) {
// The local cluster 'took' time gets updated as part of the acquireCompute(local) call in the coordinator, so
// here we only need to update status for remote clusters since there are no remote ComputeListeners in this case.
// This happens in cross cluster searches that use LIMIT 0, e.g, FROM logs*,remote*:logs* | LIMIT 0.
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
execInfo.swapCluster(clusterAlias, (k, v) -> {
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build();
} else {
return v;
}
});
}
}
}
}

private List<RemoteCluster> getRemoteClusters(
Map<String, OriginalIndices> clusterToConcreteIndices,
Map<String, OriginalIndices> clusterToOriginalIndices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Predicate;
Expand Down Expand Up @@ -245,6 +246,7 @@ private <T> void preAnalyze(
if (indexResolution.isValid()) {
updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
updateTookTimeForRemoteClusters(executionInfo);
Set<String> newClusters = enrichPolicyResolver.groupIndicesPerCluster(
indexResolution.get().concreteIndices().toArray(String[]::new)
).keySet();
Expand Down Expand Up @@ -285,6 +287,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn
}
Set<String> clustersRequested = executionInfo.clusterAliases();
Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters());
/*
* These are clusters in the original request that are not present in the field-caps response. They were
* specified with an index or indices that do not exist, so the search on that cluster is done.
Expand All @@ -304,6 +307,28 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn
}
}

private void updateTookTimeForRemoteClusters(EsqlExecutionInfo executionInfo) {
if (executionInfo.isCrossClusterSearch()) {
for (String clusterAlias : executionInfo.clusterAliases()) {
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
if (v.getTook() == null && v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
// set took time in case we are finished with the remote cluster (e.g., FROM foo | LIMIT 0).
// this will be overwritten later if ES|QL operations happen on the remote cluster (the typical scenario)
TimeValue took = new TimeValue(
System.nanoTime() - configuration.getQueryStartTimeNanos(),
TimeUnit.NANOSECONDS
);
return new EsqlExecutionInfo.Cluster.Builder(v).setTook(took).build();
} else {
return v;
}
});
}
}
}
}

private void preAnalyzeIndices(
LogicalPlan parsed,
EsqlExecutionInfo executionInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
randomMapping(),
Map.of("logs-a", IndexMode.STANDARD)
);
// mark remote1 as unavailable
IndexResolution indexResolution = IndexResolution.valid(esIndex, Set.of(remote1Alias));

EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
Expand All @@ -226,12 +227,10 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {

EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias);
assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
assertThat(remote1Cluster.getTook().millis(), equalTo(0L));
assertThat(remote1Cluster.getTotalShards(), equalTo(0));
assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0));
assertThat(remote1Cluster.getSkippedShards(), equalTo(0));
assertThat(remote1Cluster.getFailedShards(), equalTo(0));
// remote1 is left as RUNNING, since another method (updateExecutionInfoWithUnavailableClusters) not under test changes status
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
assertNull(remote1Cluster.getTook());
assertNull(remote1Cluster.getTotalShards());

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias);
assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*"));
Expand Down

0 comments on commit e3705a1

Please sign in to comment.