From baae609b91ce756729bf47d08bc4be90d916d0b8 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 2 Jan 2025 14:05:21 -0700 Subject: [PATCH] Add ESQL telemetry collection (#119474) * Add ESQL telemetry collection (cherry picked from commit 0292905ef6926595cca79e0cd67b720cc5ced89e) # Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java --- docs/changelog/119474.yaml | 5 + docs/reference/cluster/stats.asciidoc | 7 +- .../search/ccs/CCSUsageTelemetryIT.java | 56 +---- .../org/elasticsearch/TransportVersions.java | 2 + .../cluster/stats/CCSTelemetrySnapshot.java | 58 +++-- .../action/admin/cluster/stats/CCSUsage.java | 15 ++ .../cluster/stats/CCSUsageTelemetry.java | 20 +- .../stats/ClusterStatsNodeResponse.java | 31 ++- .../cluster/stats/ClusterStatsResponse.java | 19 +- .../stats/TransportClusterStatsAction.java | 6 +- .../action/search/TransportSearchAction.java | 12 +- .../admin/cluster/RestClusterStatsAction.java | 3 +- .../org/elasticsearch/usage/UsageService.java | 6 + .../stats/CCSTelemetrySnapshotTests.java | 16 ++ .../cluster/stats/CCSUsageTelemetryTests.java | 19 ++ .../cluster/stats/VersionStatsTests.java | 1 + .../admin/cluster/stats/telemetry_test.json | 4 +- .../test/SkipUnavailableRule.java | 60 +++++ .../xpack/esql/ccq/MultiClustersIT.java | 35 +++ ...AbstractCrossClustersUsageTelemetryIT.java | 205 ++++++++++++++++ .../action/CrossClustersUsageTelemetryIT.java | 231 ++++++++++++++++++ ...rossClustersUsageTelemetryNoLicenseIT.java | 42 ++++ .../xpack/esql/action/EsqlExecutionInfo.java | 4 + .../xpack/esql/execution/PlanExecutor.java | 8 +- .../esql/plugin/TransportEsqlQueryAction.java | 72 +++++- .../xpack/esql/session/EsqlSession.java | 2 +- .../esql/session/EsqlSessionCCSUtils.java | 12 + .../session/EsqlSessionCCSUtilsTests.java | 40 +-- 28 files changed, 865 insertions(+), 126 deletions(-) create mode 100644 docs/changelog/119474.yaml create mode 100644 
test/framework/src/main/java/org/elasticsearch/test/SkipUnavailableRule.java create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClustersUsageTelemetryIT.java create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java diff --git a/docs/changelog/119474.yaml b/docs/changelog/119474.yaml new file mode 100644 index 0000000000000..e37561277d220 --- /dev/null +++ b/docs/changelog/119474.yaml @@ -0,0 +1,5 @@ +pr: 119474 +summary: "Add ES|QL cross-cluster query telemetry collection" +area: ES|QL +type: enhancement +issues: [] diff --git a/docs/reference/cluster/stats.asciidoc b/docs/reference/cluster/stats.asciidoc index 4a7a54a5b290d..f078fd2b7f2ee 100644 --- a/docs/reference/cluster/stats.asciidoc +++ b/docs/reference/cluster/stats.asciidoc @@ -25,7 +25,6 @@ Returns cluster statistics. * If the {es} {security-features} are enabled, you must have the `monitor` or `manage` <> to use this API. - [[cluster-stats-api-desc]] ==== {api-description-title} @@ -1397,7 +1396,7 @@ as a human-readable string. `_search`::: -(object) Contains the information about the <> usage in the cluster. +(object) Contains information about <> usage. + .Properties of `_search` [%collapsible%open] @@ -1528,7 +1527,11 @@ This may include requests where partial results were returned, but not requests ======= + ====== +`_esql`::: +(object) Contains information about <> usage. +The structure of the object is the same as the `_search` object above. 
===== diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CCSUsageTelemetryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CCSUsageTelemetryIT.java index c9d34dbf14015..0706894fbef9f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CCSUsageTelemetryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CCSUsageTelemetryIT.java @@ -37,18 +37,12 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.SkipUnavailableRule; +import org.elasticsearch.test.SkipUnavailableRule.NotSkipped; import org.elasticsearch.usage.UsageService; import org.junit.Assert; import org.junit.Rule; -import org.junit.rules.TestRule; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; -import java.util.Arrays; + import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -56,8 +50,6 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE; import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.MRT_FEATURE; @@ -498,7 +490,7 @@ public void testRemoteOnlyTimesOut() throws Exception { assertThat(perCluster.get(REMOTE2), equalTo(null)); } - @SkipOverride(aliases = { REMOTE1 }) + @NotSkipped(aliases = { REMOTE1 }) public void testRemoteTimesOutFailure() throws Exception { Map testClusterInfo = setupClusters(); String remoteIndex = (String) testClusterInfo.get("remote.index"); @@ -528,7 +520,7 @@ public void 
testRemoteTimesOutFailure() throws Exception { /** * Search when all the remotes failed and not skipped */ - @SkipOverride(aliases = { REMOTE1, REMOTE2 }) + @NotSkipped(aliases = { REMOTE1, REMOTE2 }) public void testFailedAllRemotesSearch() throws Exception { Map testClusterInfo = setupClusters(); String localIndex = (String) testClusterInfo.get("local.index"); @@ -577,7 +569,7 @@ public void testRemoteHasNoIndex() throws Exception { /** * Test that we're still counting remote search even if remote cluster has no such index */ - @SkipOverride(aliases = { REMOTE1 }) + @NotSkipped(aliases = { REMOTE1 }) public void testRemoteHasNoIndexFailure() throws Exception { SearchRequest searchRequest = makeSearchRequest(REMOTE1 + ":no_such_index"); CCSTelemetrySnapshot telemetry = getTelemetryFromFailedSearch(searchRequest); @@ -689,40 +681,4 @@ private int indexDocs(Client client, String index) { return numDocs; } - /** - * Annotation to mark specific cluster in a test as not to be skipped when unavailable - */ - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.METHOD) - @interface SkipOverride { - String[] aliases(); - } - - /** - * Test rule to process skip annotations - */ - static class SkipUnavailableRule implements TestRule { - private final Map skipMap; - - SkipUnavailableRule(String... 
clusterAliases) { - this.skipMap = Arrays.stream(clusterAliases).collect(Collectors.toMap(Function.identity(), alias -> true)); - } - - public Map getMap() { - return skipMap; - } - - @Override - public Statement apply(Statement base, Description description) { - // Check for annotation named "SkipOverride" and set the overrides accordingly - var aliases = description.getAnnotation(SkipOverride.class); - if (aliases != null) { - for (String alias : aliases.aliases()) { - skipMap.put(alias, false); - } - } - return base; - } - - } } diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 4e9e8660db73c..5969bdf97b5f4 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -152,6 +152,8 @@ static TransportVersion def(int id) { public static final TransportVersion FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING = def(8_812_00_0); public static final TransportVersion SIMULATE_IGNORED_FIELDS = def(8_813_00_0); public static final TransportVersion TRANSFORMS_UPGRADE_MODE = def(8_814_00_0); + public static final TransportVersion NODE_SHUTDOWN_EPHEMERAL_ID_ADDED = def(8_815_00_0); + public static final TransportVersion ESQL_CCS_TELEMETRY_STATS = def(8_816_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java index 3bbaa80ec200e..8500302e4f755 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshot.java @@ -41,7 +41,6 @@ *
*/ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment { - public static final String CCS_TELEMETRY_FIELD_NAME = "_search"; private long totalCount; private long successCount; private final Map failureReasons; @@ -66,6 +65,9 @@ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment private final Map clientCounts; private final Map byRemoteCluster; + // Whether we should use per-MRT (minimize roundtrips) metrics. + // ES|QL does not have "minimize_roundtrips" option, so we don't collect those metrics for ES|QL usage. + private boolean useMRT = true; /** * Creates a new stats instance with the provided info. @@ -191,6 +193,11 @@ public Map getByRemoteCluster() { return Collections.unmodifiableMap(byRemoteCluster); } + public CCSTelemetrySnapshot setUseMRT(boolean useMRT) { + this.useMRT = useMRT; + return this; + } + public static class PerClusterCCSTelemetry implements Writeable, ToXContentFragment { private long count; private long skippedCount; @@ -270,6 +277,11 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(count, skippedCount, took); } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } } /** @@ -291,8 +303,10 @@ public void add(CCSTelemetrySnapshot stats) { stats.featureCounts.forEach((k, v) -> featureCounts.merge(k, v, Long::sum)); stats.clientCounts.forEach((k, v) -> clientCounts.merge(k, v, Long::sum)); took.add(stats.took); - tookMrtTrue.add(stats.tookMrtTrue); - tookMrtFalse.add(stats.tookMrtFalse); + if (useMRT) { + tookMrtTrue.add(stats.tookMrtTrue); + tookMrtFalse.add(stats.tookMrtFalse); + } remotesPerSearchMax = Math.max(remotesPerSearchMax, stats.remotesPerSearchMax); if (totalCount > 0 && oldCount > 0) { // Weighted average @@ -328,30 +342,28 @@ private static void publishLatency(XContentBuilder builder, String name, LongMet @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws 
IOException { - builder.startObject(CCS_TELEMETRY_FIELD_NAME); - { - builder.field("total", totalCount); - builder.field("success", successCount); - builder.field("skipped", skippedRemotes); - publishLatency(builder, "took", took); + builder.field("total", totalCount); + builder.field("success", successCount); + builder.field("skipped", skippedRemotes); + publishLatency(builder, "took", took); + if (useMRT) { publishLatency(builder, "took_mrt_true", tookMrtTrue); publishLatency(builder, "took_mrt_false", tookMrtFalse); - builder.field("remotes_per_search_max", remotesPerSearchMax); - builder.field("remotes_per_search_avg", remotesPerSearchAvg); - builder.field("failure_reasons", failureReasons); - builder.field("features", featureCounts); - builder.field("clients", clientCounts); - builder.startObject("clusters"); - { - for (var entry : byRemoteCluster.entrySet()) { - String remoteName = entry.getKey(); - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) { - remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION; - } - builder.field(remoteName, entry.getValue()); + } + builder.field("remotes_per_search_max", remotesPerSearchMax); + builder.field("remotes_per_search_avg", remotesPerSearchAvg); + builder.field("failure_reasons", failureReasons); + builder.field("features", featureCounts); + builder.field("clients", clientCounts); + builder.startObject("clusters"); + { + for (var entry : byRemoteCluster.entrySet()) { + String remoteName = entry.getKey(); + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) { + remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION; } + builder.field(remoteName, entry.getValue()); } - builder.endObject(); } builder.endObject(); return builder; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java index 9e58d6d8febef..29a7dcb5d07d8 100644 --- 
a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsage.java @@ -10,6 +10,7 @@ package org.elasticsearch.action.admin.cluster.stats; import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ShardOperationFailedException; @@ -20,6 +21,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.query.SearchTimeoutException; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import java.util.Arrays; @@ -84,6 +86,15 @@ public Builder setClient(String client) { return this; } + public Builder setClientFromTask(Task task) { + String client = task.getHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER); + if (client != null) { + return setClient(client); + } else { + return this; + } + } + public Builder skippedRemote(String remote) { this.skippedRemotes.add(remote); return this; @@ -133,6 +144,10 @@ public static Result getFailureType(Exception e) { if (ExceptionsHelper.unwrapCorruption(e) != null) { return Result.CORRUPTION; } + ElasticsearchStatusException se = (ElasticsearchStatusException) ExceptionsHelper.unwrap(e, ElasticsearchStatusException.class); + if (se != null && se.getDetailedMessage().contains("license")) { + return Result.LICENSE; + } // This is kind of last resort check - if we still don't know the reason but all shard failures are remote, // we assume it's remote's fault somehow. 
if (e instanceof SearchPhaseExecutionException spe) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetry.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetry.java index 6c8178282d3c3..3f04eceed7eb5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetry.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetry.java @@ -47,6 +47,7 @@ public enum Result { TIMEOUT("timeout"), CORRUPTION("corruption"), SECURITY("security"), + LICENSE("license"), // May be helpful if there's a lot of other reasons, and it may be hard to calculate the unknowns for some clients. UNKNOWN("other"); @@ -106,8 +107,14 @@ public String getName() { private final Map clientCounts; private final Map byRemoteCluster; + // Should we calculate separate metrics per MRT? + private final boolean useMRT; public CCSUsageTelemetry() { + this(true); + } + + public CCSUsageTelemetry(boolean useMRT) { this.byRemoteCluster = new ConcurrentHashMap<>(); totalCount = new LongAdder(); successCount = new LongAdder(); @@ -119,6 +126,7 @@ public CCSUsageTelemetry() { skippedRemotes = new LongAdder(); featureCounts = new ConcurrentHashMap<>(); clientCounts = new ConcurrentHashMap<>(); + this.useMRT = useMRT; } public void updateUsage(CCSUsage ccsUsage) { @@ -134,10 +142,12 @@ private void doUpdate(CCSUsage ccsUsage) { if (isSuccess(ccsUsage)) { successCount.increment(); took.record(searchTook); - if (isMRT(ccsUsage)) { - tookMrtTrue.record(searchTook); - } else { - tookMrtFalse.record(searchTook); + if (useMRT) { + if (isMRT(ccsUsage)) { + tookMrtTrue.record(searchTook); + } else { + tookMrtFalse.record(searchTook); + } } ccsUsage.getPerClusterUsage().forEach((r, u) -> byRemoteCluster.computeIfAbsent(r, PerClusterCCSTelemetry::new).update(u)); } else { @@ -243,6 +253,6 @@ public CCSTelemetrySnapshot getCCSTelemetrySnapshot() { 
Collections.unmodifiableMap(Maps.transformValues(featureCounts, LongAdder::longValue)), Collections.unmodifiableMap(Maps.transformValues(clientCounts, LongAdder::longValue)), Collections.unmodifiableMap(Maps.transformValues(byRemoteCluster, PerClusterCCSTelemetry::getSnapshot)) - ); + ).setUseMRT(useMRT); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java index abeb73e5d8c3e..48b4e967742cd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java @@ -31,7 +31,8 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse { private final ClusterHealthStatus clusterStatus; private final SearchUsageStats searchUsageStats; private final RepositoryUsageStats repositoryUsageStats; - private final CCSTelemetrySnapshot ccsMetrics; + private final CCSTelemetrySnapshot searchCcsMetrics; + private final CCSTelemetrySnapshot esqlCcsMetrics; public ClusterStatsNodeResponse(StreamInput in) throws IOException { super(in); @@ -46,10 +47,15 @@ public ClusterStatsNodeResponse(StreamInput in) throws IOException { } if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { repositoryUsageStats = RepositoryUsageStats.readFrom(in); - ccsMetrics = new CCSTelemetrySnapshot(in); + searchCcsMetrics = new CCSTelemetrySnapshot(in); } else { repositoryUsageStats = RepositoryUsageStats.EMPTY; - ccsMetrics = new CCSTelemetrySnapshot(); + searchCcsMetrics = new CCSTelemetrySnapshot(); + } + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_TELEMETRY_STATS)) { + esqlCcsMetrics = new CCSTelemetrySnapshot(in); + } else { + esqlCcsMetrics = new CCSTelemetrySnapshot(); } } @@ -61,7 +67,8 @@ public ClusterStatsNodeResponse( ShardStats[] shardsStats, 
SearchUsageStats searchUsageStats, RepositoryUsageStats repositoryUsageStats, - CCSTelemetrySnapshot ccsTelemetrySnapshot + CCSTelemetrySnapshot ccsTelemetrySnapshot, + CCSTelemetrySnapshot esqlTelemetrySnapshot ) { super(node); this.nodeInfo = nodeInfo; @@ -70,7 +77,8 @@ public ClusterStatsNodeResponse( this.clusterStatus = clusterStatus; this.searchUsageStats = Objects.requireNonNull(searchUsageStats); this.repositoryUsageStats = Objects.requireNonNull(repositoryUsageStats); - this.ccsMetrics = ccsTelemetrySnapshot; + this.searchCcsMetrics = ccsTelemetrySnapshot; + this.esqlCcsMetrics = esqlTelemetrySnapshot; } public NodeInfo nodeInfo() { @@ -101,8 +109,12 @@ public RepositoryUsageStats repositoryUsageStats() { return repositoryUsageStats; } - public CCSTelemetrySnapshot getCcsMetrics() { - return ccsMetrics; + public CCSTelemetrySnapshot getSearchCcsMetrics() { + return searchCcsMetrics; + } + + public CCSTelemetrySnapshot getEsqlCcsMetrics() { + return esqlCcsMetrics; } @Override @@ -117,8 +129,11 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { repositoryUsageStats.writeTo(out); - ccsMetrics.writeTo(out); + searchCcsMetrics.writeTo(out); } // else just drop these stats, ok for bwc + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_TELEMETRY_STATS)) { + esqlCcsMetrics.writeTo(out); + } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java index 5f7c45c5807a5..ed8ca2f94a78b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java @@ -36,10 +36,14 @@ public class ClusterStatsResponse extends BaseNodesResponse remoteClustersStats; + public static final String 
CCS_TELEMETRY_FIELD_NAME = "_search"; + public static final String ESQL_TELEMETRY_FIELD_NAME = "_esql"; + public ClusterStatsResponse( long timestamp, String clusterUUID, @@ -58,6 +62,7 @@ public ClusterStatsResponse( nodesStats = new ClusterStatsNodes(nodes); indicesStats = new ClusterStatsIndices(nodes, mappingStats, analysisStats, versionStats); ccsMetrics = new CCSTelemetrySnapshot(); + esqlMetrics = new CCSTelemetrySnapshot().setUseMRT(false); ClusterHealthStatus status = null; for (ClusterStatsNodeResponse response : nodes) { // only the master node populates the status @@ -66,7 +71,10 @@ public ClusterStatsResponse( break; } } - nodes.forEach(node -> ccsMetrics.add(node.getCcsMetrics())); + nodes.forEach(node -> { + ccsMetrics.add(node.getSearchCcsMetrics()); + esqlMetrics.add(node.getEsqlCcsMetrics()); + }); this.status = status; this.clusterSnapshotStats = clusterSnapshotStats; @@ -147,9 +155,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (remoteClustersStats != null) { builder.field("clusters", remoteClustersStats); } + builder.startObject(CCS_TELEMETRY_FIELD_NAME); ccsMetrics.toXContent(builder, params); builder.endObject(); + if (esqlMetrics.getTotalCount() > 0) { + builder.startObject(ESQL_TELEMETRY_FIELD_NAME); + esqlMetrics.toXContent(builder, params); + builder.endObject(); + } + + builder.endObject(); + return builder; } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index fa201b6130579..f074a6ed94b87 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -105,6 +105,7 @@ public class TransportClusterStatsAction extends TransportNodesAction< private final RepositoriesService 
repositoriesService; private final SearchUsageHolder searchUsageHolder; private final CCSUsageTelemetry ccsUsageHolder; + private final CCSUsageTelemetry esqlUsageHolder; private final Executor clusterStateStatsExecutor; private final MetadataStatsCache mappingStatsCache; @@ -137,6 +138,7 @@ public TransportClusterStatsAction( this.repositoriesService = repositoriesService; this.searchUsageHolder = usageService.getSearchUsageHolder(); this.ccsUsageHolder = usageService.getCcsUsageHolder(); + this.esqlUsageHolder = usageService.getEsqlUsageHolder(); this.clusterStateStatsExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT); this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of); this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of); @@ -295,6 +297,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq final RepositoryUsageStats repositoryUsageStats = repositoriesService.getUsageStats(); final CCSTelemetrySnapshot ccsTelemetry = ccsUsageHolder.getCCSTelemetrySnapshot(); + final CCSTelemetrySnapshot esqlTelemetry = esqlUsageHolder.getCCSTelemetrySnapshot(); return new ClusterStatsNodeResponse( nodeInfo.getNode(), @@ -304,7 +307,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq shardsStats.toArray(new ShardStats[shardsStats.size()]), searchUsageStats, repositoryUsageStats, - ccsTelemetry + ccsTelemetry, + esqlTelemetry ); } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 4f00f6d903704..c9985bb58d4bf 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -388,10 +388,7 @@ void executeRequest( if (original.pointInTimeBuilder() != null) { 
tl.setFeature(CCSUsageTelemetry.PIT_FEATURE); } - String client = task.getHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER); - if (client != null) { - tl.setClient(client); - } + tl.setClient(task); // Check if any of the index patterns are wildcard patterns var localIndices = resolvedIndices.getLocalIndices(); if (localIndices != null && Arrays.stream(localIndices.indices()).anyMatch(Regex::isSimpleMatchPattern)) { @@ -508,6 +505,7 @@ void executeRequest( } } }); + final SearchSourceBuilder source = original.source(); if (shouldOpenPIT(source)) { // disabling shard reordering for request @@ -1882,7 +1880,7 @@ private interface TelemetryListener { void setFeature(String feature); - void setClient(String client); + void setClient(Task task); } private class SearchResponseActionListener implements ActionListener, TelemetryListener { @@ -1917,8 +1915,8 @@ public void setFeature(String feature) { } @Override - public void setClient(String client) { - usageBuilder.setClient(client); + public void setClient(Task task) { + usageBuilder.setClientFromTask(task); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java index b8405f269344d..c20ee974e2723 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java @@ -33,7 +33,8 @@ public class RestClusterStatsAction extends BaseRestHandler { "human-readable-total-docs-size", "verbose-dense-vector-mapping-stats", "ccs-stats", - "retrievers-usage-stats" + "retrievers-usage-stats", + "esql-stats" ); private static final Set SUPPORTED_QUERY_PARAMETERS = Set.of("include_remotes", "nodeId", REST_TIMEOUT_PARAM); diff --git a/server/src/main/java/org/elasticsearch/usage/UsageService.java 
b/server/src/main/java/org/elasticsearch/usage/UsageService.java index dd4895eb4bdc2..5b4fa0f27bf48 100644 --- a/server/src/main/java/org/elasticsearch/usage/UsageService.java +++ b/server/src/main/java/org/elasticsearch/usage/UsageService.java @@ -26,11 +26,13 @@ public class UsageService { private final Map handlers; private final SearchUsageHolder searchUsageHolder; private final CCSUsageTelemetry ccsUsageHolder; + private final CCSUsageTelemetry esqlUsageHolder; public UsageService() { this.handlers = new HashMap<>(); this.searchUsageHolder = new SearchUsageHolder(); this.ccsUsageHolder = new CCSUsageTelemetry(); + this.esqlUsageHolder = new CCSUsageTelemetry(false); } /** @@ -89,4 +91,8 @@ public SearchUsageHolder getSearchUsageHolder() { public CCSUsageTelemetry getCcsUsageHolder() { return ccsUsageHolder; } + + public CCSUsageTelemetry getEsqlUsageHolder() { + return esqlUsageHolder; + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshotTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshotTests.java index a72630c327ea2..6444caf08f831 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/CCSTelemetrySnapshotTests.java @@ -352,4 +352,20 @@ public void testRanges() throws IOException { assertThat(value2Read.count(), equalTo(count1 + count2)); assertThat(value2Read.max(), equalTo(max1)); } + + public void testUseMRTFalse() { + CCSTelemetrySnapshot empty = new CCSTelemetrySnapshot(); + // Ignore MRT data + empty.setUseMRT(false); + + var randomWithMRT = randomValueOtherThanMany( + v -> v.getTookMrtTrue().count() == 0 || v.getTookMrtFalse().count() == 0, + this::randomCCSTelemetrySnapshot + ); + + empty.add(randomWithMRT); + assertThat(empty.getTook().count(), equalTo(randomWithMRT.getTook().count())); + 
assertThat(empty.getTookMrtFalse().count(), equalTo(0L)); + assertThat(empty.getTookMrtTrue().count(), equalTo(0L)); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetryTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetryTests.java index c4a2fdee1111e..5eb2224ec5f8e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetryTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/CCSUsageTelemetryTests.java @@ -340,4 +340,23 @@ public void testConcurrentUpdates() throws InterruptedException { CCSTelemetrySnapshot expectedSnapshot = ccsUsageHolder.getCCSTelemetrySnapshot(); assertThat(snapshot, equalTo(expectedSnapshot)); } + + public void testUseMRTFalse() { + // Ignore MRT counters if instructed. + CCSUsageTelemetry ccsUsageHolder = new CCSUsageTelemetry(false); + + CCSUsage.Builder builder = new CCSUsage.Builder(); + builder.took(10L).setRemotesCount(1).setClient("kibana"); + builder.setFeature(MRT_FEATURE); + ccsUsageHolder.updateUsage(builder.build()); + + builder = new CCSUsage.Builder(); + builder.took(11L).setRemotesCount(1).setClient("kibana"); + ccsUsageHolder.updateUsage(builder.build()); + + CCSTelemetrySnapshot snapshot = ccsUsageHolder.getCCSTelemetrySnapshot(); + assertThat(snapshot.getTook().count(), equalTo(2L)); + assertThat(snapshot.getTookMrtFalse().count(), equalTo(0L)); + assertThat(snapshot.getTookMrtTrue().count(), equalTo(0L)); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java index 06846c5e5f4f5..1b95eb2e7365f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java @@ -130,6 +130,7 @@ public void 
testCreation() { new ShardStats[] { shardStats }, new SearchUsageStats(), RepositoryUsageStats.EMPTY, + null, null ); diff --git a/server/src/test/resources/org/elasticsearch/action/admin/cluster/stats/telemetry_test.json b/server/src/test/resources/org/elasticsearch/action/admin/cluster/stats/telemetry_test.json index fe9c77cb2a183..a92bab739b37d 100644 --- a/server/src/test/resources/org/elasticsearch/action/admin/cluster/stats/telemetry_test.json +++ b/server/src/test/resources/org/elasticsearch/action/admin/cluster/stats/telemetry_test.json @@ -1,5 +1,4 @@ { - "_search" : { "total" : 10, "success" : 20, "skipped" : 5, @@ -63,5 +62,4 @@ } } } - } -} \ No newline at end of file +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/SkipUnavailableRule.java b/test/framework/src/main/java/org/elasticsearch/test/SkipUnavailableRule.java new file mode 100644 index 0000000000000..d5ce943b4d8fe --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/SkipUnavailableRule.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.test; + +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Test rule to process skip_unavailable override annotations + */ +public class SkipUnavailableRule implements TestRule { + private final Map<String, Boolean> skipMap; + + public SkipUnavailableRule(String... clusterAliases) { + this.skipMap = Arrays.stream(clusterAliases).collect(Collectors.toMap(Function.identity(), alias -> true)); + } + + public Map<String, Boolean> getMap() { + return skipMap; + } + + @Override + public Statement apply(Statement base, Description description) { + // Check for the "NotSkipped" annotation and set the overrides accordingly + var aliases = description.getAnnotation(NotSkipped.class); + if (aliases != null) { + for (String alias : aliases.aliases()) { + skipMap.put(alias, false); + } + } + return base; + } + + /** + * Annotation to mark specific cluster in a test as not to be skipped when unavailable + */ + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface NotSkipped { + String[] aliases(); + } + +} diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java index 452f40baa34a8..c93b6404863e8 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java @@ -12,6 +12,7 @@ import 
org.apache.http.HttpHost; import org.elasticsearch.Version; import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -37,9 +38,11 @@ import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.elasticsearch.xpack.esql.ccq.Clusters.REMOTE_CLUSTER_NAME; import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasKey; @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class MultiClustersIT extends ESRestTestCase { @@ -395,6 +398,38 @@ public void testIndexPattern() throws Exception { } } + @SuppressWarnings("unchecked") + public void testStats() throws IOException { + Request caps = new Request("GET", "_capabilities?method=GET&path=_cluster/stats&capabilities=esql-stats"); + Response capsResponse = client().performRequest(caps); + Map capsResult = entityAsMap(capsResponse.getEntity()); + assumeTrue("esql stats capability missing", capsResult.get("supported").equals(true)); + + run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data) BY color | SORT color", includeCCSMetadata()); + Request stats = new Request("GET", "_cluster/stats"); + Response statsResponse = client().performRequest(stats); + Map result = entityAsMap(statsResponse.getEntity()); + assertThat(result, hasKey("ccs")); + Map ccs = (Map) result.get("ccs"); + assertThat(ccs, hasKey("_esql")); + Map esql = (Map) ccs.get("_esql"); + assertThat(esql, hasKey("total")); + assertThat(esql, hasKey("success")); + assertThat(esql, hasKey("took")); + assertThat(esql, hasKey("remotes_per_search_max")); + assertThat(esql, hasKey("remotes_per_search_avg")); + assertThat(esql, hasKey("failure_reasons")); + assertThat(esql, 
hasKey("features")); + assertThat(esql, hasKey("clusters")); + Map clusters = (Map) esql.get("clusters"); + assertThat(clusters, hasKey(REMOTE_CLUSTER_NAME)); + assertThat(clusters, hasKey("(local)")); + Map clusterData = (Map) clusters.get(REMOTE_CLUSTER_NAME); + assertThat(clusterData, hasKey("total")); + assertThat(clusterData, hasKey("skipped")); + assertThat(clusterData, hasKey("took")); + } + private RestClient remoteClusterClient() throws IOException { var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses()); return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0])); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClustersUsageTelemetryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClustersUsageTelemetryIT.java new file mode 100644 index 0000000000000..ffbddd52b2551 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClustersUsageTelemetryIT.java @@ -0,0 +1,205 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.SkipUnavailableRule; +import org.elasticsearch.usage.UsageService; +import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.core.TimeValue.timeValueMillis; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; + +public class AbstractCrossClustersUsageTelemetryIT extends AbstractMultiClustersTestCase { + private static final Logger LOGGER = LogManager.getLogger(AbstractCrossClustersUsageTelemetryIT.class); + protected static final String REMOTE1 = "cluster-a"; + protected static final String REMOTE2 = "cluster-b"; + protected static final String LOCAL_INDEX = "logs-1"; + protected static final String REMOTE_INDEX = "logs-2"; + // We want to send search to a specific node (we don't care which one) so that we could + // collect the CCS telemetry from it later + protected String queryNode; + + @Before + public void setupQueryNode() { + // The tests are set up in a way that all queries within a single test are sent to the same node, + // thus enabling incremental collection of telemetry data, but the node is random for each test. 
+ queryNode = cluster(LOCAL_CLUSTER).getRandomNodeName(); + } + + protected CCSTelemetrySnapshot getTelemetryFromQuery(String query, String client) throws ExecutionException, InterruptedException { + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + request.columnar(randomBoolean()); + request.includeCCSMetadata(randomBoolean()); + return getTelemetryFromQuery(request, client); + } + + protected CCSTelemetrySnapshot getTelemetryFromQuery(EsqlQueryRequest request, String client) throws ExecutionException, + InterruptedException { + // We don't care here too much about the response, we just want to trigger the telemetry collection. + // So we check it's not null and leave the rest to other tests. + if (client != null) { + assertResponse( + cluster(LOCAL_CLUSTER).client(queryNode) + .filterWithHeader(Map.of(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, client)) + .execute(EsqlQueryAction.INSTANCE, request), + Assert::assertNotNull + ); + + } else { + assertResponse(cluster(LOCAL_CLUSTER).client(queryNode).execute(EsqlQueryAction.INSTANCE, request), Assert::assertNotNull); + } + return getTelemetrySnapshot(queryNode); + } + + protected CCSTelemetrySnapshot getTelemetryFromAsyncQuery(String query) throws Exception { + EsqlQueryRequest request = EsqlQueryRequest.asyncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + request.columnar(randomBoolean()); + request.includeCCSMetadata(randomBoolean()); + request.waitForCompletionTimeout(TimeValue.timeValueMillis(100)); + request.keepOnCompletion(false); + return getTelemetryFromAsyncQuery(request); + } + + protected CCSTelemetrySnapshot getTelemetryFromAsyncQuery(EsqlQueryRequest request) throws Exception { + AtomicReference asyncExecutionId = new AtomicReference<>(); + assertResponse(cluster(LOCAL_CLUSTER).client(queryNode).execute(EsqlQueryAction.INSTANCE, request), 
resp -> { + if (resp.isRunning()) { + assertNotNull("async execution id is null", resp.asyncExecutionId()); + asyncExecutionId.set(resp.asyncExecutionId().get()); + } + }); + if (asyncExecutionId.get() != null) { + assertBusy(() -> { + var getResultsRequest = new GetAsyncResultRequest(asyncExecutionId.get()).setWaitForCompletionTimeout(timeValueMillis(1)); + try ( + var resp = cluster(LOCAL_CLUSTER).client(queryNode) + .execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest) + .actionGet(30, TimeUnit.SECONDS) + ) { + assertFalse(resp.isRunning()); + } + }); + } + return getTelemetrySnapshot(queryNode); + } + + protected CCSTelemetrySnapshot getTelemetryFromFailedQuery(String query) throws Exception { + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + request.columnar(randomBoolean()); + request.includeCCSMetadata(randomBoolean()); + + ExecutionException ee = expectThrows( + ExecutionException.class, + cluster(LOCAL_CLUSTER).client(queryNode).execute(EsqlQueryAction.INSTANCE, request)::get + ); + assertNotNull(ee.getCause()); + + return getTelemetrySnapshot(queryNode); + } + + private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) { + var usage = cluster(LOCAL_CLUSTER).getInstance(UsageService.class, nodeName); + return usage.getEsqlUsageHolder().getCCSTelemetrySnapshot(); + } + + @Override + protected boolean reuseClusters() { + return false; + } + + @Override + protected List remoteClusterAlias() { + return List.of(REMOTE1, REMOTE2); + } + + @Rule + public SkipUnavailableRule skipOverride = new SkipUnavailableRule(REMOTE1, REMOTE2); + + protected Map setupClusters() { + int numShardsLocal = randomIntBetween(1, 5); + populateLocalIndices(LOCAL_INDEX, numShardsLocal); + + int numShardsRemote = randomIntBetween(1, 5); + populateRemoteIndices(REMOTE1, REMOTE_INDEX, numShardsRemote); + + Map clusterInfo = new HashMap<>(); + 
clusterInfo.put("local.num_shards", numShardsLocal); + clusterInfo.put("local.index", LOCAL_INDEX); + clusterInfo.put("remote.num_shards", numShardsRemote); + clusterInfo.put("remote.index", REMOTE_INDEX); + + int numShardsRemote2 = randomIntBetween(1, 5); + populateRemoteIndices(REMOTE2, REMOTE_INDEX, numShardsRemote2); + clusterInfo.put("remote2.index", REMOTE_INDEX); + clusterInfo.put("remote2.num_shards", numShardsRemote2); + + return clusterInfo; + } + + void populateLocalIndices(String indexName, int numShards) { + Client localClient = client(LOCAL_CLUSTER); + assertAcked( + localClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long") + ); + for (int i = 0; i < 10; i++) { + localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get(); + } + localClient.admin().indices().prepareRefresh(indexName).get(); + } + + void populateRemoteIndices(String clusterAlias, String indexName, int numShards) { + Client remoteClient = client(clusterAlias); + assertAcked( + remoteClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long") + ); + for (int i = 0; i < 10; i++) { + remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get(); + } + remoteClient.admin().indices().prepareRefresh(indexName).get(); + } + + @Override + protected Map skipUnavailableForRemoteClusters() { + var map = skipOverride.getMap(); + LOGGER.info("Using skip_unavailable map: [{}]", map); + return map; + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java 
b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java new file mode 100644 index 0000000000000..33d868e7a69eb --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java @@ -0,0 +1,231 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot; +import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.SkipUnavailableRule; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE; +import static org.hamcrest.Matchers.equalTo; + +public class CrossClustersUsageTelemetryIT extends AbstractCrossClustersUsageTelemetryIT { + + @Override + protected Collection> nodePlugins(String clusterAlias) { + List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); + plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); + plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class); + return plugins; + } + + public void assertPerClusterCount(CCSTelemetrySnapshot.PerClusterCCSTelemetry perCluster, long count) { + assertThat(perCluster.getCount(), equalTo(count)); + assertThat(perCluster.getSkippedCount(), equalTo(0L)); + assertThat(perCluster.getTook().count(), equalTo(count)); + } + + public void testLocalRemote() throws Exception { + setupClusters(); + var telemetry = getTelemetryFromQuery("from logs-*,c*:logs-* | stats sum (v)", "kibana"); + + 
assertThat(telemetry.getTotalCount(), equalTo(1L)); + assertThat(telemetry.getSuccessCount(), equalTo(1L)); + assertThat(telemetry.getFailureReasons().size(), equalTo(0)); + assertThat(telemetry.getTook().count(), equalTo(1L)); + assertThat(telemetry.getTookMrtFalse().count(), equalTo(0L)); + assertThat(telemetry.getTookMrtTrue().count(), equalTo(0L)); + assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0)); + assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L)); + assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(0L)); + assertThat(telemetry.getClientCounts().size(), equalTo(1)); + assertThat(telemetry.getClientCounts().get("kibana"), equalTo(1L)); + assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(null)); + + var perCluster = telemetry.getByRemoteCluster(); + assertThat(perCluster.size(), equalTo(3)); + for (String clusterAlias : remoteClusterAlias()) { + assertPerClusterCount(perCluster.get(clusterAlias), 1L); + } + assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 1L); + + telemetry = getTelemetryFromQuery("from logs-*,c*:logs-* | stats sum (v)", "kibana"); + assertThat(telemetry.getTotalCount(), equalTo(2L)); + assertThat(telemetry.getClientCounts().get("kibana"), equalTo(2L)); + perCluster = telemetry.getByRemoteCluster(); + assertThat(perCluster.size(), equalTo(3)); + for (String clusterAlias : remoteClusterAlias()) { + assertPerClusterCount(perCluster.get(clusterAlias), 2L); + } + assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 2L); + } + + public void testLocalOnly() throws Exception { + setupClusters(); + // Should not produce any usage info since it's a local search + var telemetry = getTelemetryFromQuery("from logs-* | stats sum (v)", "kibana"); + + assertThat(telemetry.getTotalCount(), equalTo(0L)); + assertThat(telemetry.getSuccessCount(), equalTo(0L)); + assertThat(telemetry.getByRemoteCluster().size(), equalTo(0)); + } + + @SkipUnavailableRule.NotSkipped(aliases = REMOTE1) + public void 
testFailed() throws Exception { + setupClusters(); + // Should not produce any usage info since it's a local search + var telemetry = getTelemetryFromFailedQuery("from no_such_index | stats sum (v)"); + + assertThat(telemetry.getTotalCount(), equalTo(0L)); + assertThat(telemetry.getSuccessCount(), equalTo(0L)); + assertThat(telemetry.getByRemoteCluster().size(), equalTo(0)); + + // One remote is skipped, one is not + telemetry = getTelemetryFromFailedQuery("from logs-*,c*:no_such_index | stats sum (v)"); + + assertThat(telemetry.getTotalCount(), equalTo(1L)); + assertThat(telemetry.getSuccessCount(), equalTo(0L)); + assertThat(telemetry.getByRemoteCluster().size(), equalTo(1)); + assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0)); + assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L)); + assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(1L)); + Map expectedFailure = Map.of(CCSUsageTelemetry.Result.NOT_FOUND.getName(), 1L); + assertThat(telemetry.getFailureReasons(), equalTo(expectedFailure)); + // cluster-b should be skipped + assertThat(telemetry.getByRemoteCluster().get(REMOTE2).getCount(), equalTo(0L)); + assertThat(telemetry.getByRemoteCluster().get(REMOTE2).getSkippedCount(), equalTo(1L)); + + // this is only for cluster-a so no skipped remotes + telemetry = getTelemetryFromFailedQuery("from logs-*,cluster-a:no_such_index | stats sum (v)"); + assertThat(telemetry.getTotalCount(), equalTo(2L)); + assertThat(telemetry.getSuccessCount(), equalTo(0L)); + assertThat(telemetry.getByRemoteCluster().size(), equalTo(1)); + assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0)); + assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L)); + assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(1L)); + expectedFailure = Map.of(CCSUsageTelemetry.Result.NOT_FOUND.getName(), 2L); + assertThat(telemetry.getFailureReasons(), equalTo(expectedFailure)); + assertThat(telemetry.getByRemoteCluster().size(), equalTo(1)); + } + + // 
TODO: enable when skip-up patch is merged + // public void testSkipAllRemotes() throws Exception { + // var telemetry = getTelemetryFromQuery("from logs-*,c*:no_such_index | stats sum (v)", "unknown"); + // + // assertThat(telemetry.getTotalCount(), equalTo(1L)); + // assertThat(telemetry.getSuccessCount(), equalTo(1L)); + // assertThat(telemetry.getFailureReasons().size(), equalTo(0)); + // assertThat(telemetry.getTook().count(), equalTo(1L)); + // assertThat(telemetry.getTookMrtFalse().count(), equalTo(0L)); + // assertThat(telemetry.getTookMrtTrue().count(), equalTo(0L)); + // assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0)); + // assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L)); + // assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(1L)); + // assertThat(telemetry.getClientCounts().size(), equalTo(0)); + // + // var perCluster = telemetry.getByRemoteCluster(); + // assertThat(perCluster.size(), equalTo(3)); + // for (String clusterAlias : remoteClusterAlias()) { + // var clusterData = perCluster.get(clusterAlias); + // assertThat(clusterData.getCount(), equalTo(0L)); + // assertThat(clusterData.getSkippedCount(), equalTo(1L)); + // assertThat(clusterData.getTook().count(), equalTo(0L)); + // } + // assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 1L); + // } + + public void testRemoteOnly() throws Exception { + setupClusters(); + var telemetry = getTelemetryFromQuery("from c*:logs-* | stats sum (v)", "kibana"); + + assertThat(telemetry.getTotalCount(), equalTo(1L)); + assertThat(telemetry.getSuccessCount(), equalTo(1L)); + assertThat(telemetry.getFailureReasons().size(), equalTo(0)); + assertThat(telemetry.getTook().count(), equalTo(1L)); + assertThat(telemetry.getTookMrtFalse().count(), equalTo(0L)); + assertThat(telemetry.getTookMrtTrue().count(), equalTo(0L)); + assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0)); + assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L)); + 
assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(0L)); + assertThat(telemetry.getClientCounts().size(), equalTo(1)); + assertThat(telemetry.getClientCounts().get("kibana"), equalTo(1L)); + assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(null)); + + var perCluster = telemetry.getByRemoteCluster(); + assertThat(perCluster.size(), equalTo(2)); + for (String clusterAlias : remoteClusterAlias()) { + assertPerClusterCount(perCluster.get(clusterAlias), 1L); + } + assertThat(telemetry.getByRemoteCluster().size(), equalTo(2)); + } + + public void testAsync() throws Exception { + setupClusters(); + var telemetry = getTelemetryFromAsyncQuery("from logs-*,c*:logs-* | stats sum (v)"); + + assertThat(telemetry.getTotalCount(), equalTo(1L)); + assertThat(telemetry.getSuccessCount(), equalTo(1L)); + assertThat(telemetry.getFailureReasons().size(), equalTo(0)); + assertThat(telemetry.getTook().count(), equalTo(1L)); + assertThat(telemetry.getTookMrtFalse().count(), equalTo(0L)); + assertThat(telemetry.getTookMrtTrue().count(), equalTo(0L)); + assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0)); + assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L)); + assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(0L)); + assertThat(telemetry.getClientCounts().size(), equalTo(0)); + assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(1L)); + + var perCluster = telemetry.getByRemoteCluster(); + assertThat(perCluster.size(), equalTo(3)); + for (String clusterAlias : remoteClusterAlias()) { + assertPerClusterCount(perCluster.get(clusterAlias), 1L); + } + assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 1L); + + // do it again + telemetry = getTelemetryFromAsyncQuery("from logs-*,c*:logs-* | stats sum (v)"); + assertThat(telemetry.getTotalCount(), equalTo(2L)); + assertThat(telemetry.getFeatureCounts().get(ASYNC_FEATURE), equalTo(2L)); + perCluster = telemetry.getByRemoteCluster(); + assertThat(perCluster.size(), 
equalTo(3)); + for (String clusterAlias : remoteClusterAlias()) { + assertPerClusterCount(perCluster.get(clusterAlias), 2L); + } + assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 2L); + } + + public void testNoSuchCluster() throws Exception { + setupClusters(); + // This is not recognized as a cross-cluster search + var telemetry = getTelemetryFromFailedQuery("from c*:logs*, nocluster:nomatch | stats sum (v)"); + + assertThat(telemetry.getTotalCount(), equalTo(0L)); + assertThat(telemetry.getSuccessCount(), equalTo(0L)); + assertThat(telemetry.getByRemoteCluster().size(), equalTo(0)); + } + + @SkipUnavailableRule.NotSkipped(aliases = REMOTE1) + public void testDisconnect() throws Exception { + setupClusters(); + // Disconnect remote1 + cluster(REMOTE1).close(); + var telemetry = getTelemetryFromFailedQuery("from logs-*,cluster-a:logs-* | stats sum (v)"); + + assertThat(telemetry.getTotalCount(), equalTo(1L)); + assertThat(telemetry.getSuccessCount(), equalTo(0L)); + Map expectedFailure = Map.of(CCSUsageTelemetry.Result.REMOTES_UNAVAILABLE.getName(), 1L); + assertThat(telemetry.getFailureReasons(), equalTo(expectedFailure)); + } + +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java new file mode 100644 index 0000000000000..2b993e9474062 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry; +import org.elasticsearch.plugins.Plugin; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class CrossClustersUsageTelemetryNoLicenseIT extends AbstractCrossClustersUsageTelemetryIT { + + @Override + protected Collection> nodePlugins(String clusterAlias) { + List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); + plugins.add(EsqlPluginWithNonEnterpriseOrExpiredLicense.class); + plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class); + return plugins; + } + + public void testLicenseFailure() throws Exception { + setupClusters(); + var telemetry = getTelemetryFromFailedQuery("from logs-*,c*:logs-* | stats sum (v)"); + + assertThat(telemetry.getTotalCount(), equalTo(1L)); + assertThat(telemetry.getSuccessCount(), equalTo(0L)); + assertThat(telemetry.getTook().count(), equalTo(0L)); + assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0)); + assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L)); + Map expectedFailure = Map.of(CCSUsageTelemetry.Result.LICENSE.getName(), 1L); + assertThat(telemetry.getFailureReasons(), equalTo(expectedFailure)); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 52170dfb05256..077eb7a721003 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -205,6 +205,10 @@ public Cluster getCluster(String clusterAlias) { return clusterInfo.get(clusterAlias); } + public Map getClusters() { + return clusterInfo; + } + /** * Utility to swap a Cluster object. 
Guidelines for the remapping function: *
    diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java index dad63d25046d9..974f029eab2ef 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java @@ -80,7 +80,8 @@ public void esql( ); QueryMetric clientId = QueryMetric.fromString("rest"); metrics.total(clientId); - session.execute(request, executionInfo, planRunner, wrap(x -> { + + ActionListener executeListener = wrap(x -> { planningMetricsManager.publish(planningMetrics, true); listener.onResponse(x); }, ex -> { @@ -88,7 +89,10 @@ public void esql( metrics.failed(clientId); planningMetricsManager.publish(planningMetrics, false); listener.onFailure(ex); - })); + }); + // Wrap it in a listener so that if we have any exceptions during execution, the listener picks it up + // and all the metrics are properly updated + ActionListener.run(executeListener, l -> session.execute(request, executionInfo, planRunner, l)); } public IndexResolver indexResolver() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 50d5819688e46..b44e249e38006 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -9,6 +9,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.admin.cluster.stats.CCSUsage; +import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry; import org.elasticsearch.action.support.ActionFilters; import 
org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.internal.Client; @@ -20,16 +22,20 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.core.Nullable; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.search.SearchService; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.usage.UsageService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.async.AsyncExecutionId; +import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.ColumnInfoImpl; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryAction; @@ -52,6 +58,7 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; @@ -71,6 +78,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction asyncTaskManagementService; private final RemoteClusterService remoteClusterService; private final QueryBuilderResolver queryBuilderResolver; + private final UsageService usageService; @Inject @SuppressWarnings("this-escape") @@ -86,8 +94,8 @@ public TransportEsqlQueryAction( BlockFactory blockFactory, Client client, NamedWriteableRegistry registry, - IndexNameExpressionResolver indexNameExpressionResolver - + IndexNameExpressionResolver indexNameExpressionResolver, + UsageService usageService ) { 
// TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916 super(EsqlQueryAction.NAME, transportService, actionFilters, EsqlQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); @@ -126,6 +134,7 @@ public TransportEsqlQueryAction( ); this.remoteClusterService = transportService.getRemoteClusterService(); this.queryBuilderResolver = new QueryBuilderResolver(searchService, clusterService, transportService, indexNameExpressionResolver); + this.usageService = usageService; } @Override @@ -197,8 +206,65 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener toResponse(task, request, configuration, result)) + ActionListener.wrap(result -> { + recordCCSTelemetry(task, executionInfo, request, null); + listener.onResponse(toResponse(task, request, configuration, result)); + }, ex -> { + recordCCSTelemetry(task, executionInfo, request, ex); + listener.onFailure(ex); + }) ); + + } + + private void recordCCSTelemetry(Task task, EsqlExecutionInfo executionInfo, EsqlQueryRequest request, @Nullable Exception exception) { + if (executionInfo.isCrossClusterSearch() == false) { + return; + } + + CCSUsage.Builder usageBuilder = new CCSUsage.Builder(); + usageBuilder.setClientFromTask(task); + if (exception != null) { + if (exception instanceof VerificationException ve) { + CCSUsageTelemetry.Result failureType = classifyVerificationException(ve); + if (failureType != CCSUsageTelemetry.Result.UNKNOWN) { + usageBuilder.setFailure(failureType); + } else { + usageBuilder.setFailure(exception); + } + } else { + usageBuilder.setFailure(exception); + } + } + var took = executionInfo.overallTook(); + if (took != null) { + usageBuilder.took(took.getMillis()); + } + if (request.async()) { + usageBuilder.setFeature(CCSUsageTelemetry.ASYNC_FEATURE); + } + + AtomicInteger remotesCount = new AtomicInteger(); + executionInfo.getClusters().forEach((clusterAlias, cluster) -> { + if (cluster.getStatus() == 
EsqlExecutionInfo.Cluster.Status.SKIPPED) { + usageBuilder.skippedRemote(clusterAlias); + } else { + usageBuilder.perClusterUsage(clusterAlias, cluster.getTook()); + } + if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { + remotesCount.getAndIncrement(); + } + }); + assert remotesCount.get() > 0 : "Got cross-cluster search telemetry without any remote clusters"; + usageBuilder.setRemotesCount(remotesCount.get()); + usageService.getEsqlUsageHolder().updateUsage(usageBuilder.build()); + } + + private CCSUsageTelemetry.Result classifyVerificationException(VerificationException exception) { + if (exception.getDetailedMessage().contains("Unknown index")) { + return CCSUsageTelemetry.Result.NOT_FOUND; + } + return CCSUsageTelemetry.Result.UNKNOWN; } private EsqlExecutionInfo getOrCreateExecutionInfo(Task task, EsqlQueryRequest request) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index bd3b3bdb3483c..eb5e8206e9e6f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -312,7 +312,7 @@ public void analyzedPlan( .collect(Collectors.toSet()); final List indices = preAnalysis.indices; - EsqlSessionCCSUtils.checkForCcsLicense(indices, indicesExpressionGrouper, verifier.licenseState()); + EsqlSessionCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState()); final Set targetClusters = enrichPolicyResolver.groupIndicesPerCluster( indices.stream().flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().index()))).toArray(String[]::new) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 662572c466511..95f7a37ce4d62 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -308,6 +308,7 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { * @throws org.elasticsearch.ElasticsearchStatusException if the license is not valid (or present) for ES|QL CCS search. */ public static void checkForCcsLicense( + EsqlExecutionInfo executionInfo, List indices, IndicesExpressionGrouper indicesGrouper, XPackLicenseState licenseState @@ -326,6 +327,17 @@ public static void checkForCcsLicense( // check if it is a cross-cluster query if (groupedIndices.size() > 1 || groupedIndices.containsKey(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY) == false) { if (EsqlLicenseChecker.isCcsAllowed(licenseState) == false) { + // initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error + // so that the CCS telemetry handler can recognize that this error is CCS-related + for (Map.Entry entry : groupedIndices.entrySet()) { + executionInfo.swapCluster( + entry.getKey(), + (k, v) -> new EsqlExecutionInfo.Cluster( + entry.getKey(), + Strings.arrayToCommaDelimitedString(entry.getValue().indices()) + ) + ); + } throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index 1000c05282fdb..6b01010ffa5f4 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -644,6 +644,7 @@ public void 
testMissingIndicesIsFatal() { public void testCheckForCcsLicense() { final TestIndicesExpressionGrouper indicesGrouper = new TestIndicesExpressionGrouper(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); // this seems to be used only for tracking usage of features, not for checking if a license is expired final LongSupplier currTime = () -> System.currentTimeMillis(); @@ -671,22 +672,22 @@ public void testCheckForCcsLicense() { List indices = new ArrayList<>(); indices.add(new TableInfo(new TableIdentifier(EMPTY, null, randomFrom("idx", "idx1,idx2*")))); - checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseValid); - checkForCcsLicense(indices, indicesGrouper, platinumLicenseValid); - checkForCcsLicense(indices, indicesGrouper, goldLicenseValid); - checkForCcsLicense(indices, indicesGrouper, trialLicenseValid); - checkForCcsLicense(indices, indicesGrouper, basicLicenseValid); - checkForCcsLicense(indices, indicesGrouper, standardLicenseValid); - checkForCcsLicense(indices, indicesGrouper, missingLicense); - checkForCcsLicense(indices, indicesGrouper, nullLicense); - - checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseInactive); - checkForCcsLicense(indices, indicesGrouper, platinumLicenseInactive); - checkForCcsLicense(indices, indicesGrouper, goldLicenseInactive); - checkForCcsLicense(indices, indicesGrouper, trialLicenseInactive); - checkForCcsLicense(indices, indicesGrouper, basicLicenseInactive); - checkForCcsLicense(indices, indicesGrouper, standardLicenseInactive); - checkForCcsLicense(indices, indicesGrouper, missingLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, platinumLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, goldLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, 
basicLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, standardLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, missingLicense); + checkForCcsLicense(executionInfo, indices, indicesGrouper, nullLicense); + + checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, platinumLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, goldLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, basicLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, standardLicenseInactive); + checkForCcsLicense(executionInfo, indices, indicesGrouper, missingLicenseInactive); } // cross-cluster search requires a valid (active, non-expired) enterprise license OR a valid trial license @@ -701,8 +702,8 @@ public void testCheckForCcsLicense() { } // licenses that work - checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseValid); - checkForCcsLicense(indices, indicesGrouper, trialLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, enterpriseLicenseValid); + checkForCcsLicense(executionInfo, indices, indicesGrouper, trialLicenseValid); // all others fail --- @@ -739,9 +740,10 @@ private void assertLicenseCheckFails( XPackLicenseState licenseState, String expectedErrorMessageSuffix ) { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); ElasticsearchStatusException e = expectThrows( ElasticsearchStatusException.class, - () -> checkForCcsLicense(indices, indicesGrouper, licenseState) + () -> checkForCcsLicense(executionInfo, indices, indicesGrouper, licenseState) ); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(