Skip to content

Commit

Permalink
Merge pull request #11641 from polyfractal/bugfix/movavg_predict
Browse files Browse the repository at this point in the history
Aggregations: Moving average forecasts should not include current point
  • Loading branch information
polyfractal committed Jun 22, 2015
2 parents eb23530 + 5d94feb commit 2ebb44d
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 447 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
InternalHistogram.Bucket newBucket = bucket;

if (!(thisBucketValue == null || thisBucketValue.equals(Double.NaN))) {
values.offer(thisBucketValue);

// Some models (e.g. HoltWinters) have certain preconditions that must be met
if (model.hasValue(values.size())) {
Expand All @@ -142,6 +141,8 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
}
lastValidPosition = counter;
}

values.offer(thisBucketValue);
}
counter += 1;
newBuckets.add(newBucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

Expand All @@ -50,6 +51,15 @@ public EwmaModel(double alpha) {
this.alpha = alpha;
}

/**
 * Produces the EWMA forecast: a flat line where every predicted point is the
 * single value returned by {@link #next(Collection)} for the current window.
 *
 * @param values Collection of numerics to movingAvg, usually windowed
 * @param numPredictions Number of newly generated predictions to return
 * @param <T> Type of numeric
 * @return Array of length <code>numPredictions</code>, every element holding the same forecast value
 */
@Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
    double[] forecast = new double[numPredictions];

    // The model value is computed once and repeated for every requested point.
    double flatValue = next(values);
    Arrays.fill(forecast, flatValue);

    return forecast;
}

@Override
public <T extends Number> double next(Collection<T> values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public HoltLinearModel(double alpha, double beta) {
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
@Override
public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
return next(values, numPredictions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public boolean hasValue(int windowLength) {
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
@Override
public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
return next(values, numPredictions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

Expand All @@ -41,6 +42,16 @@ public class LinearModel extends MovAvgModel {

protected static final ParseField NAME_FIELD = new ParseField("linear");

/**
 * Produces the forecast for the linear model: a flat line where every predicted
 * point is the single value returned by {@link #next(Collection)} for the current window.
 *
 * @param values Collection of numerics to movingAvg, usually windowed
 * @param numPredictions Number of newly generated predictions to return
 * @param <T> Type of numeric
 * @return Array of length <code>numPredictions</code>, every element holding the same forecast value
 */
@Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
    double[] forecast = new double[numPredictions];

    // The linear model just emits the same final prediction repeatedly.
    double flatValue = next(values);
    Arrays.fill(forecast, flatValue);

    return forecast;
}

@Override
public <T extends Number> double next(Collection<T> values) {
double avg = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.elasticsearch.search.aggregations.pipeline.movavg.models;

import com.google.common.collect.EvictingQueue;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchParseException;
Expand All @@ -44,7 +42,7 @@ public abstract class MovAvgModel {
*/
public boolean hasValue(int windowLength) {
// Default implementation can always provide a next() value
return true;
return windowLength > 0;
}

/**
Expand All @@ -57,44 +55,39 @@ public boolean hasValue(int windowLength) {
public abstract <T extends Number> double next(Collection<T> values);

/**
* Predicts the next `n` values in the series, using the smoothing model to generate new values.
* Default prediction mode is to simply continue calling <code>next()</code> and adding the
* predicted value back into the windowed buffer.
* Predicts the next `n` values in the series.
*
* @param values Collection of numerics to movingAvg, usually windowed
* @param numPredictions Number of newly generated predictions to return
* @param <T> Type of numeric
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
double[] predictions = new double[numPredictions];
assert(numPredictions >= 1);

// If there are no values, we can't do anything. Return an array of NaNs.
if (values.size() == 0) {
if (values.isEmpty()) {
return emptyPredictions(numPredictions);
}

// special case for one prediction, avoids allocation
if (numPredictions < 1) {
throw new IllegalArgumentException("numPredictions may not be less than 1.");
} else if (numPredictions == 1){
predictions[0] = next(values);
return predictions;
}

Collection<Number> predictionBuffer = EvictingQueue.create(values.size());
predictionBuffer.addAll(values);

for (int i = 0; i < numPredictions; i++) {
predictions[i] = next(predictionBuffer);

// Add the last value to the buffer, so we can keep predicting
predictionBuffer.add(predictions[i]);
}

return predictions;
return doPredict(values, numPredictions);
}

/**
* Calls to the model-specific implementation which actually generates the predictions
*
* @param values Collection of numerics to movingAvg, usually windowed
* @param numPredictions Number of newly generated predictions to return
* @param <T> Type of numeric
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
protected abstract <T extends Number> double[] doPredict(Collection<T> values, int numPredictions);

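/**
* Returns an empty set of predictions, filled with NaNs
*
* @param numPredictions Number of NaN entries to generate
* @return An array of length <code>numPredictions</code> in which every element is <code>Double.NaN</code>
*/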
protected double[] emptyPredictions(int numPredictions) {
double[] predictions = new double[numPredictions];
Arrays.fill(predictions, Double.NaN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

Expand All @@ -39,6 +40,16 @@ public class SimpleModel extends MovAvgModel {

protected static final ParseField NAME_FIELD = new ParseField("simple");

/**
 * Produces the forecast for the simple model: a flat line where every predicted
 * point is the single value returned by {@link #next(Collection)} for the current window.
 *
 * @param values Collection of numerics to movingAvg, usually windowed
 * @param numPredictions Number of newly generated predictions to return
 * @param <T> Type of numeric
 * @return Array of length <code>numPredictions</code>, every element holding the same forecast value
 */
@Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
    double[] forecast = new double[numPredictions];

    // The simple model just emits the same final prediction repeatedly.
    double flatValue = next(values);
    Arrays.fill(forecast, flatValue);

    return forecast;
}

@Override
public <T extends Number> double next(Collection<T> values) {
double avg = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public static ArrayList<MockBucket> generateHistogram(int interval, int size, do
ArrayList<MockBucket> values = new ArrayList<>(size);

boolean lastWasGap = false;
boolean emptyHisto = true;

for (int i = 0; i < size; i++) {
MockBucket bucket = new MockBucket();
Expand All @@ -70,15 +71,27 @@ public static ArrayList<MockBucket> generateHistogram(int interval, int size, do
bucket.count = randomIntBetween(1, 50);
bucket.docValues = new double[bucket.count];
for (int j = 0; j < bucket.count; j++) {
bucket.docValues[j] = randomDouble() * randomIntBetween(-20,20);
bucket.docValues[j] = randomDouble() * randomIntBetween(-20, 20);
}
lastWasGap = false;
emptyHisto = false;
}

bucket.key = i * interval;
values.add(bucket);
}

if (emptyHisto) {
int idx = randomIntBetween(0, values.size()-1);
MockBucket bucket = values.get(idx);
bucket.count = randomIntBetween(1, 50);
bucket.docValues = new double[bucket.count];
for (int j = 0; j < bucket.count; j++) {
bucket.docValues[j] = randomDouble() * randomIntBetween(-20, 20);
}
values.set(idx, bucket);
}

return values;
}

Expand Down
Loading

0 comments on commit 2ebb44d

Please sign in to comment.