Skip to content

Commit

Permalink
[query] Implemented the Graphite highest and lowest functions (#2623
Browse files Browse the repository at this point in the history
)
  • Loading branch information
teddywahle authored Sep 23, 2020
1 parent b4575f6 commit 4d81e4a
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 28 deletions.
74 changes: 53 additions & 21 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,36 +562,52 @@ func takeByFunction(input singlePathSpec, n int, sr ts.SeriesReducer, sort ts.Di
return common.Head(r, n)
}

func getReducer(f string) (ts.SeriesReducer, error) {
sa := ts.SeriesReducerApproach(f)
r, ok := sa.SafeReducer()
if !ok {
return r, errors.NewInvalidParamsError(fmt.Errorf("invalid function %s", f))
}
return r, nil
}

// highest takes one metric or a wildcard seriesList followed by an integer N and an aggregation function.
// Out of all metrics passed, draws only the N metrics with the highest
// aggregated value over the time period specified.
func highest(_ *common.Context, input singlePathSpec, n int, f string) (ts.SeriesList, error) {
reducer, err := getReducer(f)
if err != nil {
return ts.NewSeriesList(), err
}
return takeByFunction(input, n, reducer, ts.Descending)
}

// highestSum takes one metric or a wildcard seriesList followed by an integer
// n. Out of all metrics passed, draws only the N metrics with the highest
// total value in the time period specified.
func highestSum(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerSum.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestSum(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "sum")
}

// highestMax takes one metric or a wildcard seriesList followed by an integer
// n. Out of all metrics passed, draws only the N metrics with the highest
// maximum value in the time period specified.
func highestMax(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerMax.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestMax(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "max")
}

// highestCurrent takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the N metrics with the
// highest value at the end of the time period specified.
func highestCurrent(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerLast.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestCurrent(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "current")
}

// highestAverage takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the top N metrics with the
// highest average value for the time period specified.
func highestAverage(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerAvg.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestAverage(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "average")
}

