-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[query] Add quantile_over_time #1367
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ package temporal | |
import ( | ||
"fmt" | ||
"math" | ||
"sort" | ||
"time" | ||
|
||
"github.com/m3db/m3/src/query/executor/transform" | ||
|
@@ -50,6 +51,9 @@ const ( | |
|
||
// StdVarType calculates the standard variance of all values in the specified interval. | ||
StdVarType = "stdvar_over_time" | ||
|
||
// QuantileType calculates the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval. | ||
QuantileType = "quantile_over_time" | ||
) | ||
|
||
type aggFunc func([]float64) float64 | ||
|
@@ -78,14 +82,52 @@ func (a aggProcessor) Init(op baseOp, controller *transform.Controller, opts tra | |
} | ||
} | ||
|
||
// NewQuantileOp create a new base temporal transform for quantile_over_time func. | ||
func NewQuantileOp(args []interface{}, optype string) (transform.Params, error) { | ||
if optype != QuantileType { | ||
return nil, fmt.Errorf("unknown aggregation type: %s", optype) | ||
} | ||
|
||
if len(args) != 2 { | ||
return emptyOp, fmt.Errorf("invalid number of args for %s: %d", QuantileType, len(args)) | ||
} | ||
|
||
q, ok := args[0].(float64) | ||
if !ok { | ||
return emptyOp, fmt.Errorf("unable to cast to quantile argument: %v for %s", args[0], QuantileType) | ||
} | ||
|
||
duration, ok := args[1].(time.Duration) | ||
if !ok { | ||
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[1], QuantileType) | ||
} | ||
|
||
aggregationFunc := makeQuantileOverTimeFn(q) | ||
|
||
a := aggProcessor{ | ||
aggFunc: aggregationFunc, | ||
} | ||
|
||
return newBaseOp(duration, QuantileType, a) | ||
} | ||
|
||
// NewAggOp creates a new base temporal transform with a specified node. | ||
func NewAggOp(args []interface{}, optype string) (transform.Params, error) { | ||
if aggregationFunc, ok := aggFuncs[optype]; ok { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: flip the conditional, since the |
||
if len(args) != 1 { | ||
return emptyOp, fmt.Errorf("invalid number of args for %s: %d", optype, len(args)) | ||
} | ||
|
||
duration, ok := args[0].(time.Duration) | ||
if !ok { | ||
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[0], optype) | ||
} | ||
|
||
a := aggProcessor{ | ||
aggFunc: aggregationFunc, | ||
} | ||
|
||
return newBaseOp(args, optype, a) | ||
return newBaseOp(duration, optype, a) | ||
} | ||
|
||
return nil, fmt.Errorf("unknown aggregation type: %s", optype) | ||
|
@@ -192,3 +234,53 @@ func sumAndCount(values []float64) (float64, float64) { | |
|
||
return sum, count | ||
} | ||
|
||
func removeNaNs(vals []float64) []float64 { | ||
b := vals[:0] | ||
for _, val := range vals { | ||
if !math.IsNaN(val) { | ||
b = append(b, val) | ||
} | ||
} | ||
|
||
return b | ||
} | ||
|
||
func makeQuantileOverTimeFn(q float64) aggFunc { | ||
return func(values []float64) float64 { | ||
return quantile(q, removeNaNs(values)) | ||
} | ||
} | ||
|
||
// qauntile calculates the given quantile of a slice of values. | ||
// | ||
// This slice will be sorted. | ||
// If 'values' has zero elements, NaN is returned. | ||
// If q<0, -Inf is returned. | ||
// If q>1, +Inf is returned. | ||
func quantile(q float64, values []float64) float64 { | ||
if len(values) == 0 { | ||
return math.NaN() | ||
} | ||
|
||
if q < 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: newlines |
||
return math.Inf(-1) | ||
} | ||
|
||
if q > 1 { | ||
return math.Inf(+1) | ||
} | ||
|
||
sort.Float64s(values) | ||
|
||
n := float64(len(values)) | ||
// When the quantile lies between two values, | ||
// we use a weighted average of the two values. | ||
rank := q * (n - 1) | ||
|
||
lowerIndex := math.Max(0, math.Floor(rank)) | ||
upperIndex := math.Min(n-1, lowerIndex+1) | ||
|
||
weight := rank - math.Floor(rank) | ||
return values[int(lowerIndex)]*(1-weight) + values[int(upperIndex)]*weight | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -128,6 +128,18 @@ var testCases = []testCase{ | |
{2, 2, 2, 2, 2}, | ||
}, | ||
}, | ||
{ | ||
name: "quantile_over_time", | ||
opType: QuantileType, | ||
afterBlockOne: [][]float64{ | ||
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 1.6}, | ||
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 5.8}, | ||
}, | ||
afterAllBlocks: [][]float64{ | ||
{0.8, 0.8, 0.8, 0.8, 0.8}, | ||
{5.8, 5.8, 5.8, 5.8, 5.8}, | ||
}, | ||
}, | ||
} | ||
|
||
func TestAggregation(t *testing.T) { | ||
|
@@ -223,6 +235,18 @@ var testCasesNaNs = []testCase{ | |
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, | ||
}, | ||
}, | ||
{ | ||
name: "quantile_over_time", | ||
opType: QuantileType, | ||
afterBlockOne: [][]float64{ | ||
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have some tests that don't include There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah we should add tests for all of the temporal functions with some NaNs - ill do this in a separate PR. |
||
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, | ||
}, | ||
afterAllBlocks: [][]float64{ | ||
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, | ||
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, | ||
}, | ||
}, | ||
} | ||
|
||
func TestAggregationAllNaNs(t *testing.T) { | ||
|
@@ -242,8 +266,22 @@ func testAggregation(t *testing.T, testCases []testCase, vals [][]float64) { | |
block3 := test.NewUnconsolidatedBlockFromDatapoints(bounds, values) | ||
c, sink := executor.NewControllerWithSink(parser.NodeID(1)) | ||
|
||
baseOp, err := NewAggOp([]interface{}{5 * time.Minute}, tt.opType) | ||
require.NoError(t, err) | ||
var ( | ||
args []interface{} | ||
baseOp transform.Params | ||
err error | ||
) | ||
|
||
if tt.opType == QuantileType { | ||
args = []interface{}{0.2, 5 * time.Minute} | ||
baseOp, err = NewQuantileOp(args, tt.opType) | ||
require.NoError(t, err) | ||
} else { | ||
args = []interface{}{5 * time.Minute} | ||
baseOp, err = NewAggOp(args, tt.opType) | ||
require.NoError(t, err) | ||
} | ||
|
||
node := baseOp.Node(c, transform.Options{ | ||
TimeSpec: transform.TimeSpec{ | ||
Start: boundStart.Add(-2 * bounds.Duration), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,18 +48,7 @@ type baseOp struct { | |
|
||
// skipping lint check for a single operator type since we will be adding more | ||
// nolint : unparam | ||
func newBaseOp(args []interface{}, operatorType string, processorFn MakeProcessor) (baseOp, error) { | ||
if operatorType != HoltWintersType && operatorType != PredictLinearType { | ||
if len(args) != 1 { | ||
return emptyOp, fmt.Errorf("invalid number of args for %s: %d", operatorType, len(args)) | ||
} | ||
} | ||
|
||
duration, ok := args[0].(time.Duration) | ||
if !ok { | ||
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[0], operatorType) | ||
} | ||
|
||
func newBaseOp(duration time.Duration, operatorType string, processorFn MakeProcessor) (baseOp, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
return baseOp{ | ||
operatorType: operatorType, | ||
processorFn: processorFn, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we push all of these parsing functions up into
promql/types.go
, similar to how this does it?A small gotcha is that any numeric expression is valid in Prom, i.e.
quantile_over_time( (1*2)/3, up[5m] )
is a valid query but would fail here and in many of the other temporal functions.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really don't wanna change this anymore. Also this query works in ours: