Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add irate #897

Merged
merged 10 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/query/functions/temporal/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewAggOp(args []interface{}, optype string) (transform.Params, error) {
return nil, fmt.Errorf("unknown aggregation type: %s", optype)
}

func newAggNode(op baseOp, controller *transform.Controller) Processor {
func newAggNode(op baseOp, controller *transform.Controller, _ transform.Options) Processor {
return &aggNode{
op: op,
controller: controller,
Expand Down
4 changes: 2 additions & 2 deletions src/query/functions/temporal/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (o baseOp) Node(controller *transform.Controller, opts transform.Options) t
controller: controller,
cache: newBlockCache(o, opts),
op: o,
processor: o.processorFn(o, controller),
processor: o.processorFn(o, controller, opts),
transformOpts: opts,
}
}
Expand Down Expand Up @@ -335,7 +335,7 @@ type Processor interface {
}

// MakeProcessor is a way to create a transform
type MakeProcessor func(op baseOp, controller *transform.Controller) Processor
type MakeProcessor func(op baseOp, controller *transform.Controller, opts transform.Options) Processor

type processRequest struct {
blk block.Block
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/temporal/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (p *processor) Process(f []float64) float64 {
return sum
}

func dummyProcessor(_ baseOp, _ *transform.Controller) Processor {
func dummyProcessor(_ baseOp, _ *transform.Controller, _ transform.Options) Processor {
return &processor{}
}

Expand Down
119 changes: 119 additions & 0 deletions src/query/functions/temporal/rate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package temporal

import (
"fmt"
"math"
"time"

"github.com/m3db/m3/src/query/executor/transform"
)

const (
// IRateTemporalType calculates the per-second rate of increase of the time series
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IRateType is fine since it's in temporal, otherwise you have temporal.IRateTemporal

// across the specified time range. This is based on the last two data points.
IRateTemporalType = "irate"

// IDeltaTemporalType calculates the difference between the last two values in the time series.
// IDeltaTemporalType should only be used with gauges.
IDeltaTemporalType = "idelta"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment here

)

// NewRateOp creates a new base temporal transform for rate functions
func NewRateOp(args []interface{}, optype string) (transform.Params, error) {
if optype == IRateTemporalType || optype == IDeltaTemporalType {
return newBaseOp(args, optype, newRateNode, nil)
}

return nil, fmt.Errorf("unknown rate type: %s", optype)
}

func newRateNode(op baseOp, controller *transform.Controller, opts transform.Options) Processor {
var isRate bool
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isRate := op.operatorType == IRateTemporalType

if op.operatorType == IRateTemporalType {
isRate = true
}

return &rateNode{
op: op,
controller: controller,
timeSpec: opts.TimeSpec,
isRate: isRate,
}
}

type rateNode struct {
op baseOp
controller *transform.Controller
timeSpec transform.TimeSpec
isRate bool
}

func (r *rateNode) Process(values []float64) float64 {
valuesLen := len(values)
if valuesLen < 2 {
return math.NaN()
}

nonNanIdx := valuesLen - 1
// find idx for last non-NaN value
indexLast := findNonNanIdx(values, nonNanIdx)
// if indexLast is 0 then you only have one value and should return a NaN
if indexLast < 1 {
return math.NaN()
}

lastSample := values[indexLast]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since you keep track of this index, you can define lastSample lower down, closer to where you use it

nonNanIdx = findNonNanIdx(values, indexLast-1)
if nonNanIdx == -1 {
return math.NaN()
}

previousSample := values[nonNanIdx]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit newline

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think this is fine :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we were add newlines after each closing bracket with lines following it


var resultValue float64
if r.isRate && lastSample < previousSample {
// Counter reset.
resultValue = lastSample
} else {
resultValue = lastSample - previousSample
}

if r.isRate {
resultValue *= float64(time.Second)
resultValue /= float64(r.timeSpec.Step) * float64(indexLast-nonNanIdx)
}

return resultValue
}

// findNonNanIdx iterates over the values backwards until we find a non-NaN value,
// then returns its index
func findNonNanIdx(vals []float64, startingIdx int) int {
for i := startingIdx; i >= 0; i-- {
if !math.IsNaN(vals[i]) {
return i
}
}

return -1
}
233 changes: 233 additions & 0 deletions src/query/functions/temporal/rate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package temporal

import (
"math"
"testing"
"time"

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/parser"
"github.com/m3db/m3/src/query/test"
"github.com/m3db/m3/src/query/test/executor"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type testRateCase struct {
name string
vals [][]float64
opType string
afterBlockOne [][]float64
afterAllBlocks [][]float64
}

var testRateCases = []testRateCase{
{
name: "irate",
opType: IRateTemporalType,
vals: [][]float64{
{678758, 680986, 683214, 685442, 687670},
{1987036, 1988988, 1990940, 1992892, 1994844},
},
afterBlockOne: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 37.1333},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 32.5333},
},
afterAllBlocks: [][]float64{
{11312.6333, 37.1333, 37.1333, 37.1333, 37.1333},
{33117.2666, 32.5333, 32.5333, 32.5333, 32.5333},
},
},
{
name: "irate with some NaNs",
opType: IRateTemporalType,
vals: [][]float64{
{1987036, 1988988, 1990940, math.NaN(), 1994844},
{1987036, 1988988, 1990940, math.NaN(), math.NaN()},
},
afterBlockOne: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 32.5333},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 32.5333},
},
afterAllBlocks: [][]float64{
{33117.2666, 32.5333, 32.5333, 32.5333, 32.5333},
{11039.0888, 32.5333, 32.5333, 32.5333, 32.5333},
},
},
{
name: "irate with all NaNs",
opType: IRateTemporalType,
vals: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
},
afterBlockOne: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
},
afterAllBlocks: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
},
},
}

