Skip to content

Commit

Permalink
Add ESQL telemetry collection (elastic#119474)
Browse files Browse the repository at this point in the history
* Add ESQL telemetry collection

(cherry picked from commit 0292905)

# Conflicts:
#	server/src/main/java/org/elasticsearch/TransportVersions.java
  • Loading branch information
smalyshev committed Jan 2, 2025
1 parent edfdd07 commit baae609
Show file tree
Hide file tree
Showing 28 changed files with 865 additions and 126 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/119474.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 119474
summary: "Add ES|QL cross-cluster query telemetry collection"
area: ES|QL
type: enhancement
issues: []
7 changes: 5 additions & 2 deletions docs/reference/cluster/stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ Returns cluster statistics.

* If the {es} {security-features} are enabled, you must have the `monitor` or
`manage` <<privileges-list-cluster,cluster privilege>> to use this API.

[[cluster-stats-api-desc]]
==== {api-description-title}

Expand Down Expand Up @@ -1397,7 +1396,7 @@ as a human-readable string.
`_search`:::
(object) Contains the information about the <<modules-cross-cluster-search, {ccs}>> usage in the cluster.
(object) Contains information about <<modules-cross-cluster-search, {ccs}>> usage.
+
.Properties of `_search`
[%collapsible%open]
Expand Down Expand Up @@ -1528,7 +1527,11 @@ This may include requests where partial results were returned, but not requests
=======


======
`_esql`:::
(object) Contains information about <<esql-cross-clusters,{esql} {ccs}>> usage.
The structure of the object is the same as the `_search` object above.
=====

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,19 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.SkipUnavailableRule;
import org.elasticsearch.test.SkipUnavailableRule.NotSkipped;
import org.elasticsearch.usage.UsageService;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE;
import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.MRT_FEATURE;
Expand Down Expand Up @@ -498,7 +490,7 @@ public void testRemoteOnlyTimesOut() throws Exception {
assertThat(perCluster.get(REMOTE2), equalTo(null));
}

@SkipOverride(aliases = { REMOTE1 })
@NotSkipped(aliases = { REMOTE1 })
public void testRemoteTimesOutFailure() throws Exception {
Map<String, Object> testClusterInfo = setupClusters();
String remoteIndex = (String) testClusterInfo.get("remote.index");
Expand Down Expand Up @@ -528,7 +520,7 @@ public void testRemoteTimesOutFailure() throws Exception {
/**
* Search when all the remotes failed and were not skipped
*/
@SkipOverride(aliases = { REMOTE1, REMOTE2 })
@NotSkipped(aliases = { REMOTE1, REMOTE2 })
public void testFailedAllRemotesSearch() throws Exception {
Map<String, Object> testClusterInfo = setupClusters();
String localIndex = (String) testClusterInfo.get("local.index");
Expand Down Expand Up @@ -577,7 +569,7 @@ public void testRemoteHasNoIndex() throws Exception {
/**
* Test that we're still counting remote search even if remote cluster has no such index
*/
@SkipOverride(aliases = { REMOTE1 })
@NotSkipped(aliases = { REMOTE1 })
public void testRemoteHasNoIndexFailure() throws Exception {
SearchRequest searchRequest = makeSearchRequest(REMOTE1 + ":no_such_index");
CCSTelemetrySnapshot telemetry = getTelemetryFromFailedSearch(searchRequest);
Expand Down Expand Up @@ -689,40 +681,4 @@ private int indexDocs(Client client, String index) {
return numDocs;
}

/**
 * Annotation to mark specific clusters in a test as not to be skipped when unavailable.
 * It is retained at runtime and can only be placed on methods.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SkipOverride {
// Aliases of the clusters this annotation applies to.
String[] aliases();
}

/**
 * Test rule that keeps a per-cluster "skip" flag and processes the skip annotations
 * found on the test being run.
 */
static class SkipUnavailableRule implements TestRule {
    // Cluster alias -> skip flag. Every alias passed to the constructor starts out as true.
    private final Map<String, Boolean> skipMap;

    /**
     * @param clusterAliases aliases of the clusters to track; each one is initially mapped to {@code true}
     */
    SkipUnavailableRule(String... clusterAliases) {
        this.skipMap = Arrays.stream(clusterAliases).collect(Collectors.toMap(Function.identity(), name -> Boolean.TRUE));
    }

    /**
     * @return the alias-to-flag map held by this rule (the same instance that {@link #apply} updates)
     */
    public Map<String, Boolean> getMap() {
        return skipMap;
    }

    /**
     * Looks for a {@code SkipOverride} annotation on the described test and, for every alias it
     * names, sets the flag to {@code false}. The statement itself is returned untouched.
     */
    @Override
    public Statement apply(Statement base, Description description) {
        SkipOverride override = description.getAnnotation(SkipOverride.class);
        if (override == null) {
            // Nothing to override for this test.
            return base;
        }
        for (String overriddenAlias : override.aliases()) {
            skipMap.put(overriddenAlias, false);
        }
        return base;
    }

}
}
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ static TransportVersion def(int id) {
public static final TransportVersion FAILURE_STORE_ENABLED_BY_CLUSTER_SETTING = def(8_812_00_0);
public static final TransportVersion SIMULATE_IGNORED_FIELDS = def(8_813_00_0);
public static final TransportVersion TRANSFORMS_UPGRADE_MODE = def(8_814_00_0);
public static final TransportVersion NODE_SHUTDOWN_EPHEMERAL_ID_ADDED = def(8_815_00_0);
public static final TransportVersion ESQL_CCS_TELEMETRY_STATS = def(8_816_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
* <br>
*/
public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment {
public static final String CCS_TELEMETRY_FIELD_NAME = "_search";
private long totalCount;
private long successCount;
private final Map<String, Long> failureReasons;
Expand All @@ -66,6 +65,9 @@ public final class CCSTelemetrySnapshot implements Writeable, ToXContentFragment

private final Map<String, Long> clientCounts;
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
// Whether we should use per-MRT (minimize roundtrips) metrics.
// ES|QL does not have a "minimize_roundtrips" option, so we don't collect those metrics for ES|QL usage.
private boolean useMRT = true;

/**
* Creates a new stats instance with the provided info.
Expand Down Expand Up @@ -191,6 +193,11 @@ public Map<String, PerClusterCCSTelemetry> getByRemoteCluster() {
return Collections.unmodifiableMap(byRemoteCluster);
}

/**
 * Sets whether this snapshot should use the per-MRT (minimize roundtrips) metrics.
 *
 * @param useMRT {@code true} to use the per-MRT metrics, {@code false} otherwise
 * @return this snapshot, so the call can be chained
 */
public CCSTelemetrySnapshot setUseMRT(boolean useMRT) {
this.useMRT = useMRT;
return this;
}

public static class PerClusterCCSTelemetry implements Writeable, ToXContentFragment {
private long count;
private long skippedCount;
Expand Down Expand Up @@ -270,6 +277,11 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(count, skippedCount, took);
}

/**
 * Returns the string form of this object as produced by {@code Strings.toString}.
 */
@Override
public String toString() {
// NOTE(review): the two boolean arguments presumably request pretty-printed, human-readable output — confirm against Strings.toString.
return Strings.toString(this, true, true);
}
}

/**
Expand All @@ -291,8 +303,10 @@ public void add(CCSTelemetrySnapshot stats) {
stats.featureCounts.forEach((k, v) -> featureCounts.merge(k, v, Long::sum));
stats.clientCounts.forEach((k, v) -> clientCounts.merge(k, v, Long::sum));
took.add(stats.took);
tookMrtTrue.add(stats.tookMrtTrue);
tookMrtFalse.add(stats.tookMrtFalse);
if (useMRT) {
tookMrtTrue.add(stats.tookMrtTrue);
tookMrtFalse.add(stats.tookMrtFalse);
}
remotesPerSearchMax = Math.max(remotesPerSearchMax, stats.remotesPerSearchMax);
if (totalCount > 0 && oldCount > 0) {
// Weighted average
Expand Down Expand Up @@ -328,30 +342,28 @@ private static void publishLatency(XContentBuilder builder, String name, LongMet

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(CCS_TELEMETRY_FIELD_NAME);
{
builder.field("total", totalCount);
builder.field("success", successCount);
builder.field("skipped", skippedRemotes);
publishLatency(builder, "took", took);
builder.field("total", totalCount);
builder.field("success", successCount);
builder.field("skipped", skippedRemotes);
publishLatency(builder, "took", took);
if (useMRT) {
publishLatency(builder, "took_mrt_true", tookMrtTrue);
publishLatency(builder, "took_mrt_false", tookMrtFalse);
builder.field("remotes_per_search_max", remotesPerSearchMax);
builder.field("remotes_per_search_avg", remotesPerSearchAvg);
builder.field("failure_reasons", failureReasons);
builder.field("features", featureCounts);
builder.field("clients", clientCounts);
builder.startObject("clusters");
{
for (var entry : byRemoteCluster.entrySet()) {
String remoteName = entry.getKey();
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) {
remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION;
}
builder.field(remoteName, entry.getValue());
}
builder.field("remotes_per_search_max", remotesPerSearchMax);
builder.field("remotes_per_search_avg", remotesPerSearchAvg);
builder.field("failure_reasons", failureReasons);
builder.field("features", featureCounts);
builder.field("clients", clientCounts);
builder.startObject("clusters");
{
for (var entry : byRemoteCluster.entrySet()) {
String remoteName = entry.getKey();
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(remoteName)) {
remoteName = SearchResponse.LOCAL_CLUSTER_NAME_REPRESENTATION;
}
builder.field(remoteName, entry.getValue());
}
builder.endObject();
}
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ShardOperationFailedException;
Expand All @@ -20,6 +21,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.query.SearchTimeoutException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;

import java.util.Arrays;
Expand Down Expand Up @@ -84,6 +86,15 @@ public Builder setClient(String client) {
return this;
}

/**
 * Sets the client from the {@code Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER} header of the given task.
 * If the task carries no such header, the builder is left unchanged.
 *
 * @param task the task whose header is read
 * @return this builder
 */
public Builder setClientFromTask(Task task) {
    final String productOrigin = task.getHeader(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER);
    return productOrigin == null ? this : setClient(productOrigin);
}

public Builder skippedRemote(String remote) {
this.skippedRemotes.add(remote);
return this;
Expand Down Expand Up @@ -133,6 +144,10 @@ public static Result getFailureType(Exception e) {
if (ExceptionsHelper.unwrapCorruption(e) != null) {
return Result.CORRUPTION;
}
ElasticsearchStatusException se = (ElasticsearchStatusException) ExceptionsHelper.unwrap(e, ElasticsearchStatusException.class);
if (se != null && se.getDetailedMessage().contains("license")) {
return Result.LICENSE;
}
// This is kind of last resort check - if we still don't know the reason but all shard failures are remote,
// we assume it's remote's fault somehow.
if (e instanceof SearchPhaseExecutionException spe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public enum Result {
TIMEOUT("timeout"),
CORRUPTION("corruption"),
SECURITY("security"),
LICENSE("license"),
// May be helpful if there's a lot of other reasons, and it may be hard to calculate the unknowns for some clients.
UNKNOWN("other");

Expand Down Expand Up @@ -106,8 +107,14 @@ public String getName() {

private final Map<String, LongAdder> clientCounts;
private final Map<String, PerClusterCCSTelemetry> byRemoteCluster;
// Should we calculate separate metrics per MRT?
private final boolean useMRT;

/**
 * Creates a telemetry holder with {@code useMRT} set to {@code true}, by delegating to
 * {@link #CCSUsageTelemetry(boolean)}.
 */
public CCSUsageTelemetry() {
this(true);
}

public CCSUsageTelemetry(boolean useMRT) {
this.byRemoteCluster = new ConcurrentHashMap<>();
totalCount = new LongAdder();
successCount = new LongAdder();
Expand All @@ -119,6 +126,7 @@ public CCSUsageTelemetry() {
skippedRemotes = new LongAdder();
featureCounts = new ConcurrentHashMap<>();
clientCounts = new ConcurrentHashMap<>();
this.useMRT = useMRT;
}

public void updateUsage(CCSUsage ccsUsage) {
Expand All @@ -134,10 +142,12 @@ private void doUpdate(CCSUsage ccsUsage) {
if (isSuccess(ccsUsage)) {
successCount.increment();
took.record(searchTook);
if (isMRT(ccsUsage)) {
tookMrtTrue.record(searchTook);
} else {
tookMrtFalse.record(searchTook);
if (useMRT) {
if (isMRT(ccsUsage)) {
tookMrtTrue.record(searchTook);
} else {
tookMrtFalse.record(searchTook);
}
}
ccsUsage.getPerClusterUsage().forEach((r, u) -> byRemoteCluster.computeIfAbsent(r, PerClusterCCSTelemetry::new).update(u));
} else {
Expand Down Expand Up @@ -243,6 +253,6 @@ public CCSTelemetrySnapshot getCCSTelemetrySnapshot() {
Collections.unmodifiableMap(Maps.transformValues(featureCounts, LongAdder::longValue)),
Collections.unmodifiableMap(Maps.transformValues(clientCounts, LongAdder::longValue)),
Collections.unmodifiableMap(Maps.transformValues(byRemoteCluster, PerClusterCCSTelemetry::getSnapshot))
);
).setUseMRT(useMRT);
}
}
Loading

0 comments on commit baae609

Please sign in to comment.