diff --git a/scripts/comparator/basic_queries/queries.json b/scripts/comparator/basic_queries/queries.json index 8bfcbea251..beb3d78215 100644 --- a/scripts/comparator/basic_queries/queries.json +++ b/scripts/comparator/basic_queries/queries.json @@ -24,16 +24,17 @@ { "queryGroup":"temporal", "queries":[ - "rate(quail[1m])", - "irate(quail[5m])", - "delta(quail[123s])", - "idelta(quail[1m] offset 5m)", - "deriv(quail[5m])" + "rate(multi_1[1m])", + "irate(multi_1[5m])", + "delta(multi_1[123s])", + "idelta(multi_1[1m] offset 1h)", + "deriv(multi_1[3m])" ], "steps" : [ "15s", "30s", - "1m" + "1m", + "5m" ] }, { diff --git a/src/query/plan/physical.go b/src/query/plan/physical.go index 45d55c4224..9a2cceb305 100644 --- a/src/query/plan/physical.go +++ b/src/query/plan/physical.go @@ -87,8 +87,7 @@ func NewPhysicalPlan( func (p PhysicalPlan) shiftTime() PhysicalPlan { var maxRange time.Duration - // Start offset with lookback - maxOffset := p.LookbackDuration + for _, transformID := range p.pipeline { node := p.steps[transformID] boundOp, ok := node.Transform.Op.(transform.BoundOp) @@ -97,25 +96,28 @@ func (p PhysicalPlan) shiftTime() PhysicalPlan { } spec := boundOp.Bounds() - if p.LookbackDuration > maxOffset { - maxOffset = p.LookbackDuration - } if spec.Range > maxRange { maxRange = spec.Range } } - startShift := maxOffset + maxRange - shift := startShift % p.TimeSpec.Step - extraStep := p.TimeSpec.Step - if shift == 0 { - // NB: if the start is divisible by offset, no need to take an extra step. - extraStep = 0 + startShift := p.LookbackDuration + if maxRange > 0 { + startShift = maxRange + } + + remainder := startShift % p.TimeSpec.Step + var extraShift time.Duration + if remainder != 0 { + // Align the shift to be divisible by step. + extraShift = p.TimeSpec.Step - remainder } - alignedShift := startShift - extraStep - shift + alignedShift := startShift + extraShift + p.TimeSpec.Start = p.TimeSpec.Start.Add(-1 * alignedShift) + return p } diff --git a/src/query/plan/physical_test.go b/src/query/plan/physical_test.go index 479fb577bc..04869d0ed1 100644 --- a/src/query/plan/physical_test.go +++ b/src/query/plan/physical_test.go @@ -21,7 +21,6 @@ package plan import ( - "fmt" "testing" "time" @@ -34,14 +33,10 @@ import ( "github.com/stretchr/testify/require" ) -var ( - defaultLookbackDuration = time.Minute -) - func testRequestParams() models.RequestParams { return models.RequestParams{ Now: time.Now(), - LookbackDuration: defaultLookbackDuration, + LookbackDuration: 5 * time.Minute, Step: time.Second, } } @@ -70,34 +65,77 @@ func TestResultNode(t *testing.T) { } func TestShiftTime(t *testing.T) { - fetchTransform := parser.NewTransformFromOperation(functions.FetchOp{}, 1) - agg, err := aggregation.NewAggregationOp(aggregation.CountType, aggregation.NodeParams{}) - require.NoError(t, err) - countTransform := parser.NewTransformFromOperation(agg, 2) - transforms := parser.Nodes{fetchTransform, countTransform} - edges := parser.Edges{ - parser.Edge{ - ParentID: fetchTransform.ID, - ChildID: countTransform.ID, + tests := []struct { + name string + fetchOp functions.FetchOp + lookbackDuration time.Duration + step time.Duration + wantShiftBy time.Duration + }{ + { + name: "shift by lookbackDuration", + fetchOp: functions.FetchOp{}, + lookbackDuration: 15 * time.Minute, + step: time.Second, + wantShiftBy: 15 * time.Minute, + }, + { + name: "shift by range", + fetchOp: functions.FetchOp{Range: time.Hour}, + lookbackDuration: 5 * time.Minute, + step: time.Second, + wantShiftBy: time.Hour, + }, + { + name: "align the lookback based shift by step", + fetchOp: functions.FetchOp{}, + lookbackDuration: 5 * time.Second, + step: 15 * time.Second, + wantShiftBy: 15 * time.Second, // lookback = 5, aligned to 1x step (15) + }, + { + name: "align the range based shift by step", + fetchOp: functions.FetchOp{Range: 16 * time.Second}, + lookbackDuration: 5 * time.Second, + step: 15 * time.Second, + wantShiftBy: 30 * time.Second, // range = 16, aligned to 2x step (2 * 15) + }, + { + name: "keep the same shift if already aligned by step", + fetchOp: functions.FetchOp{Range: 30 * time.Second}, + lookbackDuration: 5 * time.Second, + step: 15 * time.Second, + wantShiftBy: 30 * time.Second, // range = 30, divisible by step }, } - lp, _ := NewLogicalPlan(transforms, edges) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { - params := testRequestParams() - params.Start = params.Now.Add(-1 * time.Hour) + fetchTransform := parser.NewTransformFromOperation(tt.fetchOp, 1) + agg, err := aggregation.NewAggregationOp(aggregation.CountType, aggregation.NodeParams{}) + require.NoError(t, err) - p, err := NewPhysicalPlan(lp, params) - require.NoError(t, err) - assert.Equal(t, params.Start.Add(-1*params.LookbackDuration), - p.TimeSpec.Start, fmt.Sprintf("start is not now - lookback")) - fetchTransform = parser.NewTransformFromOperation( - functions.FetchOp{Offset: time.Minute, Range: time.Hour}, 1) - transforms = parser.Nodes{fetchTransform, countTransform} - lp, _ = NewLogicalPlan(transforms, edges) - p, err = NewPhysicalPlan(lp, params) - require.NoError(t, err) - assert.Equal(t, params.Start. - Add(-1*(time.Hour+defaultLookbackDuration)), p.TimeSpec.Start, - "start time offset by fetch") + countTransform := parser.NewTransformFromOperation(agg, 2) + transforms := parser.Nodes{fetchTransform, countTransform} + edges := parser.Edges{ + parser.Edge{ + ParentID: fetchTransform.ID, + ChildID: countTransform.ID, + }, + } + + lp, _ := NewLogicalPlan(transforms, edges) + + params := models.RequestParams{ + Now: time.Now(), + LookbackDuration: tt.lookbackDuration, + Step: tt.step, + } + + p, err := NewPhysicalPlan(lp, params) + require.NoError(t, err) + assert.Equal(t, tt.wantShiftBy.String(), params.Start.Sub(p.TimeSpec.Start).String(), "start time shifted by") + }) + } }