Skip to content

Commit

Permalink
[query] Fix PhysicalPlan.shiftTime (#2413)
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored Jun 17, 2020
1 parent b12af4d commit 9d98e79
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 49 deletions.
13 changes: 7 additions & 6 deletions scripts/comparator/basic_queries/queries.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@
{
"queryGroup":"temporal",
"queries":[
"rate(quail[1m])",
"irate(quail[5m])",
"delta(quail[123s])",
"idelta(quail[1m] offset 5m)",
"deriv(quail[5m])"
"rate(multi_1[1m])",
"irate(multi_1[5m])",
"delta(multi_1[123s])",
"idelta(multi_1[1m] offset 1h)",
"deriv(multi_1[3m])"
],
"steps" : [
"15s",
"30s",
"1m"
"1m",
"5m"
]
},
{
Expand Down
26 changes: 14 additions & 12 deletions src/query/plan/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ func NewPhysicalPlan(

func (p PhysicalPlan) shiftTime() PhysicalPlan {
var maxRange time.Duration
// Start offset with lookback
maxOffset := p.LookbackDuration

for _, transformID := range p.pipeline {
node := p.steps[transformID]
boundOp, ok := node.Transform.Op.(transform.BoundOp)
Expand All @@ -97,25 +96,28 @@ func (p PhysicalPlan) shiftTime() PhysicalPlan {
}

spec := boundOp.Bounds()
if p.LookbackDuration > maxOffset {
maxOffset = p.LookbackDuration
}

if spec.Range > maxRange {
maxRange = spec.Range
}
}

startShift := maxOffset + maxRange
shift := startShift % p.TimeSpec.Step
extraStep := p.TimeSpec.Step
if shift == 0 {
// NB: if the start is divisible by offset, no need to take an extra step.
extraStep = 0
startShift := p.LookbackDuration
if maxRange > 0 {
startShift = maxRange
}

remainder := startShift % p.TimeSpec.Step
var extraShift time.Duration
if remainder != 0 {
// Align the shift to be divisible by step.
extraShift = p.TimeSpec.Step - remainder
}

alignedShift := startShift - extraStep - shift
alignedShift := startShift + extraShift

p.TimeSpec.Start = p.TimeSpec.Start.Add(-1 * alignedShift)

return p
}

Expand Down
100 changes: 69 additions & 31 deletions src/query/plan/physical_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package plan

import (
"fmt"
"testing"
"time"

Expand All @@ -34,14 +33,10 @@ import (
"github.com/stretchr/testify/require"
)

var (
defaultLookbackDuration = time.Minute
)

func testRequestParams() models.RequestParams {
return models.RequestParams{
Now: time.Now(),
LookbackDuration: defaultLookbackDuration,
LookbackDuration: 5 * time.Minute,
Step: time.Second,
}
}
Expand Down Expand Up @@ -70,34 +65,77 @@ func TestResultNode(t *testing.T) {
}

func TestShiftTime(t *testing.T) {
fetchTransform := parser.NewTransformFromOperation(functions.FetchOp{}, 1)
agg, err := aggregation.NewAggregationOp(aggregation.CountType, aggregation.NodeParams{})
require.NoError(t, err)
countTransform := parser.NewTransformFromOperation(agg, 2)
transforms := parser.Nodes{fetchTransform, countTransform}
edges := parser.Edges{
parser.Edge{
ParentID: fetchTransform.ID,
ChildID: countTransform.ID,
tests := []struct {
name string
fetchOp functions.FetchOp
lookbackDuration time.Duration
step time.Duration
wantShiftBy time.Duration
}{
{
name: "shift by lookbackDuration",
fetchOp: functions.FetchOp{},
lookbackDuration: 15 * time.Minute,
step: time.Second,
wantShiftBy: 15 * time.Minute,
},
{
name: "shift by range",
fetchOp: functions.FetchOp{Range: time.Hour},
lookbackDuration: 5 * time.Minute,
step: time.Second,
wantShiftBy: time.Hour,
},
{
name: "align the lookback based shift by step",
fetchOp: functions.FetchOp{},
lookbackDuration: 5 * time.Second,
step: 15 * time.Second,
wantShiftBy: 15 * time.Second, // lookback = 5, aligned to 1x step (15)
},
{
name: "align the range based shift by step",
fetchOp: functions.FetchOp{Range: 16 * time.Second},
lookbackDuration: 5 * time.Second,
step: 15 * time.Second,
wantShiftBy: 30 * time.Second, // range = 16, aligned to 2x step (2 * 15)
},
{
name: "keep the same shift if already aligned by step",
fetchOp: functions.FetchOp{Range: 30 * time.Second},
lookbackDuration: 5 * time.Second,
step: 15 * time.Second,
wantShiftBy: 30 * time.Second, // range = 30, divisible by step
},
}

lp, _ := NewLogicalPlan(transforms, edges)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

params := testRequestParams()
params.Start = params.Now.Add(-1 * time.Hour)
fetchTransform := parser.NewTransformFromOperation(tt.fetchOp, 1)
agg, err := aggregation.NewAggregationOp(aggregation.CountType, aggregation.NodeParams{})
require.NoError(t, err)

p, err := NewPhysicalPlan(lp, params)
require.NoError(t, err)
assert.Equal(t, params.Start.Add(-1*params.LookbackDuration),
p.TimeSpec.Start, fmt.Sprintf("start is not now - lookback"))
fetchTransform = parser.NewTransformFromOperation(
functions.FetchOp{Offset: time.Minute, Range: time.Hour}, 1)
transforms = parser.Nodes{fetchTransform, countTransform}
lp, _ = NewLogicalPlan(transforms, edges)
p, err = NewPhysicalPlan(lp, params)
require.NoError(t, err)
assert.Equal(t, params.Start.
Add(-1*(time.Hour+defaultLookbackDuration)), p.TimeSpec.Start,
"start time offset by fetch")
countTransform := parser.NewTransformFromOperation(agg, 2)
transforms := parser.Nodes{fetchTransform, countTransform}
edges := parser.Edges{
parser.Edge{
ParentID: fetchTransform.ID,
ChildID: countTransform.ID,
},
}

lp, _ := NewLogicalPlan(transforms, edges)

params := models.RequestParams{
Now: time.Now(),
LookbackDuration: tt.lookbackDuration,
Step: tt.step,
}

p, err := NewPhysicalPlan(lp, params)
require.NoError(t, err)
assert.Equal(t, tt.wantShiftBy.String(), params.Start.Sub(p.TimeSpec.Start).String(), "start time shifted by")
})
}
}

0 comments on commit 9d98e79

Please sign in to comment.