Skip to content

Commit

Permalink
[query] Fix Graphite moving functions to union expanded/shifted fetch…
Browse files Browse the repository at this point in the history
…ed data with original query fetched time window (#3149)
  • Loading branch information
robskillington authored Mar 18, 2021
1 parent 6b91769 commit 8df132f
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 124 deletions.
10 changes: 8 additions & 2 deletions src/query/graphite/common/percentiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func SafeSort(input []float64) int {
nans++
}
}

sort.Float64s(input)
return nans
}
Expand All @@ -79,12 +78,19 @@ func SafeSum(input []float64) (float64, int) {
if !math.IsNaN(v) {
sum += v
} else {
nans += 1
nans++
}
}
return sum, nans
}

// SafeAverage returns the average of the input slice the number of NaNs in the input.
func SafeAverage(input []float64) (float64, int) {
sum, nans := SafeSum(input)
count := len(input) - nans
return sum / float64(count), nans
}

// SafeMax returns the maximum value of the input slice the number of NaNs in the input.
func SafeMax(input []float64) (float64, int) {
nans := 0
Expand Down
17 changes: 14 additions & 3 deletions src/query/graphite/common/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ type MovingFunctionStorage struct {
StepMillis int
Bootstrap []float64
Values []float64
OriginalIDs []string
BootstrapIDs []string
BootstrapStart time.Time
}

Expand Down Expand Up @@ -155,12 +157,21 @@ func (s *MovingFunctionStorage) fetchByIDs(
var values []float64
if opts.StartTime.Equal(s.BootstrapStart) {
values = s.Bootstrap
if s.BootstrapIDs != nil {
ids = s.BootstrapIDs
}
} else {
values = s.Values
if s.OriginalIDs != nil {
ids = s.OriginalIDs
}
}

for _, id := range ids {
series := ts.NewSeries(ctx, id, opts.StartTime,
NewTestSeriesValues(ctx, s.StepMillis, values))
seriesList = append(seriesList, series)
}
series := ts.NewSeries(ctx, ids[0], opts.StartTime,
NewTestSeriesValues(ctx, s.StepMillis, values))
seriesList = append(seriesList, series)
}

return storage.NewFetchResult(ctx, seriesList, block.NewResultMetadata()), nil
Expand Down
225 changes: 107 additions & 118 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,108 +848,6 @@ func parseWindowSize(windowSizeValue genericInterface, input singlePathSpec) (wi
return windowSize, nil
}

// movingAverage calculates the moving average of a metric (or metrics) over a time interval.
func movingAverage(ctx *common.Context, input singlePathSpec, windowSizeValue genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
if len(input.Values) == 0 {
return nil, nil
}

widowSize, err := parseWindowSize(windowSizeValue, input)
if err != nil {
return nil, err
}

contextShiftingFn := func(c *common.Context) *common.Context {
opts := common.NewChildContextOptions()
opts.AdjustTimeRange(0, 0, widowSize.deltaValue, 0)
childCtx := c.NewChildContext(opts)
return childCtx
}

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-widowSize.deltaValue), ctx.StartTime
transformerFn := func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
}

results := make([]*ts.Series, 0, original.Len())
for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
stepSize := series.MillisPerStep()
windowPoints := widowSize.windowSizeFunc(stepSize)
if windowPoints == 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
"windowSize should not be smaller than stepSize, windowSize=%v, stepSize=%d",
windowSizeValue, stepSize))
return ts.NewSeriesList(), err
}

