Skip to content

Commit

Permalink
Aggregations: Moving average forecasts should not include current dat…
Browse files Browse the repository at this point in the history
…apoint.

- Fixes tests, and removes a few special snowflake, fragile tests.
- Removes concrete implementation of predict() and moves it into
  each model so that the logic is clearer.  Because there is some
  shared checks/assertions, those remain in predict() and the main
  prediction happens in doPredict()
  • Loading branch information
polyfractal committed Jun 22, 2015
1 parent eb23530 commit 5d94feb
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 447 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
InternalHistogram.Bucket newBucket = bucket;

if (!(thisBucketValue == null || thisBucketValue.equals(Double.NaN))) {
values.offer(thisBucketValue);

// Some models (e.g. HoltWinters) have certain preconditions that must be met
if (model.hasValue(values.size())) {
Expand All @@ -142,6 +141,8 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
}
lastValidPosition = counter;
}

values.offer(thisBucketValue);
}
counter += 1;
newBuckets.add(newBucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

Expand All @@ -50,6 +51,15 @@ public EwmaModel(double alpha) {
this.alpha = alpha;
}

@Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
double[] predictions = new double[numPredictions];

// EWMA just emits the same final prediction repeatedly.
Arrays.fill(predictions, next(values));

return predictions;
}

@Override
public <T extends Number> double next(Collection<T> values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public HoltLinearModel(double alpha, double beta) {
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
@Override
public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
return next(values, numPredictions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public boolean hasValue(int windowLength) {
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
@Override
public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
return next(values, numPredictions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

Expand All @@ -41,6 +42,16 @@ public class LinearModel extends MovAvgModel {

protected static final ParseField NAME_FIELD = new ParseField("linear");

@Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
double[] predictions = new double[numPredictions];

// EWMA just emits the same final prediction repeatedly.
Arrays.fill(predictions, next(values));

return predictions;
}

@Override
public <T extends Number> double next(Collection<T> values) {
double avg = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.elasticsearch.search.aggregations.pipeline.movavg.models;

import com.google.common.collect.EvictingQueue;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchParseException;
Expand All @@ -44,7 +42,7 @@ public abstract class MovAvgModel {
*/
public boolean hasValue(int windowLength) {
// Default implementation can always provide a next() value
return true;
return windowLength > 0;
}

/**
Expand All @@ -57,44 +55,39 @@ public boolean hasValue(int windowLength) {
public abstract <T extends Number> double next(Collection<T> values);

/**
* Predicts the next `n` values in the series, using the smoothing model to generate new values.
* Default prediction mode is to simply continuing calling <code>next()</code> and adding the
* predicted value back into the windowed buffer.
* Predicts the next `n` values in the series.
*
* @param values Collection of numerics to movingAvg, usually windowed
* @param numPredictions Number of newly generated predictions to return
* @param <T> Type of numeric
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
double[] predictions = new double[numPredictions];
assert(numPredictions >= 1);

// If there are no values, we can't do anything. Return an array of NaNs.
if (values.size() == 0) {
if (values.isEmpty()) {
return emptyPredictions(numPredictions);
}

// special case for one prediction, avoids allocation
if (numPredictions < 1) {
throw new IllegalArgumentException("numPredictions may not be less than 1.");
} else if (numPredictions == 1){
predictions[0] = next(values);
return predictions;
}

Collection<Number> predictionBuffer = EvictingQueue.create(values.size());
predictionBuffer.addAll(values);

for (int i = 0; i < numPredictions; i++) {
predictions[i] = next(predictionBuffer);

// Add the last value to the buffer, so we can keep predicting
predictionBuffer.add(predictions[i]);
}

return predictions;
return doPredict(values, numPredictions);
}

/**
* Calls to the model-specific implementation which actually generates the predictions
*
* @param values Collection of numerics to movingAvg, usually windowed
* @param numPredictions Number of newly generated predictions to return
* @param <T> Type of numeric
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
protected abstract <T extends Number> double[] doPredict(Collection<T> values, int numPredictions);

/**
* Returns an empty set of predictions, filled with NaNs
* @param numPredictions
* @return
*/
protected double[] emptyPredictions(int numPredictions) {
double[] predictions = new double[numPredictions];
Arrays.fill(predictions, Double.NaN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

Expand All @@ -39,6 +40,16 @@ public class SimpleModel extends MovAvgModel {

protected static final ParseField NAME_FIELD = new ParseField("simple");

@Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
double[] predictions = new double[numPredictions];

// EWMA just emits the same final prediction repeatedly.
Arrays.fill(predictions, next(values));

return predictions;
}

@Override
public <T extends Number> double next(Collection<T> values) {
double avg = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public static ArrayList<MockBucket> generateHistogram(int interval, int size, do
ArrayList<MockBucket> values = new ArrayList<>(size);

boolean lastWasGap = false;
boolean emptyHisto = true;

for (int i = 0; i < size; i++) {
MockBucket bucket = new MockBucket();
Expand All @@ -70,15 +71,27 @@ public static ArrayList<MockBucket> generateHistogram(int interval, int size, do
bucket.count = randomIntBetween(1, 50);
bucket.docValues = new double[bucket.count];
for (int j = 0; j < bucket.count; j++) {
bucket.docValues[j] = randomDouble() * randomIntBetween(-20,20);
bucket.docValues[j] = randomDouble() * randomIntBetween(-20, 20);
}
lastWasGap = false;
emptyHisto = false;
}

bucket.key = i * interval;
values.add(bucket);
}

if (emptyHisto) {
int idx = randomIntBetween(0, values.size()-1);
MockBucket bucket = values.get(idx);
bucket.count = randomIntBetween(1, 50);
bucket.docValues = new double[bucket.count];
for (int j = 0; j < bucket.count; j++) {
bucket.docValues[j] = randomDouble() * randomIntBetween(-20, 20);
}
values.set(idx, bucket);
}

return values;
}

Expand Down
Loading

0 comments on commit 5d94feb

Please sign in to comment.