Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Add quantile_over_time #1367

Merged
merged 7 commits into from
Feb 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 93 additions & 1 deletion src/query/functions/temporal/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package temporal
import (
"fmt"
"math"
"sort"
"time"

"github.com/m3db/m3/src/query/executor/transform"
Expand Down Expand Up @@ -50,6 +51,9 @@ const (

// StdVarType calculates the standard variance of all values in the specified interval.
StdVarType = "stdvar_over_time"

// QuantileType calculates the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval.
QuantileType = "quantile_over_time"
)

type aggFunc func([]float64) float64
Expand Down Expand Up @@ -78,14 +82,52 @@ func (a aggProcessor) Init(op baseOp, controller *transform.Controller, opts tra
}
}

// NewQuantileOp creates a new base temporal transform for the quantile_over_time function.
func NewQuantileOp(args []interface{}, optype string) (transform.Params, error) {
if optype != QuantileType {
return nil, fmt.Errorf("unknown aggregation type: %s", optype)
}

if len(args) != 2 {
return emptyOp, fmt.Errorf("invalid number of args for %s: %d", QuantileType, len(args))
}

q, ok := args[0].(float64)
if !ok {
return emptyOp, fmt.Errorf("unable to cast to quantile argument: %v for %s", args[0], QuantileType)
}

duration, ok := args[1].(time.Duration)
if !ok {
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[1], QuantileType)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we push all of these parsing functions up into promql/types.go, similar to how this does it?

A small gotcha is that any numeric expression is valid in Prom, i.e. quantile_over_time( (1*2)/3, up[5m] ) is a valid query but would fail here and in many of the other temporal functions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really don't wanna change this anymore. Also this query works in ours:

quantile_over_time((1*2)/3, service_writeTaggedBatchRaw_latency{instance=~"$instance",quantile="0.99"}[1m])               

}

aggregationFunc := makeQuantileOverTimeFn(q)

a := aggProcessor{
aggFunc: aggregationFunc,
}

return newBaseOp(duration, QuantileType, a)
}

// NewAggOp creates a new base temporal transform with a specified node.
func NewAggOp(args []interface{}, optype string) (transform.Params, error) {
if aggregationFunc, ok := aggFuncs[optype]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: flip the conditional, since the if is a lot larger now.

if len(args) != 1 {
return emptyOp, fmt.Errorf("invalid number of args for %s: %d", optype, len(args))
}

duration, ok := args[0].(time.Duration)
if !ok {
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[0], optype)
}

a := aggProcessor{
aggFunc: aggregationFunc,
}

return newBaseOp(args, optype, a)
return newBaseOp(duration, optype, a)
}

return nil, fmt.Errorf("unknown aggregation type: %s", optype)
Expand Down Expand Up @@ -192,3 +234,53 @@ func sumAndCount(values []float64) (float64, float64) {

return sum, count
}

// removeNaNs compacts vals in place, dropping every NaN entry, and returns
// the shortened slice.
//
// The result shares its backing array with vals: the kept values are moved
// to the front of the caller's slice, so the input must be treated as
// overwritten after this call.
func removeNaNs(vals []float64) []float64 {
	kept := 0
	for i := range vals {
		if math.IsNaN(vals[i]) {
			continue
		}

		// The write position never runs ahead of the read position, so no
		// unread element is clobbered.
		vals[kept] = vals[i]
		kept++
	}

	return vals[:kept]
}

// makeQuantileOverTimeFn returns an aggFunc bound to the quantile q.
//
// The returned function first strips NaNs from the values it is given and
// then passes the remaining values to quantile. Both steps operate on the
// caller's slice in place (NaN removal compacts it, quantile sorts it).
func makeQuantileOverTimeFn(q float64) aggFunc {
	return func(values []float64) float64 {
		filtered := removeNaNs(values)
		return quantile(q, filtered)
	}
}