numSteps := series.Len()
offset := bootstrap.Len() - numSteps
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
sum := 0.0
num := 0
nans := 0
firstPoint := false
for i := 0; i < numSteps; i++ {
// NB: skip if the number of points received is less than the number
// of points in the lookback window.
if !firstPoint {
firstPoint = true
for j := offset - windowPoints; j < offset; j++ {
if j < 0 {
continue
}

v := bootstrap.ValueAt(j)
if !math.IsNaN(v) {
sum += v
num++
} else {
nans++
}
}
} else {
if i+offset-windowPoints > 0 {
prev := bootstrap.ValueAt(i + offset - windowPoints - 1)
if !math.IsNaN(prev) {
sum -= prev
num--
} else {
nans--
}
}
next := bootstrap.ValueAt(i + offset - 1)
if !math.IsNaN(next) {
sum += next
num++
} else {
nans++
}
}

if nans < windowPoints && effectiveXFF(windowPoints, nans, xFilesFactor) {
vals.SetValueAt(i, sum/float64(num))
}
}
name := fmt.Sprintf("movingAverage(%s,%s)", series.Name(), widowSize.stringValue)
newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
results = append(results, newSeries)
}

original.Values = results
return original, nil
}

return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: transformerFn,
}, nil
}

// exponentialMovingAverage takes a series of values and a window size and produces
// an exponential moving average utilizing the following formula:
// ema(current) = constant * (Current Value) + (1 - constant) * ema(previous)
Expand Down Expand Up @@ -1632,8 +1530,8 @@ func substr(_ *common.Context, seriesList singlePathSpec, start, stop int) (ts.S
// combineBootstrapWithOriginal combines the bootstrapped the series with the original series.
func combineBootstrapWithOriginal(
ctx *common.Context,
startTime time.Time,
endTime time.Time,
bootstrapStartTime, bootstrapEndTime time.Time,
_, originalEndTime time.Time,
bootstrapped ts.SeriesList,
seriesList singlePathSpec,
) (ts.SeriesList, error) {
Expand All @@ -1646,15 +1544,20 @@ func combineBootstrapWithOriginal(
for _, series := range seriesList.Values {
bs, found := nameToSeries[series.Name()]
if !found {
numSteps := ts.NumSteps(startTime, endTime, series.MillisPerStep())
numSteps := ts.NumSteps(bootstrapStartTime, bootstrapEndTime, series.MillisPerStep())
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
bs = ts.NewSeries(ctx, series.Name(), startTime, vals)
bs = ts.NewSeries(ctx, series.Name(), bootstrapStartTime, vals)
} else {
// Delete from the lookup so we can fill in
// the bootstrapped time series with NaNs if
// the original series list is missing the series.
delete(nameToSeries, series.Name())
}
bootstrapList = append(bootstrapList, bs)
}

var err error
newSeriesList := make([]*ts.Series, len(seriesList.Values))
newSeriesList := make([]*ts.Series, 0, len(seriesList.Values)+len(nameToSeries))
for i, bootstrap := range bootstrapList {
original := seriesList.Values[i]
if bootstrap.MillisPerStep() < original.MillisPerStep() {
Expand All @@ -1663,8 +1566,8 @@ func combineBootstrapWithOriginal(
return ts.NewSeriesList(), err
}
}
bootstrapEndStep := endTime.Truncate(original.Resolution())
if bootstrapEndStep.Before(endTime) {
bootstrapEndStep := bootstrapEndTime.Truncate(original.Resolution())
if bootstrapEndStep.Before(bootstrapEndTime) {
bootstrapEndStep = bootstrapEndStep.Add(original.Resolution())
}
// NB(braskin): using bootstrap.Len() is incorrect as it will include all
Expand All @@ -1682,9 +1585,32 @@ func combineBootstrapWithOriginal(
for j := numBootstrapValues; j < numCombinedValues; j++ {
values.SetValueAt(j, original.ValueAt(j-numBootstrapValues))
}
newSeries := ts.NewSeries(ctx, original.Name(), startTime, values)
newSeries := ts.NewSeries(ctx, original.Name(), bootstrapStartTime, values)
newSeries.Specification = original.Specification
newSeriesList[i] = newSeries
newSeriesList = append(newSeriesList, newSeries)
}
// Now add any series in bootstrap list but not original series,
// need to iterate the bootstrapped.Values to retain order
// but can check if they are in the nameToSeries map still and if so
// then there was no matching original series.
for _, series := range bootstrapped.Values {
bs, found := nameToSeries[series.Name()]
if !found {
// Processed already.
continue
}
// Extend the bootstrap series to include steps covered by original
// time range since the original series is missing from fetch.
needSteps := bs.StepAtTime(originalEndTime)
if currSteps := bs.Len(); needSteps > currSteps {
// Need to resize.
vals := ts.NewValues(ctx, bs.MillisPerStep(), needSteps)
for i := 0; i < currSteps; i++ {
vals.SetValueAt(i, bs.ValueAt(i))
}
bs = bs.DerivedSeries(bs.StartTime(), vals)
}
newSeriesList = append(newSeriesList, bs)
}

r := ts.SeriesList(seriesList)
Expand Down Expand Up @@ -2134,6 +2060,15 @@ func movingSumHelper(window []float64, vals ts.MutableValues, windowPoints int,
}
}

// movingAverageHelper given a slice of floats, calculates the average and assigns it into vals as index i
func movingAverageHelper(window []float64, vals ts.MutableValues, windowPoints int, i int, xFilesFactor float64) {
avg, nans := common.SafeAverage(window)

if nans < windowPoints && effectiveXFF(windowPoints, nans, xFilesFactor) {
vals.SetValueAt(i, avg)
}
}

// movingMaxHelper given a slice of floats, finds the max and assigns it into vals as index i
func movingMaxHelper(window []float64, vals ts.MutableValues, windowPoints int, i int, xFilesFactor float64) {
max, nans := common.SafeMax(window)
Expand Down Expand Up @@ -2181,12 +2116,14 @@ func newMovingBinaryTransform(
return childCtx
}

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime
originalStart, originalEnd := ctx.StartTime, ctx.EndTime
bootstrapStartTime, bootstrapEndTime := originalStart.Add(-interval), originalStart
return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
ctx.StartTime, ctx.EndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
Expand All @@ -2195,7 +2132,15 @@ func newMovingBinaryTransform(
results := make([]*ts.Series, 0, original.Len())
maxWindowPoints := 0
for i := range bootstrapList.Values {
series := original.Values[i]
var series *ts.Series
if i < original.Len() {
// Existing series exists, prefer that resolution.
series = original.Values[i]
} else {
// No existing series use resolution from bootstrapped.
series = bootstrapList.Values[i]
}

windowPoints := windowPointsLength(series, interval)
if windowPoints <= 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
Expand All @@ -2210,7 +2155,20 @@ func newMovingBinaryTransform(

windowPoints := make([]float64, maxWindowPoints)
for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
var series *ts.Series
if i < original.Len() {
// Existing series exists, prefer that.
series = original.Values[i]
} else {
// No existing series, use an intersected
// version of the bootstrapped series as a
// reference for values to compute.
series, err = bootstrap.IntersectAndResize(originalStart,
originalEnd, bootstrap.MillisPerStep(), bootstrap.ConsolidationFunc())
if err != nil {
return ts.NewSeriesList(), err
}
}
currWindowPoints := windowPointsLength(series, interval)
window := windowPoints[:currWindowPoints]
util.Memset(window, math.NaN())
Expand Down Expand Up @@ -2253,25 +2211,56 @@ func newMovingBinaryTransform(
}

// movingMedian calculates the moving median of a metric (or metrics) over a time interval.
func movingMedian(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingMedian(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMedian", xFilesFactor,
movingImplementationFn(movingMedianHelper))
}

// movingSum calculates the moving sum of a metric (or metrics) over a time interval.
func movingSum(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingSum(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingSum", xFilesFactor,
movingImplementationFn(movingSumHelper))
}

// movingAverage calculates the moving average of a metric (or metrics) over a time interval.
func movingAverage(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingAverage", xFilesFactor,
movingImplementationFn(movingAverageHelper))
}

// movingMax calculates the moving maximum of a metric (or metrics) over a time interval.
func movingMax(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingMax(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMax", xFilesFactor,
movingImplementationFn(movingMaxHelper))
}

// movingMin calculates the moving minimum of a metric (or metrics) over a time interval.
func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingMin(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", xFilesFactor,
movingImplementationFn(movingMinHelper))
}
Expand Down
Loading

0 comments on commit 8df132f

Please sign in to comment.