Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Fix Graphite moving functions to include series not in the query but matched by expanded range #3149

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/query/graphite/common/percentiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func SafeSort(input []float64) int {
nans++
}
}

sort.Float64s(input)
return nans
}
Expand All @@ -79,12 +78,19 @@ func SafeSum(input []float64) (float64, int) {
if !math.IsNaN(v) {
sum += v
} else {
nans += 1
nans++
}
}
return sum, nans
}

// SafeAverage returns the average of the input slice the number of NaNs in the input.
func SafeAverage(input []float64) (float64, int) {
sum, nans := SafeSum(input)
count := len(input) - nans
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can they all be NaNs?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm good call.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this is actually safe:
https://play.golang.org/p/MhMnzAReOd1

return sum / float64(count), nans
}

// SafeMax returns the maximum value of the input slice the number of NaNs in the input.
func SafeMax(input []float64) (float64, int) {
nans := 0
Expand Down
17 changes: 14 additions & 3 deletions src/query/graphite/common/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ type MovingFunctionStorage struct {
StepMillis int
Bootstrap []float64
Values []float64
OriginalIDs []string
BootstrapIDs []string
BootstrapStart time.Time
}

Expand Down Expand Up @@ -155,12 +157,21 @@ func (s *MovingFunctionStorage) fetchByIDs(
var values []float64
if opts.StartTime.Equal(s.BootstrapStart) {
values = s.Bootstrap
if s.BootstrapIDs != nil {
ids = s.BootstrapIDs
}
} else {
values = s.Values
if s.OriginalIDs != nil {
ids = s.OriginalIDs
}
}

for _, id := range ids {
series := ts.NewSeries(ctx, id, opts.StartTime,
NewTestSeriesValues(ctx, s.StepMillis, values))
seriesList = append(seriesList, series)
}
series := ts.NewSeries(ctx, ids[0], opts.StartTime,
NewTestSeriesValues(ctx, s.StepMillis, values))
seriesList = append(seriesList, series)
}

return storage.NewFetchResult(ctx, seriesList, block.NewResultMetadata()), nil
Expand Down
225 changes: 107 additions & 118 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,108 +848,6 @@ func parseWindowSize(windowSizeValue genericInterface, input singlePathSpec) (wi
return windowSize, nil
}

// movingAverage calculates the moving average of a metric (or metrics) over a time interval.
func movingAverage(ctx *common.Context, input singlePathSpec, windowSizeValue genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
if len(input.Values) == 0 {
return nil, nil
}

widowSize, err := parseWindowSize(windowSizeValue, input)
if err != nil {
return nil, err
}

contextShiftingFn := func(c *common.Context) *common.Context {
opts := common.NewChildContextOptions()
opts.AdjustTimeRange(0, 0, widowSize.deltaValue, 0)
childCtx := c.NewChildContext(opts)
return childCtx
}

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-widowSize.deltaValue), ctx.StartTime
transformerFn := func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
}

results := make([]*ts.Series, 0, original.Len())
for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
stepSize := series.MillisPerStep()
windowPoints := widowSize.windowSizeFunc(stepSize)
if windowPoints == 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
"windowSize should not be smaller than stepSize, windowSize=%v, stepSize=%d",
windowSizeValue, stepSize))
return ts.NewSeriesList(), err
}

numSteps := series.Len()
offset := bootstrap.Len() - numSteps
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
sum := 0.0
num := 0
nans := 0
firstPoint := false
for i := 0; i < numSteps; i++ {
// NB: skip if the number of points received is less than the number
// of points in the lookback window.
if !firstPoint {
firstPoint = true
for j := offset - windowPoints; j < offset; j++ {
if j < 0 {
continue
}

v := bootstrap.ValueAt(j)
if !math.IsNaN(v) {
sum += v
num++
} else {
nans++
}
}
} else {
if i+offset-windowPoints > 0 {
prev := bootstrap.ValueAt(i + offset - windowPoints - 1)
if !math.IsNaN(prev) {
sum -= prev
num--
} else {
nans--
}
}
next := bootstrap.ValueAt(i + offset - 1)
if !math.IsNaN(next) {
sum += next
num++
} else {
nans++
}
}

if nans < windowPoints && effectiveXFF(windowPoints, nans, xFilesFactor) {
vals.SetValueAt(i, sum/float64(num))
}
}
name := fmt.Sprintf("movingAverage(%s,%s)", series.Name(), widowSize.stringValue)
newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
results = append(results, newSeries)
}