// quantile calculates the given quantile of a slice of values.
//
// This slice will be sorted.
// If 'values' has zero elements, NaN is returned.
// If q<0, -Inf is returned.
// If q>1, +Inf is returned.
func quantile(q float64, values []float64) float64 {
if len(values) == 0 {
return math.NaN()
}

if q < 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newlines

return math.Inf(-1)
}

if q > 1 {
return math.Inf(+1)
}

sort.Float64s(values)

n := float64(len(values))
// When the quantile lies between two values,
// we use a weighted average of the two values.
rank := q * (n - 1)

lowerIndex := math.Max(0, math.Floor(rank))
upperIndex := math.Min(n-1, lowerIndex+1)

weight := rank - math.Floor(rank)
return values[int(lowerIndex)]*(1-weight) + values[int(upperIndex)]*weight
}
42 changes: 40 additions & 2 deletions src/query/functions/temporal/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ var testCases = []testCase{
{2, 2, 2, 2, 2},
},
},
{
name: "quantile_over_time",
opType: QuantileType,
afterBlockOne: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 1.6},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 5.8},
},
afterAllBlocks: [][]float64{
{0.8, 0.8, 0.8, 0.8, 0.8},
{5.8, 5.8, 5.8, 5.8, 5.8},
},
},
}

func TestAggregation(t *testing.T) {
Expand Down Expand Up @@ -223,6 +235,18 @@ var testCasesNaNs = []testCase{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
},
},
{
name: "quantile_over_time",
opType: QuantileType,
afterBlockOne: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have some tests that don't include NaN's? Might be a useful simple case to cover.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we should add tests for all of the temporal functions with some NaNs - I'll do this in a separate PR.

{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
},
afterAllBlocks: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
},
},
}

