diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java index 122ac90d40998..bb4c2ae417801 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java @@ -120,7 +120,6 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext InternalHistogram.Bucket newBucket = bucket; if (!(thisBucketValue == null || thisBucketValue.equals(Double.NaN))) { - values.offer(thisBucketValue); // Some models (e.g. HoltWinters) have certain preconditions that must be met if (model.hasValue(values.size())) { @@ -142,6 +141,8 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext } lastValidPosition = counter; } + + values.offer(thisBucketValue); } counter += 1; newBuckets.add(newBucket); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java index 6cc75b21ec738..0a62a1aa24043 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.text.ParseException; +import java.util.Arrays; import java.util.Collection; import java.util.Map; @@ -50,6 +51,15 @@ public EwmaModel(double alpha) { this.alpha = alpha; } + @Override + protected double[] doPredict(Collection values, int numPredictions) { + double[] predictions = new double[numPredictions]; + + // EWMA just emits the same final prediction repeatedly. + Arrays.fill(predictions, next(values)); + + return predictions; + } @Override public double next(Collection values) { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java index dd4b679bccc30..81ebbe382c441 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java @@ -67,7 +67,7 @@ public HoltLinearModel(double alpha, double beta) { * @return Returns an array of doubles, since most smoothing methods operate on floating points */ @Override - public double[] predict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { return next(values, numPredictions); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java index f43ea7302540a..4a25c590639ca 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java @@ -176,7 +176,7 @@ public boolean hasValue(int windowLength) { * @return Returns an array of doubles, since most smoothing methods operate on floating points */ @Override - public double[] predict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { return next(values, numPredictions); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java index 4036a446f9462..7b2c06d347b91 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.text.ParseException; +import java.util.Arrays; import java.util.Collection; import java.util.Map; @@ -41,6 +42,16 @@ public class LinearModel extends MovAvgModel { protected static final ParseField NAME_FIELD = new ParseField("linear"); + @Override + protected double[] doPredict(Collection values, int numPredictions) { + double[] predictions = new double[numPredictions]; + + // EWMA just emits the same final prediction repeatedly. + Arrays.fill(predictions, next(values)); + + return predictions; + } + @Override public double next(Collection values) { double avg = 0; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java index c987aec76d282..50c59fc6228dc 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java @@ -19,8 +19,6 @@ package org.elasticsearch.search.aggregations.pipeline.movavg.models; -import com.google.common.collect.EvictingQueue; - import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.SearchParseException; @@ -44,7 +42,7 @@ public abstract class MovAvgModel { */ public boolean hasValue(int windowLength) { // Default implementation can always provide a next() value - return true; + return windowLength > 0; } /** @@ -57,9 +55,7 @@ public boolean hasValue(int windowLength) { public abstract double next(Collection values); /** - * Predicts the next `n` values in the series, using the smoothing model to generate new values. - * Default prediction mode is to simply continuing calling next() and adding the - * predicted value back into the windowed buffer. + * Predicts the next `n` values in the series. * * @param values Collection of numerics to movingAvg, usually windowed * @param numPredictions Number of newly generated predictions to return @@ -67,34 +63,31 @@ public boolean hasValue(int windowLength) { * @return Returns an array of doubles, since most smoothing methods operate on floating points */ public double[] predict(Collection values, int numPredictions) { - double[] predictions = new double[numPredictions]; + assert(numPredictions >= 1); // If there are no values, we can't do anything. Return an array of NaNs. - if (values.size() == 0) { + if (values.isEmpty()) { return emptyPredictions(numPredictions); } - // special case for one prediction, avoids allocation - if (numPredictions < 1) { - throw new IllegalArgumentException("numPredictions may not be less than 1."); - } else if (numPredictions == 1){ - predictions[0] = next(values); - return predictions; - } - - Collection predictionBuffer = EvictingQueue.create(values.size()); - predictionBuffer.addAll(values); - - for (int i = 0; i < numPredictions; i++) { - predictions[i] = next(predictionBuffer); - - // Add the last value to the buffer, so we can keep predicting - predictionBuffer.add(predictions[i]); - } - - return predictions; + return doPredict(values, numPredictions); } + /** + * Calls to the model-specific implementation which actually generates the predictions + * + * @param values Collection of numerics to movingAvg, usually windowed + * @param numPredictions Number of newly generated predictions to return + * @param Type of numeric + * @return Returns an array of doubles, since most smoothing methods operate on floating points + */ + protected abstract double[] doPredict(Collection values, int numPredictions); + + /** + * Returns an empty set of predictions, filled with NaNs + * @param numPredictions + * @return + */ protected double[] emptyPredictions(int numPredictions) { double[] predictions = new double[numPredictions]; Arrays.fill(predictions, Double.NaN); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java index 2e770a9acef25..d0dd421704a9f 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.text.ParseException; +import java.util.Arrays; import java.util.Collection; import java.util.Map; @@ -39,6 +40,16 @@ public class SimpleModel extends MovAvgModel { protected static final ParseField NAME_FIELD = new ParseField("simple"); + @Override + protected double[] doPredict(Collection values, int numPredictions) { + double[] predictions = new double[numPredictions]; + + // EWMA just emits the same final prediction repeatedly. + Arrays.fill(predictions, next(values)); + + return predictions; + } + @Override public double next(Collection values) { double avg = 0; diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregationHelperTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregationHelperTests.java index 0873ce4665107..6105d6da98d91 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregationHelperTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregationHelperTests.java @@ -50,6 +50,7 @@ public static ArrayList generateHistogram(int interval, int size, do ArrayList values = new ArrayList<>(size); boolean lastWasGap = false; + boolean emptyHisto = true; for (int i = 0; i < size; i++) { MockBucket bucket = new MockBucket(); @@ -70,15 +71,27 @@ public static ArrayList generateHistogram(int interval, int size, do bucket.count = randomIntBetween(1, 50); bucket.docValues = new double[bucket.count]; for (int j = 0; j < bucket.count; j++) { - bucket.docValues[j] = randomDouble() * randomIntBetween(-20,20); + bucket.docValues[j] = randomDouble() * randomIntBetween(-20, 20); } lastWasGap = false; + emptyHisto = false; } bucket.key = i * interval; values.add(bucket); } + if (emptyHisto) { + int idx = randomIntBetween(0, values.size()-1); + MockBucket bucket = values.get(idx); + bucket.count = randomIntBetween(1, 50); + bucket.docValues = new double[bucket.count]; + for (int j = 0; j < bucket.count; j++) { + bucket.docValues[j] = randomDouble() * randomIntBetween(-20, 20); + } + values.set(idx, bucket); + } + return values; } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java index 9e92c69f7e253..3359e538783b5 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java @@ -148,15 +148,6 @@ public void setupSuiteScopeCluster() throws Exception { } } - // Used for specially crafted gap tests - builders.add(client().prepareIndex("idx", "gap_type").setSource(jsonBuilder().startObject() - .field(INTERVAL_FIELD, 0) - .field(GAP_FIELD, 1).endObject())); - - builders.add(client().prepareIndex("idx", "gap_type").setSource(jsonBuilder().startObject() - .field(INTERVAL_FIELD, 49) - .field(GAP_FIELD, 1).endObject())); - for (int i = -10; i < 10; i++) { builders.add(client().prepareIndex("neg_idx", "type").setSource( jsonBuilder().startObject().field(INTERVAL_FIELD, i).field(VALUE_FIELD, 10).endObject())); @@ -204,31 +195,36 @@ private void setupExpected(MovAvgType type, MetricTarget target) { metricValue = target.equals(MetricTarget.VALUE) ? PipelineAggregationHelperTests.calculateMetric(docValues, metric) : mockBucket.count; } - window.offer(metricValue); - switch (type) { - case SIMPLE: - values.add(simple(window)); - break; - case LINEAR: - values.add(linear(window)); - break; - case EWMA: - values.add(ewma(window)); - break; - case HOLT: - values.add(holt(window)); - break; - case HOLT_WINTERS: - // HW needs at least 2 periods of data to start - if (window.size() >= period * 2) { - values.add(holtWinters(window)); - } else { - values.add(null); - } - - break; + if (window.size() > 0) { + switch (type) { + case SIMPLE: + values.add(simple(window)); + break; + case LINEAR: + values.add(linear(window)); + break; + case EWMA: + values.add(ewma(window)); + break; + case HOLT: + values.add(holt(window)); + break; + case HOLT_WINTERS: + // HW needs at least 2 periods of data to start + if (window.size() >= period * 2) { + values.add(holtWinters(window)); + } else { + values.add(null); + } + + break; + } + } else { + values.add(null); } + window.offer(metricValue); + } testValues.put(type.toString() + "_" + target.toString(), values); } @@ -685,7 +681,10 @@ public void testPredictNegativeKeysAtStart() { List buckets = histo.getBuckets(); assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(25)); - for (int i = 0; i < 20; i++) { + SimpleValue current = buckets.get(0).getAggregations().get("movavg_values"); + assertThat(current, nullValue()); + + for (int i = 1; i < 20; i++) { Bucket bucket = buckets.get(i); assertThat(bucket, notNullValue()); assertThat((long) bucket.getKey(), equalTo((long) i - 10)); @@ -699,7 +698,6 @@ public void testPredictNegativeKeysAtStart() { } for (int i = 20; i < 25; i++) { - System.out.println(i); Bucket bucket = buckets.get(i); assertThat(bucket, notNullValue()); assertThat((long) bucket.getKey(), equalTo((long) i - 10)); @@ -877,350 +875,6 @@ public void testNegativePrediction() { } } - /** - * This test uses the "gap" dataset, which is simply a doc at the beginning and end of - * the INTERVAL_FIELD range. These docs have a value of 1 in GAP_FIELD. - * This test verifies that large gaps don't break things, and that the mov avg roughly works - * in the correct manner (checks direction of change, but not actual values) - */ - @Test - public void testGiantGap() { - - SearchResponse response = client() - .prepareSearch("idx").setTypes("gap_type") - .addAggregation( - histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L) - .subAggregation(min("the_metric").field(GAP_FIELD)) - .subAggregation(movingAvg("movavg_values") - .window(windowSize) - .modelBuilder(randomModelBuilder()) - .gapPolicy(gapPolicy) - .setBucketsPaths("the_metric")) - ).execute().actionGet(); - - assertSearchResponse(response); - - InternalHistogram histo = response.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50)); - - double lastValue = ((SimpleValue)(buckets.get(0).getAggregations().get("movavg_values"))).value(); - assertThat(Double.compare(lastValue, 0.0d), greaterThanOrEqualTo(0)); - - double currentValue; - for (int i = 1; i < 49; i++) { - SimpleValue current = buckets.get(i).getAggregations().get("movavg_values"); - if (current != null) { - currentValue = current.value(); - - // Since there are only two values in this test, at the beginning and end, the moving average should - // decrease every step (until it reaches zero). Crude way to check that it's doing the right thing - // without actually verifying the computed values. Should work for all types of moving avgs and - // gap policies - assertThat(Double.compare(lastValue, currentValue), greaterThanOrEqualTo(0)); - lastValue = currentValue; - } - } - - - SimpleValue current = buckets.get(49).getAggregations().get("movavg_values"); - assertThat(current, notNullValue()); - currentValue = current.value(); - - if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) { - // if we are ignoring, movavg could go up (holt) or stay the same (simple, linear, ewma) - assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0)); - } else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) { - // If we insert zeros, this should always increase the moving avg since the last bucket has a real value - assertThat(Double.compare(lastValue, currentValue), equalTo(-1)); - } - } - - /** - * Big gap, but with prediction at the end. - */ - @Test - public void testGiantGapWithPredict() { - int numPredictions = randomIntBetween(1, 10); - - SearchResponse response = client() - .prepareSearch("idx").setTypes("gap_type") - .addAggregation( - histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L) - .subAggregation(min("the_metric").field(GAP_FIELD)) - .subAggregation(movingAvg("movavg_values") - .window(windowSize) - .modelBuilder(randomModelBuilder()) - .gapPolicy(gapPolicy) - .setBucketsPaths("the_metric") - .predict(numPredictions)) - ).execute().actionGet(); - - assertSearchResponse(response); - - InternalHistogram histo = response.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50 + numPredictions)); - - - double lastValue = ((SimpleValue)(buckets.get(0).getAggregations().get("movavg_values"))).value(); - assertThat(Double.compare(lastValue, 0.0d), greaterThanOrEqualTo(0)); - - double currentValue; - for (int i = 1; i < 49; i++) { - SimpleValue current = buckets.get(i).getAggregations().get("movavg_values"); - if (current != null) { - currentValue = current.value(); - - // Since there are only two values in this test, at the beginning and end, the moving average should - // decrease every step (until it reaches zero). Crude way to check that it's doing the right thing - // without actually verifying the computed values. Should work for all types of moving avgs and - // gap policies - assertThat(Double.compare(lastValue, currentValue), greaterThanOrEqualTo(0)); - lastValue = currentValue; - } - } - - SimpleValue current = buckets.get(49).getAggregations().get("movavg_values"); - assertThat(current, notNullValue()); - currentValue = current.value(); - - if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) { - // if we are ignoring, movavg could go up (holt) or stay the same (simple, linear, ewma) - assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0)); - } else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) { - // If we insert zeros, this should always increase the moving avg since the last bucket has a real value - assertThat(Double.compare(lastValue, currentValue), equalTo(-1)); - } - - // Now check predictions - for (int i = 50; i < 50 + numPredictions; i++) { - // Unclear at this point which direction the predictions will go, just verify they are - // not null, and that we don't have the_metric anymore - assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue()); - assertThat((buckets.get(i).getAggregations().get("the_metric")), nullValue()); - } - } - - /** - * This test filters the "gap" data so that the first doc is excluded. This leaves a long stretch of empty - * buckets until the final bucket. The moving avg should be zero up until the last bucket, and should work - * regardless of mov avg type or gap policy. - */ - @Test - public void testLeftGap() { - SearchResponse response = client() - .prepareSearch("idx").setTypes("gap_type") - .addAggregation( - filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).from(1)).subAggregation( - histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L) - .subAggregation(randomMetric("the_metric", GAP_FIELD)) - .subAggregation(movingAvg("movavg_values") - .window(windowSize) - .modelBuilder(randomModelBuilder()) - .gapPolicy(gapPolicy) - .setBucketsPaths("the_metric")) - )) - .execute().actionGet(); - - assertSearchResponse(response); - - InternalFilter filtered = response.getAggregations().get("filtered"); - assertThat(filtered, notNullValue()); - assertThat(filtered.getName(), equalTo("filtered")); - - InternalHistogram histo = filtered.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50)); - - double lastValue = 0; - - double currentValue; - for (int i = 0; i < 50; i++) { - SimpleValue current = buckets.get(i).getAggregations().get("movavg_values"); - if (current != null) { - currentValue = current.value(); - - assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0)); - lastValue = currentValue; - } - } - } - - @Test - public void testLeftGapWithPredict() { - int numPredictions = randomIntBetween(1, 10); - SearchResponse response = client() - .prepareSearch("idx").setTypes("gap_type") - .addAggregation( - filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).from(1)).subAggregation( - histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L) - .subAggregation(randomMetric("the_metric", GAP_FIELD)) - .subAggregation(movingAvg("movavg_values") - .window(windowSize) - .modelBuilder(randomModelBuilder()) - .gapPolicy(gapPolicy) - .setBucketsPaths("the_metric") - .predict(numPredictions)) - )) - .execute().actionGet(); - - assertSearchResponse(response); - - InternalFilter filtered = response.getAggregations().get("filtered"); - assertThat(filtered, notNullValue()); - assertThat(filtered.getName(), equalTo("filtered")); - - InternalHistogram histo = filtered.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50 + numPredictions)); - - - double lastValue = 0; - - double currentValue; - for (int i = 0; i < 50; i++) { - SimpleValue current = buckets.get(i).getAggregations().get("movavg_values"); - if (current != null) { - currentValue = current.value(); - - assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0)); - lastValue = currentValue; - } - } - - // Now check predictions - for (int i = 50; i < 50 + numPredictions; i++) { - // Unclear at this point which direction the predictions will go, just verify they are - // not null, and that we don't have the_metric anymore - assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue()); - assertThat((buckets.get(i).getAggregations().get("the_metric")), nullValue()); - } - } - - /** - * This test filters the "gap" data so that the last doc is excluded. This leaves a long stretch of empty - * buckets after the first bucket. - */ - @Test - public void testRightGap() { - SearchResponse response = client() - .prepareSearch("idx").setTypes("gap_type") - .addAggregation( - filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).to(1)).subAggregation( - histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L) - .subAggregation(randomMetric("the_metric", GAP_FIELD)) - .subAggregation(movingAvg("movavg_values") - .window(windowSize) - .modelBuilder(randomModelBuilder()) - .gapPolicy(gapPolicy) - .setBucketsPaths("the_metric")) - )) - .execute().actionGet(); - - assertSearchResponse(response); - - InternalFilter filtered = response.getAggregations().get("filtered"); - assertThat(filtered, notNullValue()); - assertThat(filtered.getName(), equalTo("filtered")); - - InternalHistogram histo = filtered.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50)); - - - SimpleValue current = buckets.get(0).getAggregations().get("movavg_values"); - assertThat(current, notNullValue()); - - double lastValue = current.value(); - - double currentValue; - for (int i = 1; i < 50; i++) { - current = buckets.get(i).getAggregations().get("movavg_values"); - if (current != null) { - currentValue = current.value(); - - assertThat(Double.compare(lastValue, currentValue), greaterThanOrEqualTo(0)); - lastValue = currentValue; - } - } - } - - @Test - public void testRightGapWithPredict() { - int numPredictions = randomIntBetween(1, 10); - SearchResponse response = client() - .prepareSearch("idx").setTypes("gap_type") - .addAggregation( - filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).to(1)).subAggregation( - histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L) - .subAggregation(randomMetric("the_metric", GAP_FIELD)) - .subAggregation(movingAvg("movavg_values") - .window(windowSize) - .modelBuilder(randomModelBuilder()) - .gapPolicy(gapPolicy) - .setBucketsPaths("the_metric") - .predict(numPredictions)) - )) - .execute().actionGet(); - - assertSearchResponse(response); - - InternalFilter filtered = response.getAggregations().get("filtered"); - assertThat(filtered, notNullValue()); - assertThat(filtered.getName(), equalTo("filtered")); - - InternalHistogram histo = filtered.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - // If we are skipping, there will only be predictions at the very beginning and won't append any new buckets - if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) { - assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50)); - } else { - assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50 + numPredictions)); - } - - // Unlike left-gap tests, we cannot check the slope of prediction for right-gap. E.g. linear will - // converge on zero, but holt-linear may trend upwards based on the first value - // Just check for non-nullness - SimpleValue current = buckets.get(0).getAggregations().get("movavg_values"); - assertThat(current, notNullValue()); - - // If we are skipping, there will only be predictions at the very beginning and won't append any new buckets - if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) { - // Now check predictions - for (int i = 1; i < 1 + numPredictions; i++) { - // Unclear at this point which direction the predictions will go, just verify they are - // not null - assertThat(buckets.get(i).getDocCount(), equalTo(0L)); - assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue()); - } - } else { - // Otherwise we'll have some predictions at the end - for (int i = 50; i < 50 + numPredictions; i++) { - // Unclear at this point which direction the predictions will go, just verify they are - // not null - assertThat(buckets.get(i).getDocCount(), equalTo(0L)); - assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue()); - } - } - - } - @Test public void testHoltWintersNotEnoughData() { try { @@ -1288,8 +942,7 @@ public void testTwoMovAvgsWithPredictions() { assertThat(avgAgg.value(), equalTo(10d)); SimpleValue movAvgAgg = bucket.getAggregations().get("avg_movavg"); - assertThat(movAvgAgg, notNullValue()); - assertThat(movAvgAgg.value(), equalTo(10d)); + assertThat(movAvgAgg, nullValue()); Derivative deriv = bucket.getAggregations().get("deriv"); assertThat(deriv, nullValue()); @@ -1297,7 +950,28 @@ public void testTwoMovAvgsWithPredictions() { SimpleValue derivMovAvg = bucket.getAggregations().get("deriv_movavg"); assertThat(derivMovAvg, nullValue()); - for (int i = 1; i < 12; i++) { + // Second bucket + bucket = buckets.get(1); + assertThat(bucket, notNullValue()); + assertThat((long) bucket.getKey(), equalTo(1L)); + assertThat(bucket.getDocCount(), equalTo(1l)); + + avgAgg = bucket.getAggregations().get("avg"); + assertThat(avgAgg, notNullValue()); + assertThat(avgAgg.value(), equalTo(10d)); + + deriv = bucket.getAggregations().get("deriv"); + assertThat(deriv, notNullValue()); + assertThat(deriv.value(), equalTo(0d)); + + movAvgAgg = bucket.getAggregations().get("avg_movavg"); + assertThat(movAvgAgg, notNullValue()); + assertThat(movAvgAgg.value(), equalTo(10d)); + + derivMovAvg = bucket.getAggregations().get("deriv_movavg"); + assertThat(derivMovAvg, Matchers.nullValue()); // still null because of movavg delay + + for (int i = 2; i < 12; i++) { bucket = buckets.get(i); assertThat(bucket, notNullValue()); assertThat((long) bucket.getKey(), equalTo((long) i)); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java index 8a94a04ad411d..1e6449eaac1c8 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java @@ -21,7 +21,6 @@ import com.google.common.collect.EvictingQueue; -import org.elasticsearch.search.SearchParseException; import org.elasticsearch.search.aggregations.pipeline.movavg.models.*; import org.elasticsearch.test.ElasticsearchTestCase; @@ -47,7 +46,10 @@ public void testSimpleMovAvgModel() { double randValue = randomDouble(); double expected = 0; - window.offer(randValue); + if (i == 0) { + window.offer(randValue); + continue; + } for (double value : window) { expected += value; @@ -56,6 +58,7 @@ public void testSimpleMovAvgModel() { double actual = model.next(window); assertThat(Double.compare(expected, actual), equalTo(0)); + window.offer(randValue); } } @@ -64,7 +67,7 @@ public void testSimplePredictionModel() { MovAvgModel model = new SimpleModel(); int windowSize = randomIntBetween(1, 50); - int numPredictions = randomIntBetween(1,50); + int numPredictions = randomIntBetween(1, 50); EvictingQueue window = EvictingQueue.create(windowSize); for (int i = 0; i < windowSize; i++) { @@ -73,13 +76,12 @@ public void testSimplePredictionModel() { double actual[] = model.predict(window, numPredictions); double expected[] = new double[numPredictions]; - for (int i = 0; i < numPredictions; i++) { - for (double value : window) { - expected[i] += value; - } - expected[i] /= window.size(); - window.offer(expected[i]); + double t = 0; + for (double value : window) { + t += value; } + t /= window.size(); + Arrays.fill(expected, t); for (int i = 0; i < numPredictions; i++) { assertThat(Double.compare(expected[i], actual[i]), equalTo(0)); @@ -96,7 +98,11 @@ public void testLinearMovAvgModel() { EvictingQueue window = EvictingQueue.create(windowSize); for (int i = 0; i < numValues; i++) { double randValue = randomDouble(); - window.offer(randValue); + + if (i == 0) { + window.offer(randValue); + continue; + } double avg = 0; long totalWeight = 1; @@ -110,6 +116,7 @@ public void testLinearMovAvgModel() { double expected = avg / totalWeight; double actual = model.next(window); assertThat(Double.compare(expected, actual), equalTo(0)); + window.offer(randValue); } } @@ -127,19 +134,17 @@ public void testLinearPredictionModel() { double actual[] = model.predict(window, numPredictions); double expected[] = new double[numPredictions]; - for (int i = 0; i < numPredictions; i++) { - double avg = 0; - long totalWeight = 1; - long current = 1; + double avg = 0; + long totalWeight = 1; + long current = 1; - for (double value : window) { - avg += value * current; - totalWeight += current; - current += 1; - } - expected[i] = avg / totalWeight; - window.offer(expected[i]); + for (double value : window) { + avg += value * current; + totalWeight += current; + current += 1; } + avg = avg / totalWeight; + Arrays.fill(expected, avg); for (int i = 0; i < numPredictions; i++) { assertThat(Double.compare(expected[i], actual[i]), equalTo(0)); @@ -157,7 +162,11 @@ public void testEWMAMovAvgModel() { EvictingQueue window = EvictingQueue.create(windowSize); for (int i = 0; i < numValues; i++) { double randValue = randomDouble(); - window.offer(randValue); + + if (i == 0) { + window.offer(randValue); + continue; + } double avg = 0; boolean first = true; @@ -173,6 +182,7 @@ public void testEWMAMovAvgModel() { double expected = avg; double actual = model.next(window); assertThat(Double.compare(expected, actual), equalTo(0)); + window.offer(randValue); } } @@ -191,21 +201,18 @@ public void testEWMAPredictionModel() { double actual[] = model.predict(window, numPredictions); double expected[] = new double[numPredictions]; - for (int i = 0; i < numPredictions; i++) { - double avg = 0; - boolean first = true; + double avg = 0; + boolean first = true; - for (double value : window) { - if (first) { - avg = value; - first = false; - } else { - avg = (value * alpha) + (avg * (1 - alpha)); - } + for (double value : window) { + if (first) { + avg = value; + first = false; + } else { + avg = (value * alpha) + (avg * (1 - alpha)); } - expected[i] = avg; - window.offer(expected[i]); } + Arrays.fill(expected, avg); for (int i = 0; i < numPredictions; i++) { assertThat(Double.compare(expected[i], actual[i]), equalTo(0)); @@ -224,7 +231,11 @@ public void testHoltLinearMovAvgModel() { EvictingQueue window = EvictingQueue.create(windowSize); for (int i = 0; i < numValues; i++) { double randValue = randomDouble(); - window.offer(randValue); + + if (i == 0) { + window.offer(randValue); + continue; + } double s = 0; double last_s = 0; @@ -253,6 +264,7 @@ public void testHoltLinearMovAvgModel() { double expected = s + (0 * b) ; double actual = model.next(window); assertThat(Double.compare(expected, actual), equalTo(0)); + window.offer(randValue); } }