diff --git a/src/query/functions/temporal/aggregation.go b/src/query/functions/temporal/aggregation.go index 6e5a8ce66b..4c2b2b7b8f 100644 --- a/src/query/functions/temporal/aggregation.go +++ b/src/query/functions/temporal/aggregation.go @@ -73,7 +73,7 @@ func NewAggOp(args []interface{}, optype string) (transform.Params, error) { return nil, fmt.Errorf("unknown aggregation type: %s", optype) } -func newAggNode(op baseOp, controller *transform.Controller) Processor { +func newAggNode(op baseOp, controller *transform.Controller, _ transform.Options) Processor { return &aggNode{ op: op, controller: controller, diff --git a/src/query/functions/temporal/base.go b/src/query/functions/temporal/base.go index 95c68ddbeb..cb75fe93c7 100644 --- a/src/query/functions/temporal/base.go +++ b/src/query/functions/temporal/base.go @@ -81,7 +81,7 @@ func (o baseOp) Node(controller *transform.Controller, opts transform.Options) t controller: controller, cache: newBlockCache(o, opts), op: o, - processor: o.processorFn(o, controller), + processor: o.processorFn(o, controller, opts), transformOpts: opts, } } @@ -335,7 +335,7 @@ type Processor interface { } // MakeProcessor is a way to create a transform -type MakeProcessor func(op baseOp, controller *transform.Controller) Processor +type MakeProcessor func(op baseOp, controller *transform.Controller, opts transform.Options) Processor type processRequest struct { blk block.Block diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go index bce1c658c2..3d0a25de6d 100644 --- a/src/query/functions/temporal/base_test.go +++ b/src/query/functions/temporal/base_test.go @@ -46,7 +46,7 @@ func (p *processor) Process(f []float64) float64 { return sum } -func dummyProcessor(_ baseOp, _ *transform.Controller) Processor { +func dummyProcessor(_ baseOp, _ *transform.Controller, _ transform.Options) Processor { return &processor{} } diff --git a/src/query/functions/temporal/rate.go b/src/query/functions/temporal/rate.go new file mode 100644 index 0000000000..61f2426089 --- /dev/null +++ b/src/query/functions/temporal/rate.go @@ -0,0 +1,116 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package temporal + +import ( + "fmt" + "math" + "time" + + "github.com/m3db/m3/src/query/executor/transform" +) + +const ( + // IRateType calculates the per-second rate of increase of the time series + // across the specified time range. This is based on the last two data points. + IRateType = "irate" + + // IDeltaType calculates the difference between the last two values in the time series. + // IDeltaTemporalType should only be used with gauges. + IDeltaType = "idelta" +) + +// NewRateOp creates a new base temporal transform for rate functions +func NewRateOp(args []interface{}, optype string) (transform.Params, error) { + if optype == IRateType || optype == IDeltaType { + return newBaseOp(args, optype, newRateNode, nil) + } + + return nil, fmt.Errorf("unknown rate type: %s", optype) +} + +func newRateNode(op baseOp, controller *transform.Controller, opts transform.Options) Processor { + isRate := op.operatorType == IRateType + + return &rateNode{ + op: op, + controller: controller, + timeSpec: opts.TimeSpec, + isRate: isRate, + } +} + +type rateNode struct { + op baseOp + controller *transform.Controller + timeSpec transform.TimeSpec + isRate bool +} + +func (r *rateNode) Process(values []float64) float64 { + valuesLen := len(values) + if valuesLen < 2 { + return math.NaN() + } + + nonNanIdx := valuesLen - 1 + // find idx for last non-NaN value + indexLast := findNonNanIdx(values, nonNanIdx) + // if indexLast is 0 then you only have one value and should return a NaN + if indexLast < 1 { + return math.NaN() + } + + nonNanIdx = findNonNanIdx(values, indexLast-1) + if nonNanIdx == -1 { + return math.NaN() + } + + previousSample := values[nonNanIdx] + lastSample := values[indexLast] + + var resultValue float64 + if r.isRate && lastSample < previousSample { + // Counter reset. + resultValue = lastSample + } else { + resultValue = lastSample - previousSample + } + + if r.isRate { + resultValue *= float64(time.Second) + resultValue /= float64(r.timeSpec.Step) * float64(indexLast-nonNanIdx) + } + + return resultValue +} + +// findNonNanIdx iterates over the values backwards until we find a non-NaN value, +// then returns its index +func findNonNanIdx(vals []float64, startingIdx int) int { + for i := startingIdx; i >= 0; i-- { + if !math.IsNaN(vals[i]) { + return i + } + } + + return -1 +} diff --git a/src/query/functions/temporal/rate_test.go b/src/query/functions/temporal/rate_test.go new file mode 100644 index 0000000000..0d84a4ab6e --- /dev/null +++ b/src/query/functions/temporal/rate_test.go @@ -0,0 +1,233 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package temporal + +import ( + "math" + "testing" + "time" + + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/test" + "github.com/m3db/m3/src/query/test/executor" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type testRateCase struct { + name string + vals [][]float64 + opType string + afterBlockOne [][]float64 + afterAllBlocks [][]float64 +} + +var testRateCases = []testRateCase{ + { + name: "irate", + opType: IRateType, + vals: [][]float64{ + {678758, 680986, 683214, 685442, 687670}, + {1987036, 1988988, 1990940, 1992892, 1994844}, + }, + afterBlockOne: [][]float64{ + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), 37.1333}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), 32.5333}, + }, + afterAllBlocks: [][]float64{ + {11312.6333, 37.1333, 37.1333, 37.1333, 37.1333}, + {33117.2666, 32.5333, 32.5333, 32.5333, 32.5333}, + }, + }, + { + name: "irate with some NaNs", + opType: IRateType, + vals: [][]float64{ + {1987036, 1988988, 1990940, math.NaN(), 1994844}, + {1987036, 1988988, 1990940, math.NaN(), math.NaN()}, + }, + afterBlockOne: [][]float64{ + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), 32.5333}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), 32.5333}, + }, + afterAllBlocks: [][]float64{ + {33117.2666, 32.5333, 32.5333, 32.5333, 32.5333}, + {11039.0888, 32.5333, 32.5333, 32.5333, 32.5333}, + }, + }, + { + name: "irate with all NaNs", + opType: IRateType, + vals: [][]float64{ + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + }, + afterBlockOne: [][]float64{ + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + }, + afterAllBlocks: [][]float64{ + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + }, + }, +} + +func TestIRate(t *testing.T) { + testRate(t, testRateCases) +} + +var testDeltaCases = []testRateCase{ + { + name: "idelta", + opType: IDeltaType, + vals: [][]float64{ + {863682, 865910, 868138, 870366, 872594}, + {1987036, 1988988, 1990940, 1992892, 1994844}, + }, + afterBlockOne: [][]float64{ + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), 2228}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), 1952}, + }, + afterAllBlocks: [][]float64{ + {-8912, 2228, 2228, 2228, 2228}, + {-7808, 1952, 1952, 1952, 1952}, + }, + }, + { + name: "idelta with some NaNs", + opType: IDeltaType, + vals: [][]float64{ + {1987036, 1988988, 1990940, math.NaN(), 1994844}, + {1987036, 1988988, 1990940, math.NaN(), math.NaN()}, + }, + afterBlockOne: [][]float64{ + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), 3904}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), 1952}, + }, + afterAllBlocks: [][]float64{ + {-7808, 1952, 1952, 1952, 3904}, + {-3904, 1952, 1952, 1952, 1952}, + }, + }, + { + name: "idelta with all NaNs", + opType: IDeltaType, + vals: [][]float64{ + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + }, + afterBlockOne: [][]float64{ + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + }, + afterAllBlocks: [][]float64{ + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()}, + }, + }, +} + +func TestIDelta(t *testing.T) { + testRate(t, testDeltaCases) +} + +// B1 has NaN in first series, first position +func testRate(t *testing.T, testCases []testRateCase) { + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + values, bounds := test.GenerateValuesAndBounds(tt.vals, nil) + boundStart := bounds.Start + block3 := test.NewBlockFromValues(bounds, values) + c, sink := executor.NewControllerWithSink(parser.NodeID(1)) + + baseOp, err := NewRateOp([]interface{}{5 * time.Minute}, tt.opType) + require.NoError(t, err) + node := baseOp.Node(c, transform.Options{ + TimeSpec: transform.TimeSpec{ + Start: boundStart.Add(-2 * bounds.Duration), + End: bounds.End(), + Step: time.Minute, + }, + }) + bNode := node.(*baseNode) + err = node.Process(parser.NodeID(0), block3) + require.NoError(t, err) + assert.Len(t, sink.Values, 0, "nothing processed yet") + b, exists := bNode.cache.get(boundStart) + assert.True(t, exists, "block cached for future") + _, err = b.StepIter() + assert.NoError(t, err) + + original := values[0][0] + values[0][0] = math.NaN() + block1 := test.NewBlockFromValues(block.Bounds{ + Start: bounds.Start.Add(-2 * bounds.Duration), + Duration: bounds.Duration, + StepSize: bounds.StepSize, + }, values) + + values[0][0] = original + err = node.Process(parser.NodeID(0), block1) + require.NoError(t, err) + assert.Len(t, sink.Values, 2, "output from first block only") + test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) + test.EqualsWithNansWithDelta(t, tt.afterBlockOne[1], sink.Values[1], 0.0001) + _, exists = bNode.cache.get(boundStart) + assert.True(t, exists, "block still cached") + _, exists = bNode.cache.get(boundStart.Add(-1 * bounds.Duration)) + assert.False(t, exists, "block cached") + + block2 := test.NewBlockFromValues(block.Bounds{ + Start: bounds.Start.Add(-1 * bounds.Duration), + Duration: bounds.Duration, + StepSize: bounds.StepSize, + }, values) + + err = node.Process(parser.NodeID(0), block2) + require.NoError(t, err) + assert.Len(t, sink.Values, 6, "output from all 3 blocks") + test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) + test.EqualsWithNansWithDelta(t, tt.afterBlockOne[1], sink.Values[1], 0.0001) + expectedOne := tt.afterAllBlocks[0] + expectedTwo := tt.afterAllBlocks[1] + test.EqualsWithNansWithDelta(t, expectedOne, sink.Values[2], 0.0001) + test.EqualsWithNansWithDelta(t, expectedTwo, sink.Values[3], 0.0001) + _, exists = bNode.cache.get(bounds.Previous(2).Start) + assert.False(t, exists, "block removed from cache") + _, exists = bNode.cache.get(bounds.Previous(1).Start) + assert.False(t, exists, "block not cached") + _, exists = bNode.cache.get(bounds.Start) + assert.False(t, exists, "block removed from cache") + blks, err := bNode.cache.multiGet(bounds.Previous(2), 3, false) + require.NoError(t, err) + assert.Len(t, blks, 0) + }) + } +} + +func TestUnknownRate(t *testing.T) { + _, err := NewRateOp([]interface{}{5 * time.Minute}, "unknown_rate_func") + require.Error(t, err) +} diff --git a/src/query/parser/promql/parse_test.go b/src/query/parser/promql/parse_test.go index d82de92a02..3b41f15612 100644 --- a/src/query/parser/promql/parse_test.go +++ b/src/query/parser/promql/parse_test.go @@ -247,6 +247,8 @@ var temporalParseTests = []struct { {"sum_over_time(up[5m])", temporal.SumTemporalType}, {"stddev_over_time(up[5m])", temporal.StdDevTemporalType}, {"stdvar_over_time(up[5m])", temporal.StdVarTemporalType}, + {"irate(up[5m])", temporal.IRateType}, + {"idelta(up[5m])", temporal.IDeltaType}, } func TestTemporalParses(t *testing.T) { diff --git a/src/query/parser/promql/types.go b/src/query/parser/promql/types.go index af6123d312..8d208dc474 100644 --- a/src/query/parser/promql/types.go +++ b/src/query/parser/promql/types.go @@ -172,6 +172,9 @@ func NewFunctionExpr(name string, argValues []interface{}) (parser.Params, error temporal.StdVarTemporalType: return temporal.NewAggOp(argValues, name) + case temporal.IRateType, temporal.IDeltaType: + return temporal.NewRateOp(argValues, name) + default: // TODO: handle other types return nil, fmt.Errorf("function not supported: %s", name)