// fallbackSeries takes one metric or a wildcard seriesList, and a second fallback metric.
Expand All @@ -608,25 +624,33 @@ func fallbackSeries(_ *common.Context, input singlePathSpec, fallback singlePath
// N. Draws the N most deviant metrics. To find the deviants, the standard
// deviation (sigma) of each series is taken and ranked. The top N standard
// deviations are returned.
func mostDeviant(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerStdDev.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func mostDeviant(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "stddev")
}

// lowest takes one metric or a wildcard seriesList followed by an integer N and an aggregation function.
// Out of all metrics passed, draws only the N metrics with the lowest
// aggregated value over the time period specified.
func lowest(_ *common.Context, input singlePathSpec, n int, f string) (ts.SeriesList, error) {
reducer, err := getReducer(f)
if err != nil {
return ts.NewSeriesList(), err
}
return takeByFunction(input, n, reducer, ts.Ascending)
}

// lowestAverage takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the top N metrics with the
// lowest average value for the time period specified.
func lowestAverage(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerAvg.Reducer()
return takeByFunction(input, n, sr, ts.Ascending)
func lowestAverage(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return lowest(ctx, input, n, "average")
}

// lowestCurrent takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the N metrics with the
// lowest value at the end of the time period specified.
func lowestCurrent(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerLast.Reducer()
return takeByFunction(input, n, sr, ts.Ascending)
func lowestCurrent(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return lowest(ctx, input, n, "current")
}

// windowSizeFunc calculates window size for moving average calculation
Expand Down Expand Up @@ -2173,6 +2197,10 @@ func init() {
MustRegisterFunction(group)
MustRegisterFunction(groupByNode)
MustRegisterFunction(groupByNodes)
MustRegisterFunction(highest).WithDefaultParams(map[uint8]interface{}{
2: 1, // n,
3: "average", // f
})
MustRegisterFunction(highestAverage)
MustRegisterFunction(highestCurrent)
MustRegisterFunction(highestMax)
Expand All @@ -2192,6 +2220,10 @@ func init() {
MustRegisterFunction(logarithm).WithDefaultParams(map[uint8]interface{}{
2: 10, // base
})
MustRegisterFunction(lowest).WithDefaultParams(map[uint8]interface{}{
2: 1, // n,
3: "average", // f
})
MustRegisterFunction(lowestAverage)
MustRegisterFunction(lowestCurrent)
MustRegisterFunction(maxSeries)
Expand Down
120 changes: 120 additions & 0 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,10 +1090,20 @@ type nIntParamGoldenData struct {
outputs []common.TestSeries
}

// nIntParamGoldenDataWithAgg holds test data for functions that take an additional "n" int parameter
// It also holds an aggregation function
type nIntParamGoldenDataWithAgg struct {
nIntParamGoldenData
aggFunc string
}

// rankingFunc selects the n lowest or highest series based on certain metric of the
// series (e.g., maximum, minimum, average).
type rankingFunc func(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error)

// testRanking can be used to test the ranking alias functions
// (e.g. lowestAverage, highestMax, highestAverage, lowestCurrent)
// these functions are all aliases of the "meta-ranking" functions (i.e. highest and lowest)
func testRanking(t *testing.T, ctx *common.Context, tests []nIntParamGoldenData, f rankingFunc) {
start := time.Now()
step := 100
Expand All @@ -1113,6 +1123,73 @@ func testRanking(t *testing.T, ctx *common.Context, tests []nIntParamGoldenData,
}
}

// testOrderedAggregationFunc is a helper function for testing lowest and highest
func testOrderedAggregationFunc(t *testing.T, ctx *common.Context, tests []nIntParamGoldenDataWithAgg, isLowest bool) {
f := highest
if isLowest {
f = lowest
}

start := time.Now()
step := 100
for _, test := range tests {
input := singlePathSpec{Values: generateSeriesList(ctx, start, test.inputs, step)}
outputs, err := f(ctx, input, test.n, test.aggFunc)

if test.n < 0 {
require.NotNil(t, err)
require.Equal(t, "n must be positive", err.Error())
assert.Nil(t, outputs.Values, "Nil timeseries should be returned")
continue
}

require.NoError(t, err)
common.CompareOutputsAndExpected(t, step, start,
test.outputs, outputs.Values)
}
}

func TestHighest(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()

tests := []nIntParamGoldenDataWithAgg{
{
nIntParamGoldenData{
testInput,
0,
nil,
},
"sum",
},
{
nIntParamGoldenData{
testInput,
1,
[]common.TestSeries{testInput[0]},
},
"current",
},
{
nIntParamGoldenData{
testInput,
2,
[]common.TestSeries{testInput[4], testInput[2]},
},
"average",
},
{
nIntParamGoldenData{
testInput,
len(testInput) + 10, // force sort
[]common.TestSeries{testInput[0], testInput[3], testInput[4], testInput[2], testInput[1]},
},
"last",
},
}
testOrderedAggregationFunc(t, ctx, tests, false)
}

func TestHighestCurrent(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -1283,6 +1360,47 @@ func TestMostDeviant(t *testing.T) {
testRanking(t, ctx, tests, mostDeviant)
}

func TestLowest(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()

tests := []nIntParamGoldenDataWithAgg{
{
nIntParamGoldenData{
testInput,
0,
nil,
},
"max",
},
{
nIntParamGoldenData{
testInput,
2,
[]common.TestSeries{testInput[1], testInput[3]},
},
"sum",
},
{
nIntParamGoldenData{
testInput,
2,
[]common.TestSeries{testInput[1], testInput[2]},
},
"current",
},
{
nIntParamGoldenData{
testInput,
3,
[]common.TestSeries{testInput[1], testInput[3], testInput[0]},
},
"average",
},
}
testOrderedAggregationFunc(t, ctx, tests, true)
}

func TestLowestAverage(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -3159,6 +3277,7 @@ func TestFunctionsRegistered(t *testing.T) {
"group",
"groupByNode",
"groupByNodes",
"highest",
"highestAverage",
"highestCurrent",
"highestMax",
Expand All @@ -3175,6 +3294,7 @@ func TestFunctionsRegistered(t *testing.T) {
"limit",
"log",
"logarithm",
"lowest",
"lowestAverage",
"lowestCurrent",
"max",
Expand Down
21 changes: 14 additions & 7 deletions src/query/graphite/ts/series_reducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ type SeriesReducerApproach string
// The standard set of reducers
const (
SeriesReducerAvg SeriesReducerApproach = "avg"
SeriesReducerSum SeriesReducerApproach = "total"
SeriesReducerSum SeriesReducerApproach = "sum"
SeriesReducerMin SeriesReducerApproach = "min"
SeriesReducerMax SeriesReducerApproach = "max"
SeriesReducerStdDev SeriesReducerApproach = "stddev"
SeriesReducerLast SeriesReducerApproach = "last"

SeriesReducerAverage SeriesReducerApproach = "average" // alias for "avg"
SeriesReducerTotal SeriesReducerApproach = "total" // alias for "sum"
SeriesReducerCurrent SeriesReducerApproach = "current" // alias for "last"
)

// SeriesReducer reduces a series to a single value.
Expand All @@ -55,10 +59,13 @@ func (sa SeriesReducerApproach) Reducer() SeriesReducer {
}

var seriesReducers = map[SeriesReducerApproach]SeriesReducer{
SeriesReducerAvg: func(b *Series) float64 { return b.SafeAvg() },
SeriesReducerSum: func(b *Series) float64 { return b.SafeSum() },
SeriesReducerMin: func(b *Series) float64 { return b.SafeMin() },
SeriesReducerMax: func(b *Series) float64 { return b.SafeMax() },
SeriesReducerStdDev: func(b *Series) float64 { return b.SafeStdDev() },
SeriesReducerLast: func(b *Series) float64 { return b.SafeLastValue() },
SeriesReducerAvg: func(b *Series) float64 { return b.SafeAvg() },
SeriesReducerAverage: func(b *Series) float64 { return b.SafeAvg() },
SeriesReducerTotal: func(b *Series) float64 { return b.SafeSum() },
SeriesReducerSum: func(b *Series) float64 { return b.SafeSum() },
SeriesReducerMin: func(b *Series) float64 { return b.SafeMin() },
SeriesReducerMax: func(b *Series) float64 { return b.SafeMax() },
SeriesReducerStdDev: func(b *Series) float64 { return b.SafeStdDev() },
SeriesReducerLast: func(b *Series) float64 { return b.SafeLastValue() },
SeriesReducerCurrent: func(b *Series) float64 { return b.SafeLastValue() },
}

0 comments on commit 4d81e4a

Please sign in to comment.