original.Values = results
return original, nil
}

return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: transformerFn,
}, nil
}

// exponentialMovingAverage takes a series of values and a window size and produces
// an exponential moving average utilizing the following formula:
// ema(current) = constant * (Current Value) + (1 - constant) * ema(previous)
Expand Down Expand Up @@ -1632,8 +1530,8 @@ func substr(_ *common.Context, seriesList singlePathSpec, start, stop int) (ts.S
// combineBootstrapWithOriginal combines the bootstrapped the series with the original series.
func combineBootstrapWithOriginal(
ctx *common.Context,
startTime time.Time,
endTime time.Time,
bootstrapStartTime, bootstrapEndTime time.Time,
_, originalEndTime time.Time,
bootstrapped ts.SeriesList,
seriesList singlePathSpec,
) (ts.SeriesList, error) {
Expand All @@ -1646,15 +1544,20 @@ func combineBootstrapWithOriginal(
for _, series := range seriesList.Values {
bs, found := nameToSeries[series.Name()]
if !found {
numSteps := ts.NumSteps(startTime, endTime, series.MillisPerStep())
numSteps := ts.NumSteps(bootstrapStartTime, bootstrapEndTime, series.MillisPerStep())
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
bs = ts.NewSeries(ctx, series.Name(), startTime, vals)
bs = ts.NewSeries(ctx, series.Name(), bootstrapStartTime, vals)
} else {
// Delete from the lookup so we can fill in
// the bootstrapped time series with NaNs if
// the original series list is missing the series.
delete(nameToSeries, series.Name())
}
bootstrapList = append(bootstrapList, bs)
}

var err error
newSeriesList := make([]*ts.Series, len(seriesList.Values))
newSeriesList := make([]*ts.Series, 0, len(seriesList.Values)+len(nameToSeries))
for i, bootstrap := range bootstrapList {
original := seriesList.Values[i]
if bootstrap.MillisPerStep() < original.MillisPerStep() {
Expand All @@ -1663,8 +1566,8 @@ func combineBootstrapWithOriginal(
return ts.NewSeriesList(), err
}
}
bootstrapEndStep := endTime.Truncate(original.Resolution())
if bootstrapEndStep.Before(endTime) {
bootstrapEndStep := bootstrapEndTime.Truncate(original.Resolution())
if bootstrapEndStep.Before(bootstrapEndTime) {
bootstrapEndStep = bootstrapEndStep.Add(original.Resolution())
}
// NB(braskin): using bootstrap.Len() is incorrect as it will include all
Expand All @@ -1682,9 +1585,32 @@ func combineBootstrapWithOriginal(
for j := numBootstrapValues; j < numCombinedValues; j++ {
values.SetValueAt(j, original.ValueAt(j-numBootstrapValues))
}
newSeries := ts.NewSeries(ctx, original.Name(), startTime, values)
newSeries := ts.NewSeries(ctx, original.Name(), bootstrapStartTime, values)
newSeries.Specification = original.Specification
newSeriesList[i] = newSeries
newSeriesList = append(newSeriesList, newSeries)
}
// Now add any series in bootstrap list but not original series,
// need to iterate the bootstrapped.Values to retain order
// but can check if they are in the nameToSeries map still and if so
// then there was no matching original series.
for _, series := range bootstrapped.Values {
bs, found := nameToSeries[series.Name()]
if !found {
// Processed already.
continue
}
// Extend the bootstrap series to include steps covered by original
// time range since the original series is missing from fetch.
needSteps := bs.StepAtTime(originalEndTime)
if currSteps := bs.Len(); needSteps > currSteps {
// Need to resize.
vals := ts.NewValues(ctx, bs.MillisPerStep(), needSteps)
for i := 0; i < currSteps; i++ {
vals.SetValueAt(i, bs.ValueAt(i))
}
bs = bs.DerivedSeries(bs.StartTime(), vals)
}
newSeriesList = append(newSeriesList, bs)
}

r := ts.SeriesList(seriesList)
Expand Down Expand Up @@ -2134,6 +2060,15 @@ func movingSumHelper(window []float64, vals ts.MutableValues, windowPoints int,
}
}

// movingAverageHelper given a slice of floats, calculates the average and assigns it into vals as index i
func movingAverageHelper(window []float64, vals ts.MutableValues, windowPoints int, i int, xFilesFactor float64) {
avg, nans := common.SafeAverage(window)

if nans < windowPoints && effectiveXFF(windowPoints, nans, xFilesFactor) {
vals.SetValueAt(i, avg)
}
}

// movingMaxHelper given a slice of floats, finds the max and assigns it into vals as index i
func movingMaxHelper(window []float64, vals ts.MutableValues, windowPoints int, i int, xFilesFactor float64) {
max, nans := common.SafeMax(window)
Expand Down Expand Up @@ -2181,12 +2116,14 @@ func newMovingBinaryTransform(
return childCtx
}

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime
originalStart, originalEnd := ctx.StartTime, ctx.EndTime
bootstrapStartTime, bootstrapEndTime := originalStart.Add(-interval), originalStart
return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
ctx.StartTime, ctx.EndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
Expand All @@ -2195,7 +2132,15 @@ func newMovingBinaryTransform(
results := make([]*ts.Series, 0, original.Len())
maxWindowPoints := 0
for i := range bootstrapList.Values {
series := original.Values[i]
var series *ts.Series
if i < original.Len() {
// Existing series exists, prefer that resolution.
series = original.Values[i]
} else {
// No existing series use resolution from bootstrapped.
series = bootstrapList.Values[i]
}

windowPoints := windowPointsLength(series, interval)
if windowPoints <= 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
Expand All @@ -2210,7 +2155,20 @@ func newMovingBinaryTransform(

windowPoints := make([]float64, maxWindowPoints)
for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
var series *ts.Series
if i < original.Len() {
// Existing series exists, prefer that.
series = original.Values[i]
} else {
// No existing series, use an intersected
// version of the bootstrapped series as a
// reference for values to compute.
series, err = bootstrap.IntersectAndResize(originalStart,
originalEnd, bootstrap.MillisPerStep(), bootstrap.ConsolidationFunc())
if err != nil {
return ts.NewSeriesList(), err
}
}
currWindowPoints := windowPointsLength(series, interval)
window := windowPoints[:currWindowPoints]
util.Memset(window, math.NaN())
Expand Down Expand Up @@ -2253,25 +2211,56 @@ func newMovingBinaryTransform(
}

// movingMedian calculates the moving median of a metric (or metrics) over a time interval.
func movingMedian(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingMedian(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMedian", xFilesFactor,
movingImplementationFn(movingMedianHelper))
}

// movingSum calculates the moving sum of a metric (or metrics) over a time interval.
func movingSum(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingSum(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingSum", xFilesFactor,
movingImplementationFn(movingSumHelper))
}

// movingAverage calculates the moving average of a metric (or metrics) over a time interval.
func movingAverage(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingAverage", xFilesFactor,
movingImplementationFn(movingAverageHelper))
}

// movingMax calculates the moving maximum of a metric (or metrics) over a time interval.
func movingMax(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingMax(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMax", xFilesFactor,
movingImplementationFn(movingMaxHelper))
}

// movingMin calculates the moving minimum of a metric (or metrics) over a time interval.
func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingMin(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", xFilesFactor,
movingImplementationFn(movingMinHelper))
}
Expand Down
Loading