func TestAggregationAllNaNs(t *testing.T) {
Expand All @@ -242,8 +266,22 @@ func testAggregation(t *testing.T, testCases []testCase, vals [][]float64) {
block3 := test.NewUnconsolidatedBlockFromDatapoints(bounds, values)
c, sink := executor.NewControllerWithSink(parser.NodeID(1))

baseOp, err := NewAggOp([]interface{}{5 * time.Minute}, tt.opType)
require.NoError(t, err)
var (
args []interface{}
baseOp transform.Params
err error
)

if tt.opType == QuantileType {
args = []interface{}{0.2, 5 * time.Minute}
baseOp, err = NewQuantileOp(args, tt.opType)
require.NoError(t, err)
} else {
args = []interface{}{5 * time.Minute}
baseOp, err = NewAggOp(args, tt.opType)
require.NoError(t, err)
}

node := baseOp.Node(c, transform.Options{
TimeSpec: transform.TimeSpec{
Start: boundStart.Add(-2 * bounds.Duration),
Expand Down
13 changes: 1 addition & 12 deletions src/query/functions/temporal/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,7 @@ type baseOp struct {

// skipping lint check for a single operator type since we will be adding more
// nolint : unparam
func newBaseOp(args []interface{}, operatorType string, processorFn MakeProcessor) (baseOp, error) {
if operatorType != HoltWintersType && operatorType != PredictLinearType {
if len(args) != 1 {
return emptyOp, fmt.Errorf("invalid number of args for %s: %d", operatorType, len(args))
}
}

duration, ok := args[0].(time.Duration)
if !ok {
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[0], operatorType)
}

func newBaseOp(duration time.Duration, operatorType string, processorFn MakeProcessor) (baseOp, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return baseOp{
operatorType: operatorType,
processorFn: processorFn,
Expand Down
7 changes: 6 additions & 1 deletion src/query/functions/temporal/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ func NewFunctionOp(args []interface{}, optype string) (transform.Params, error)
return nil, fmt.Errorf("unknown function type: %s", optype)
}

duration, ok := args[0].(time.Duration)
if !ok {
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[0], optype)
}

f := functionProcessor{
compFunc: compFunc,
}

return newBaseOp(args, optype, f)
return newBaseOp(duration, optype, f)
}

type functionNode struct {
Expand Down
8 changes: 7 additions & 1 deletion src/query/functions/temporal/holt_winters.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package temporal
import (
"fmt"
"math"
"time"

"github.com/m3db/m3/src/query/executor/transform"
)
Expand All @@ -41,6 +42,11 @@ func NewHoltWintersOp(args []interface{}) (transform.Params, error) {
return emptyOp, fmt.Errorf("invalid number of args for %s: %d", HoltWintersType, len(args))
}

duration, ok := args[0].(time.Duration)
if !ok {
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[0], HoltWintersType)
}

sf, ok := args[1].(float64)
if !ok {
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[1], HoltWintersType)
Expand All @@ -65,7 +71,7 @@ func NewHoltWintersOp(args []interface{}) (transform.Params, error) {
aggFunc: aggregationFunc,
}

return newBaseOp(args, HoltWintersType, a)
return newBaseOp(duration, HoltWintersType, a)
}

func makeHoltWintersFn(sf, tf float64) aggFunc {
Expand Down
11 changes: 10 additions & 1 deletion src/query/functions/temporal/linear_regression.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func NewLinearRegressionOp(args []interface{}, optype string) (transform.Params,
}

case DerivType:
if len(args) != 1 {
return emptyOp, fmt.Errorf("invalid number of args for %s: %d", DerivType, len(args))
}

fn = func(slope, _ float64) float64 {
return slope
}
Expand All @@ -89,12 +93,17 @@ func NewLinearRegressionOp(args []interface{}, optype string) (transform.Params,
return nil, fmt.Errorf("unknown linear regression type: %s", optype)
}

duration, ok := args[0].(time.Duration)
if !ok {
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[0], optype)
}

l := linearRegressionProcessor{
fn: fn,
isDeriv: isDeriv,
}

return newBaseOp(args, optype, l)
return newBaseOp(duration, optype, l)
}

type linearRegressionNode struct {
Expand Down
11 changes: 10 additions & 1 deletion src/query/functions/temporal/rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ func (r rateProcessor) Init(op baseOp, controller *transform.Controller, opts tr

// NewRateOp creates a new base temporal transform for rate functions
func NewRateOp(args []interface{}, optype string) (transform.Params, error) {
if len(args) != 1 {
return emptyOp, fmt.Errorf("invalid number of args for %s: %d", optype, len(args))
}

duration, ok := args[0].(time.Duration)
if !ok {
return emptyOp, fmt.Errorf("unable to cast to scalar argument: %v for %s", args[0], optype)
}

var (
isRate, isCounter bool
rateFn = standardRateFunc
Expand Down Expand Up @@ -93,7 +102,7 @@ func NewRateOp(args []interface{}, optype string) (transform.Params, error) {
rateFn: rateFn,
}

return newBaseOp(args, optype, r)
return newBaseOp(duration, optype, r)
}

type rateFn func(ts.Datapoints, bool, bool, transform.TimeSpec, time.Duration) float64
Expand Down
1 change: 1 addition & 0 deletions src/query/parser/promql/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ var temporalParseTests = []struct {
{"sum_over_time(up[5m])", temporal.SumType},
{"stddev_over_time(up[5m])", temporal.StdDevType},
{"stdvar_over_time(up[5m])", temporal.StdVarType},
{"quantile_over_time(0.2, up[5m])", temporal.QuantileType},
{"irate(up[5m])", temporal.IRateType},
{"idelta(up[5m])", temporal.IDeltaType},
{"rate(up[5m])", temporal.RateType},
Expand Down
4 changes: 4 additions & 0 deletions src/query/parser/promql/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ func NewFunctionExpr(
p, err = temporal.NewAggOp(argValues, name)
return p, true, err

case temporal.QuantileType:
p, err = temporal.NewQuantileOp(argValues, name)
return p, true, err

case temporal.HoltWintersType:
p, err = temporal.NewHoltWintersOp(argValues)
return p, true, err
Expand Down