func TestIRate(t *testing.T) {
testRate(t, testRateCases)
}

var testDeltaCases = []testRateCase{
{
name: "idelta",
opType: IDeltaTemporalType,
vals: [][]float64{
{863682, 865910, 868138, 870366, 872594},
{1987036, 1988988, 1990940, 1992892, 1994844},
},
afterBlockOne: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 2228},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 1952},
},
afterAllBlocks: [][]float64{
{-8912, 2228, 2228, 2228, 2228},
{-7808, 1952, 1952, 1952, 1952},
},
},
{
name: "idelta with some NaNs",
opType: IDeltaTemporalType,
vals: [][]float64{
{1987036, 1988988, 1990940, math.NaN(), 1994844},
{1987036, 1988988, 1990940, math.NaN(), math.NaN()},
},
afterBlockOne: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 3904},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), 1952},
},
afterAllBlocks: [][]float64{
{-7808, 1952, 1952, 1952, 3904},
{-3904, 1952, 1952, 1952, 1952},
},
},
{
name: "idelta with all NaNs",
opType: IDeltaTemporalType,
vals: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
},
afterBlockOne: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
},
afterAllBlocks: [][]float64{
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
},
},
}

func TestIDelta(t *testing.T) {
testRate(t, testDeltaCases)
}

// B1 has NaN in first series, first position
func testRate(t *testing.T, testCases []testRateCase) {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
values, bounds := test.GenerateValuesAndBounds(tt.vals, nil)
boundStart := bounds.Start
block3 := test.NewBlockFromValues(bounds, values)
c, sink := executor.NewControllerWithSink(parser.NodeID(1))

baseOp, err := NewRateOp([]interface{}{5 * time.Minute}, tt.opType)
require.NoError(t, err)
node := baseOp.Node(c, transform.Options{
TimeSpec: transform.TimeSpec{
Start: boundStart.Add(-2 * bounds.Duration),
End: bounds.End(),
Step: time.Minute,
},
})
bNode := node.(*baseNode)
err = node.Process(parser.NodeID(0), block3)
require.NoError(t, err)
assert.Len(t, sink.Values, 0, "nothing processed yet")
b, exists := bNode.cache.get(boundStart)
assert.True(t, exists, "block cached for future")
_, err = b.StepIter()
assert.NoError(t, err)

original := values[0][0]
values[0][0] = math.NaN()
block1 := test.NewBlockFromValues(block.Bounds{
Start: bounds.Start.Add(-2 * bounds.Duration),
Duration: bounds.Duration,
StepSize: bounds.StepSize,
}, values)

values[0][0] = original
err = node.Process(parser.NodeID(0), block1)
require.NoError(t, err)
assert.Len(t, sink.Values, 2, "output from first block only")
test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001)
test.EqualsWithNansWithDelta(t, tt.afterBlockOne[1], sink.Values[1], 0.0001)
_, exists = bNode.cache.get(boundStart)
assert.True(t, exists, "block still cached")
_, exists = bNode.cache.get(boundStart.Add(-1 * bounds.Duration))
assert.False(t, exists, "block cached")

block2 := test.NewBlockFromValues(block.Bounds{
Start: bounds.Start.Add(-1 * bounds.Duration),
Duration: bounds.Duration,
StepSize: bounds.StepSize,
}, values)

err = node.Process(parser.NodeID(0), block2)
require.NoError(t, err)
assert.Len(t, sink.Values, 6, "output from all 3 blocks")
test.EqualsWithNansWithDelta(t, tt.afterBlockOne[0], sink.Values[0], 0.0001)
test.EqualsWithNansWithDelta(t, tt.afterBlockOne[1], sink.Values[1], 0.0001)
expectedOne := tt.afterAllBlocks[0]
expectedTwo := tt.afterAllBlocks[1]
test.EqualsWithNansWithDelta(t, expectedOne, sink.Values[2], 0.0001)
test.EqualsWithNansWithDelta(t, expectedTwo, sink.Values[3], 0.0001)
_, exists = bNode.cache.get(bounds.Previous(2).Start)
assert.False(t, exists, "block removed from cache")
_, exists = bNode.cache.get(bounds.Previous(1).Start)
assert.False(t, exists, "block not cached")
_, exists = bNode.cache.get(bounds.Start)
assert.False(t, exists, "block removed from cache")
blks, err := bNode.cache.multiGet(bounds.Previous(2), 3, false)
require.NoError(t, err)
assert.Len(t, blks, 0)
})
}
}

func TestUnknownRate(t *testing.T) {
_, err := NewRateOp([]interface{}{5 * time.Minute}, "unknown_rate_func")
require.Error(t, err)
}
2 changes: 2 additions & 0 deletions src/query/parser/promql/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ var temporalParseTests = []struct {
{"sum_over_time(up[5m])", temporal.SumTemporalType},
{"stddev_over_time(up[5m])", temporal.StdDevTemporalType},
{"stdvar_over_time(up[5m])", temporal.StdVarTemporalType},
{"irate(up[5m])", temporal.IRateTemporalType},
{"idelta(up[5m])", temporal.IDeltaTemporalType},
}

func TestTemporalParses(t *testing.T) {
Expand Down
Loading