diff --git a/src/query/functions/temporal/aggregation.go b/src/query/functions/temporal/aggregation.go index 6e5a8ce66b..4c2b2b7b8f 100644 --- a/src/query/functions/temporal/aggregation.go +++ b/src/query/functions/temporal/aggregation.go @@ -73,7 +73,7 @@ func NewAggOp(args []interface{}, optype string) (transform.Params, error) { return nil, fmt.Errorf("unknown aggregation type: %s", optype) } -func newAggNode(op baseOp, controller *transform.Controller) Processor { +func newAggNode(op baseOp, controller *transform.Controller, _ transform.Options) Processor { return &aggNode{ op: op, controller: controller, diff --git a/src/query/functions/temporal/base.go b/src/query/functions/temporal/base.go index 95c68ddbeb..cb75fe93c7 100644 --- a/src/query/functions/temporal/base.go +++ b/src/query/functions/temporal/base.go @@ -81,7 +81,7 @@ func (o baseOp) Node(controller *transform.Controller, opts transform.Options) t controller: controller, cache: newBlockCache(o, opts), op: o, - processor: o.processorFn(o, controller), + processor: o.processorFn(o, controller, opts), transformOpts: opts, } } @@ -335,7 +335,7 @@ type Processor interface { } // MakeProcessor is a way to create a transform -type MakeProcessor func(op baseOp, controller *transform.Controller) Processor +type MakeProcessor func(op baseOp, controller *transform.Controller, opts transform.Options) Processor type processRequest struct { blk block.Block diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go index bce1c658c2..3d0a25de6d 100644 --- a/src/query/functions/temporal/base_test.go +++ b/src/query/functions/temporal/base_test.go @@ -46,7 +46,7 @@ func (p *processor) Process(f []float64) float64 { return sum } -func dummyProcessor(_ baseOp, _ *transform.Controller) Processor { +func dummyProcessor(_ baseOp, _ *transform.Controller, _ transform.Options) Processor { return &processor{} } diff --git a/src/query/functions/temporal/rate.go b/src/query/functions/temporal/rate.go new file mode 100644 index 0000000000..b41231918f --- /dev/null +++ b/src/query/functions/temporal/rate.go @@ -0,0 +1,101 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package temporal + +import ( + "fmt" + "math" + "time" + + "github.com/m3db/m3/src/query/executor/transform" +) + +const ( + // IRateTemporalType calculates the per-second instant rate of increase of the time series + // in the range vector. This is based on the last two data points. + IRateTemporalType = "irate" +) + +// NewRateOp creates a new base temporal transform for rate functions +func NewRateOp(args []interface{}, optype string) (transform.Params, error) { + if optype == IRateTemporalType { + return newBaseOp(args, optype, newRateNode, nil) + } + + return nil, fmt.Errorf("unknown aggregation type: %s", optype) +} + +func newRateNode(op baseOp, controller *transform.Controller, opts transform.Options) Processor { + return &rateNode{ + op: op, + controller: controller, + timeSpec: opts.TimeSpec, + } +} + +type rateNode struct { + op baseOp + controller *transform.Controller + timeSpec transform.TimeSpec +} + +func (r *rateNode) Process(values []float64) float64 { + switch r.op.operatorType { + case IRateTemporalType: + return instantValue(values, true, r.timeSpec.Step) + default: + panic("unknown aggregation type") + } +} + +func instantValue(values []float64, isRate bool, stepSize time.Duration) float64 { + fmt.Println(values) + valuesLen := len(values) + if valuesLen < 2 { + return math.NaN() + } + + // {0, 1, 2, 3, 4}, + // {5, 6, 7, 8, 9}, + + lastSample := values[valuesLen-1] + previousSample := values[valuesLen-2] + + fmt.Println(lastSample, previousSample) + + var resultValue float64 + if isRate && lastSample < previousSample { + // Counter reset. + resultValue = lastSample + } else { + resultValue = lastSample - previousSample + } + fmt.Println("result: ", resultValue) + + if isRate { + // Convert to per-second. + fmt.Println(stepSize, float64(stepSize), float64(stepSize)/1000, "sec: ", stepSize.Seconds()) + // resultValue /= float64(stepSize) / 1000 + resultValue /= float64(stepSize.Nanoseconds()) / 1000 + } + + return resultValue +} diff --git a/src/query/functions/temporal/rate_test.go b/src/query/functions/temporal/rate_test.go new file mode 100644 index 0000000000..2fe83a86e8 --- /dev/null +++ b/src/query/functions/temporal/rate_test.go @@ -0,0 +1,137 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package temporal + +import ( + "math" + "testing" + "time" + + "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/executor/transform" + "github.com/m3db/m3/src/query/parser" + "github.com/m3db/m3/src/query/test" + "github.com/m3db/m3/src/query/test/executor" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var testRateCases = []testCase{ + { + name: "irate", + opType: IRateTemporalType, + afterBlockOne: [][]float64{ + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), 1952}, + {math.NaN(), math.NaN(), math.NaN(), math.NaN(), 1}, + }, + afterAllBlocks: [][]float64{ + {2, 2, 2, 2, 2}, + {7, 7, 7, 7, 7}, + }, + }, +} + +func TestRate(t *testing.T) { + v := [][]float64{ + {1987036, 1988988, 1990940, 1992892, 1994844}, + {5, 6, 7, 8, 9}, + } + testRate(t, testRateCases, v) +} + +// B1 has NaN in first series, first position +func testRate(t *testing.T, testCases []testCase, vals [][]float64) { + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + values, bounds := test.GenerateValuesAndBounds(vals, nil) + boundStart := bounds.Start + block3 := test.NewBlockFromValues(bounds, values) + c, sink := executor.NewControllerWithSink(parser.NodeID(1)) + + baseOp, err := NewRateOp([]interface{}{5 * time.Minute}, tt.opType) + require.NoError(t, err) + node := baseOp.Node(c, transform.Options{ + TimeSpec: transform.TimeSpec{ + Start: boundStart.Add(-2 * bounds.Duration), + End: bounds.End(), + Step: time.Second, + }, + }) + bNode := node.(*baseNode) + err = node.Process(parser.NodeID(0), block3) + require.NoError(t, err) + assert.Len(t, sink.Values, 0, "nothing processed yet") + b, exists := bNode.cache.get(boundStart) + assert.True(t, exists, "block cached for future") + _, err = b.StepIter() + assert.NoError(t, err) + + original := values[0][0] + values[0][0] = math.NaN() + block1 := test.NewBlockFromValues(block.Bounds{ + Start: bounds.Start.Add(-2 * bounds.Duration), + Duration: bounds.Duration, + StepSize: bounds.StepSize, + }, values) + + values[0][0] = original + err = node.Process(parser.NodeID(0), block1) + require.NoError(t, err) + assert.Len(t, sink.Values, 2, "output from first block only") + test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) + test.EqualsWithNansWithDelta(t, tt.afterBlockOne[1], sink.Values[1], 0.0001) + _, exists = bNode.cache.get(boundStart) + assert.True(t, exists, "block still cached") + _, exists = bNode.cache.get(boundStart.Add(-1 * bounds.Duration)) + assert.False(t, exists, "block cached") + + block2 := test.NewBlockFromValues(block.Bounds{ + Start: bounds.Start.Add(-1 * bounds.Duration), + Duration: bounds.Duration, + StepSize: bounds.StepSize, + }, values) + + err = node.Process(parser.NodeID(0), block2) + require.NoError(t, err) + assert.Len(t, sink.Values, 6, "output from all 3 blocks") + test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001) + test.EqualsWithNansWithDelta(t, tt.afterBlockOne[1], sink.Values[1], 0.0001) + expectedOne := tt.afterAllBlocks[0] + expectedTwo := tt.afterAllBlocks[1] + test.EqualsWithNansWithDelta(t, expectedOne, sink.Values[2], 0.0001) + test.EqualsWithNansWithDelta(t, expectedTwo, sink.Values[3], 0.0001) + _, exists = bNode.cache.get(bounds.Previous(2).Start) + assert.False(t, exists, "block removed from cache") + _, exists = bNode.cache.get(bounds.Previous(1).Start) + assert.False(t, exists, "block not cached") + _, exists = bNode.cache.get(bounds.Start) + assert.False(t, exists, "block removed from cache") + blks, err := bNode.cache.multiGet(bounds.Previous(2), 3, false) + require.NoError(t, err) + assert.Len(t, blks, 0) + }) + } +} + +// func TestUnknownAggregation(t *testing.T) { +// _, err := NewAggOp([]interface{}{5 * time.Minute}, "unknown_agg_func") +// require.Error(t, err) +// } diff --git a/src/query/parser/promql/types.go b/src/query/parser/promql/types.go index af6123d312..414a452310 100644 --- a/src/query/parser/promql/types.go +++ b/src/query/parser/promql/types.go @@ -172,6 +172,9 @@ func NewFunctionExpr(name string, argValues []interface{}) (parser.Params, error temporal.StdVarTemporalType: return temporal.NewAggOp(argValues, name) + case temporal.IRateTemporalType: + return temporal.NewRateOp(argValues, name) + default: // TODO: handle other types return nil, fmt.Errorf("function not supported: %s", name)