From 85bc84903be0310a7ef81d239c3d37fd850d0294 Mon Sep 17 00:00:00 2001 From: Kostas Krikellas <131142368+kkrik-es@users.noreply.github.com> Date: Wed, 13 Sep 2023 18:27:28 +0300 Subject: [PATCH] Use long in Centroid count (#99491) (#99534) * Use long in Centroid count Centroids currently use integers to track how many samples their mean tracks. This can overflow in case the digest tracks billions of samples or more. TDigestState already serializes the count as VLong, so it can be read as VInt without compatibility issues. Fixes #80153 * Update docs/changelog/99491.yaml * More test fixes * Bump TransportVersion * Revert TransportVersion change --- docs/changelog/99491.yaml | 6 ++++++ .../elasticsearch/tdigest/AVLGroupTree.java | 18 +++++++++--------- .../elasticsearch/tdigest/AVLTreeDigest.java | 14 +++++++------- .../org/elasticsearch/tdigest/Centroid.java | 12 ++++++------ .../elasticsearch/tdigest/HybridDigest.java | 2 +- .../elasticsearch/tdigest/MergingDigest.java | 4 ++-- .../elasticsearch/tdigest/SortingDigest.java | 2 +- .../org/elasticsearch/tdigest/TDigest.java | 2 +- .../tdigest/MergingDigestTests.java | 2 +- .../metrics/EmptyTDigestState.java | 2 +- .../aggregations/metrics/TDigestState.java | 6 +++--- .../xpack/analytics/AnalyticsTestsUtils.java | 2 +- .../HistogramPercentileAggregationTests.java | 4 ++-- 13 files changed, 41 insertions(+), 35 deletions(-) create mode 100644 docs/changelog/99491.yaml diff --git a/docs/changelog/99491.yaml b/docs/changelog/99491.yaml new file mode 100644 index 0000000000000..dfeab5dbbad6d --- /dev/null +++ b/docs/changelog/99491.yaml @@ -0,0 +1,6 @@ +pr: 99491 +summary: Use long in Centroid count +area: Aggregations +type: bug +issues: + - 80153 diff --git a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLGroupTree.java b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLGroupTree.java index 584d66af500b9..12b2a29d3e034 100644 --- a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLGroupTree.java +++ b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLGroupTree.java @@ -31,10 +31,10 @@ final class AVLGroupTree extends AbstractCollection { /* For insertions into the tree */ private double centroid; - private int count; + private long count; private double[] centroids; - private int[] counts; - private int[] aggregatedCounts; + private long[] counts; + private long[] aggregatedCounts; private final IntAVLTree tree; AVLGroupTree() { @@ -78,8 +78,8 @@ protected void fixAggregates(int node) { }; centroids = new double[tree.capacity()]; - counts = new int[tree.capacity()]; - aggregatedCounts = new int[tree.capacity()]; + counts = new long[tree.capacity()]; + aggregatedCounts = new long[tree.capacity()]; } /** @@ -113,14 +113,14 @@ public double mean(int node) { /** * Return the count for the provided node. */ - public int count(int node) { + public long count(int node) { return counts[node]; } /** * Add the provided centroid to the tree. */ - public void add(double centroid, int count) { + public void add(double centroid, long count) { this.centroid = centroid; this.count = count; tree.add(); @@ -135,7 +135,7 @@ public boolean add(Centroid centroid) { /** * Update values associated with a node, readjusting the tree if necessary. */ - public void update(int node, double centroid, int count) { + public void update(int node, double centroid, long count) { // have to do full scale update this.centroid = centroid; this.count = count; @@ -242,7 +242,7 @@ public void remove() { /** * Return the total count of points that have been added to the tree. */ - public int sum() { + public long sum() { return aggregatedCounts[tree.root()]; } diff --git a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLTreeDigest.java b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLTreeDigest.java index cdd6e5ab2b16a..deb3407565f36 100644 --- a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLTreeDigest.java +++ b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLTreeDigest.java @@ -68,7 +68,7 @@ public int centroidCount() { } @Override - public void add(double x, int w) { + public void add(double x, long w) { checkValue(x); needsCompression = true; @@ -84,7 +84,7 @@ public void add(double x, int w) { } if (start == NIL) { // empty summary - assert summary.size() == 0; + assert summary.isEmpty(); summary.add(x, w); count = w; } else { @@ -127,7 +127,7 @@ public void add(double x, int w) { // if the nearest point was not unique, then we may not be modifying the first copy // which means that ordering can change double centroid = summary.mean(closest); - int count = summary.count(closest); + long count = summary.count(closest); centroid = weightedAverage(centroid, count, x, w); count += w; summary.update(closest, centroid, count); @@ -189,7 +189,7 @@ public long size() { @Override public double cdf(double x) { AVLGroupTree values = summary; - if (values.size() == 0) { + if (values.isEmpty()) { return Double.NaN; } if (values.size() == 1) { @@ -272,7 +272,7 @@ public double quantile(double q) { } AVLGroupTree values = summary; - if (values.size() == 0) { + if (values.isEmpty()) { // no centroids means no data, no way to get a quantile return Double.NaN; } else if (values.size() == 1) { @@ -293,7 +293,7 @@ public double quantile(double q) { } int currentNode = values.first(); - int currentWeight = values.count(currentNode); + long currentWeight = values.count(currentNode); // Total mass to the left of the center of the current node. double weightSoFar = currentWeight / 2.0; @@ -305,7 +305,7 @@ public double quantile(double q) { for (int i = 0; i < values.size() - 1; i++) { int nextNode = values.next(currentNode); - int nextWeight = values.count(nextNode); + long nextWeight = values.count(nextNode); // this is the mass between current center and next center double dw = (currentWeight + nextWeight) / 2.0; diff --git a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/Centroid.java b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/Centroid.java index ac2ddb869d14d..a09a0862c30af 100644 --- a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/Centroid.java +++ b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/Centroid.java @@ -30,7 +30,7 @@ public class Centroid implements Comparable { private static final AtomicInteger uniqueCount = new AtomicInteger(1); private double centroid = 0; - private int count = 0; + private long count = 0; // The ID is transient because it must be unique within a given JVM. A new // ID should be generated from uniqueCount when a Centroid is deserialized. @@ -45,22 +45,22 @@ public Centroid(double x) { start(x, 1, uniqueCount.getAndIncrement()); } - public Centroid(double x, int w) { + public Centroid(double x, long w) { this(); start(x, w, uniqueCount.getAndIncrement()); } - public Centroid(double x, int w, int id) { + public Centroid(double x, long w, int id) { this(); start(x, w, id); } - private void start(double x, int w, int id) { + private void start(double x, long w, int id) { this.id = id; add(x, w); } - public void add(double x, int w) { + public void add(double x, long w) { count += w; centroid += w * (x - centroid) / count; } @@ -69,7 +69,7 @@ public double mean() { return centroid; } - public int count() { + public long count() { return count; } diff --git a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/HybridDigest.java b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/HybridDigest.java index 999f03e91ae4f..07a12381e2a71 100644 --- a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/HybridDigest.java +++ b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/HybridDigest.java @@ -70,7 +70,7 @@ public class HybridDigest extends AbstractTDigest { } @Override - public void add(double x, int w) { + public void add(double x, long w) { reserve(w); if (mergingDigest != null) { mergingDigest.add(x, w); diff --git a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/MergingDigest.java b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/MergingDigest.java index 51364d8dc281b..0be2b68d76a21 100644 --- a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/MergingDigest.java +++ b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/MergingDigest.java @@ -216,7 +216,7 @@ public MergingDigest(double compression, int bufferSize, int size) { } @Override - public void add(double x, int w) { + public void add(double x, long w) { checkValue(x); if (tempUsed >= tempWeight.length - lastUsedCell - 1) { mergeNewValues(); @@ -514,7 +514,7 @@ public boolean hasNext() { @Override public Centroid next() { - Centroid rc = new Centroid(mean[i], (int) weight[i]); + Centroid rc = new Centroid(mean[i], (long) weight[i]); i++; return rc; } diff --git a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/SortingDigest.java b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/SortingDigest.java index 200a54494e208..92f770cbb7569 100644 --- a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/SortingDigest.java +++ b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/SortingDigest.java @@ -39,7 +39,7 @@ public class SortingDigest extends AbstractTDigest { private boolean isSorted = true; @Override - public void add(double x, int w) { + public void add(double x, long w) { checkValue(x); isSorted = isSorted && (values.isEmpty() || values.get(values.size() - 1) <= x); for (int i = 0; i < w; i++) { diff --git a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/TDigest.java b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/TDigest.java index 5dc3706193211..2eaf3192eefef 100644 --- a/libs/tdigest/src/main/java/org/elasticsearch/tdigest/TDigest.java +++ b/libs/tdigest/src/main/java/org/elasticsearch/tdigest/TDigest.java @@ -94,7 +94,7 @@ public static TDigest createHybridDigest(double compression) { * @param x The value to add. * @param w The weight of this point. */ - public abstract void add(double x, int w); + public abstract void add(double x, long w); /** * Add a single sample to this TDigest. diff --git a/libs/tdigest/src/test/java/org/elasticsearch/tdigest/MergingDigestTests.java b/libs/tdigest/src/test/java/org/elasticsearch/tdigest/MergingDigestTests.java index 2eb989f34f8a2..16a81bad50756 100644 --- a/libs/tdigest/src/test/java/org/elasticsearch/tdigest/MergingDigestTests.java +++ b/libs/tdigest/src/test/java/org/elasticsearch/tdigest/MergingDigestTests.java @@ -118,7 +118,7 @@ public void testSingletonsAtEnds() { d.add(x); } } - int last = 0; + long last = 0; for (Centroid centroid : d.centroids()) { if (last == 0) { assertEquals(1, centroid.count()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/EmptyTDigestState.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/EmptyTDigestState.java index 159e70083065c..a367b52961467 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/EmptyTDigestState.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/EmptyTDigestState.java @@ -15,7 +15,7 @@ public EmptyTDigestState() { } @Override - public void add(double x, int w) { + public void add(double x, long w) { throw new UnsupportedOperationException("Immutable Empty TDigest"); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java index 2fbe2e679c1ab..7eb07d320c84b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java @@ -138,7 +138,7 @@ public static TDigestState read(StreamInput in) throws IOException { state.tdigest.reserve(size); } for (int i = 0; i < n; i++) { - state.add(in.readDouble(), in.readVInt()); + state.add(in.readDouble(), in.readVLong()); } return state; } @@ -189,7 +189,7 @@ public int hashCode() { h = 31 * h + Integer.hashCode(centroidCount()); for (Centroid centroid : centroids()) { h = 31 * h + Double.hashCode(centroid.mean()); - h = 31 * h + centroid.count(); + h = 31 * h + (int) centroid.count(); } h = 31 * h + Double.hashCode(getMax()); h = 31 * h + Double.hashCode(getMin()); @@ -205,7 +205,7 @@ public void add(TDigestState other) { tdigest.add(other.tdigest); } - public void add(double x, int w) { + public void add(double x, long w) { tdigest.add(x, w); } diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/AnalyticsTestsUtils.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/AnalyticsTestsUtils.java index 48ffaea45b436..afb46709959ab 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/AnalyticsTestsUtils.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/AnalyticsTestsUtils.java @@ -29,7 +29,7 @@ public static BinaryDocValuesField histogramFieldDocValues(String fieldName, dou BytesStreamOutput streamOutput = new BytesStreamOutput(); histogram.compress(); for (Centroid centroid : histogram.centroids()) { - streamOutput.writeVInt(centroid.count()); + streamOutput.writeVLong(centroid.count()); streamOutput.writeDouble(centroid.mean()); } return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef()); diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistogramPercentileAggregationTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistogramPercentileAggregationTests.java index c7526af02a772..bf289a601ae21 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistogramPercentileAggregationTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistogramPercentileAggregationTests.java @@ -185,7 +185,7 @@ private void setupTDigestHistogram(int compression) throws Exception { client().bulk(bulkRequest); bulkRequest = new BulkRequest(); List values = new ArrayList<>(); - List counts = new ArrayList<>(); + List counts = new ArrayList<>(); Collection centroids = histogram.centroids(); for (Centroid centroid : centroids) { values.add(centroid.mean()); @@ -196,7 +196,7 @@ private void setupTDigestHistogram(int compression) throws Exception { .startObject("inner") .startObject("data") .field("values", values.toArray(new Double[values.size()])) - .field("counts", counts.toArray(new Integer[counts.size()])) + .field("counts", counts.toArray(new Long[counts.size()])) .endObject() .endObject() .endObject();