Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement offset modifier for range vector aggregation in LogQL #3455

Merged
merged 6 commits into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/sources/logql/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ The same rules that apply for [Prometheus Label Selectors](https://prometheus.io

**Important note:** The `=~` regex operator is fully anchored, meaning regex must match against the *entire* string, including newlines. The regex `.` character does not match newlines by default. If you want the regex dot character to match newlines you can use the single-line flag, like so: `(?s)search_term.+` matches `search_term\n`.

#### Offset modifier
The offset modifier allows changing the time offset for individual range vectors in a query.

For example, the following expression counts all the logs for the MySQL job in the range between ten minutes ago and five minutes ago, rather than in the last five minutes. Note that the `offset` modifier must always immediately follow the range vector selector.
```logql
count_over_time({job="mysql"}[5m] offset 5m) // GOOD
count_over_time({job="mysql"}[5m]) offset 5m // INVALID
```

### Log Pipeline

A log pipeline can be appended to a log stream selector to further process and filter log streams. It usually is composed of one or multiple expressions, each expressions is executed in sequence for each log line. If an expression filters out a log line, the pipeline will stop at this point and start processing the next line.
Expand Down
29 changes: 28 additions & 1 deletion pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ func newUnwrapExpr(id string, operation string) *unwrapExpr {
// logRange is the AST node for a log selector bounded to a time range,
// e.g. {job="mysql"}[5m], optionally carrying an unwrap expression and an
// `offset` modifier.
type logRange struct {
	left     LogSelectorExpr
	interval time.Duration
	// offset shifts the evaluation window back in time; 0 means no offset.
	offset time.Duration

	unwrap *unwrapExpr
}
Expand All @@ -511,16 +512,41 @@ func (r logRange) String() string {
sb.WriteString(r.unwrap.String())
}
sb.WriteString(fmt.Sprintf("[%v]", model.Duration(r.interval)))
if r.offset != 0 {
offsetExpr := offsetExpr{offset: r.offset}
sb.WriteString(offsetExpr.String())
}
return sb.String()
}

// Shardable reports whether this range can be split into shards, which is
// delegated entirely to the underlying log selector.
func (r *logRange) Shardable() bool {
	return r.left.Shardable()
}

func newLogRange(left LogSelectorExpr, interval time.Duration, u *unwrapExpr) *logRange {
// newLogRange builds a logRange over the given selector and interval.
// Both u and o may be nil; a nil offset expression results in a zero offset.
func newLogRange(left LogSelectorExpr, interval time.Duration, u *unwrapExpr, o *offsetExpr) *logRange {
	r := &logRange{
		left:     left,
		interval: interval,
		unwrap:   u,
	}
	if o != nil {
		r.offset = o.offset
	}
	return r
}

// offsetExpr is the AST node for the `offset <duration>` modifier that
// follows a range vector selector, e.g. `[5m] offset 10m`.
type offsetExpr struct {
	offset time.Duration
}

// String renders the offset clause with a leading space so it can be
// appended directly after the range, e.g. " offset 5m0s".
func (o *offsetExpr) String() string {
	// A strings.Builder is unnecessary for a single formatted value;
	// return the formatted string directly.
	return fmt.Sprintf(" %s %s", OpOffset, o.offset.String())
}

// newOffsetExpr wraps a duration in an offsetExpr AST node.
func newOffsetExpr(offset time.Duration) *offsetExpr {
	return &offsetExpr{offset: offset}
}

Expand Down Expand Up @@ -582,6 +608,7 @@ const (

OpPipe = "|"
OpUnwrap = "unwrap"
OpOffset = "offset"

// conversion Op
OpConvBytes = "bytes"
Expand Down
24 changes: 24 additions & 0 deletions pkg/logql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,27 @@ func Test_SampleExpr_String(t *testing.T) {
for _, tc := range []string{
`rate( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] offset 10m )`,
`sum without(a) ( rate ( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum by(a) (rate( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum(count_over_time({job="mysql"}[5m]))`,
`sum(count_over_time({job="mysql"}[5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | json [5m]))`,
`sum(count_over_time({job="mysql"} | json [5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | logfmt [5m]))`,
`sum(count_over_time({job="mysql"} | logfmt [5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | unpack | json [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m] offset 10m))`,
`topk(10,sum(rate({region="us-east1"}[5m])) by (name))`,
`topk by (name)(10,sum(rate({region="us-east1"}[5m])))`,
`avg( rate( ( {job="nginx"} |= "GET" ) [10s] ) ) by (region)`,
`avg(min_over_time({job="nginx"} |= "GET" | unwrap foo[10s])) by (region)`,
`avg(min_over_time({job="nginx"} |= "GET" | unwrap foo[10s] offset 10m)) by (region)`,
`sum by (cluster) (count_over_time({job="mysql"}[5m]))`,
`sum by (cluster) (count_over_time({job="mysql"}[5m] offset 10m))`,
`sum by (cluster) (count_over_time({job="mysql"}[5m])) / sum by (cluster) (count_over_time({job="postgres"}[5m])) `,
`sum by (cluster) (count_over_time({job="mysql"}[5m] offset 10m)) / sum by (cluster) (count_over_time({job="postgres"}[5m] offset 10m)) `,
`
sum by (cluster) (count_over_time({job="postgres"}[5m])) /
sum by (cluster) (count_over_time({job="postgres"}[5m])) /
Expand All @@ -86,6 +94,8 @@ func Test_SampleExpr_String(t *testing.T) {
)`,
`stdvar_over_time({app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap foo [5m])`,
`stdvar_over_time({app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap foo [5m] offset 10m)`,
`sum_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms|unwrap latency [5m])`,
`sum by (job) (
sum_over_time({namespace="tns"} |= "level=error" | json | foo=5 and bar<25ms | unwrap latency[5m])
Expand Down Expand Up @@ -130,6 +140,20 @@ func Test_SampleExpr_String(t *testing.T) {
`10 / (5/2)`,
`10 / (count_over_time({job="postgres"}[5m])/2)`,
`{app="foo"} | json response_status="response.status.code", first_param="request.params[0]"`,
`label_replace(
sum by (job) (
sum_over_time(
{namespace="tns"} |= "level=error" | json | avg=5 and bar<25ms | unwrap duration(latency) | __error__!~".*" [5m] offset 1h
)
/
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m] offset 1h)
),
"foo",
"$1",
"service",
"(.*):.*"
)
`,
} {
t.Run(tc, func(t *testing.T) {
expr, err := ParseExpr(tc)
Expand Down
38 changes: 38 additions & 0 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
},
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 6}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`count_over_time({app="foo"} |~".+bar" [1m] offset 30s)`, time.Unix(90, 0), logproto.BACKWARD, 10,
[][]logproto.Series{
{newSeries(testSize, factor(10, identity), `{app="foo"}`)}, // 10 , 20 , 30 .. 60 = 6 total
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="foo"}|~".+bar"[1m] offset 30s)`}},
},
promql.Vector{promql.Sample{Point: promql.Point{T: 90 * 1000, V: 6}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`count_over_time(({app="foo"} |~".+bar")[5m])`, time.Unix(5*60, 0), logproto.BACKWARD, 10,
[][]logproto.Series{
Expand Down Expand Up @@ -237,6 +247,34 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
},
},
},
{
`sum(count_over_time({app=~"foo|bar"} |~".+bar" [1m] offset 30s)) by (namespace,app)`, time.Unix(90, 0), logproto.FORWARD, 100,
[][]logproto.Series{
{
newSeries(testSize, factor(10, identity), `{app="foo", namespace="a"}`),
newSeries(testSize, factor(10, identity), `{app="bar", namespace="b"}`),
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (namespace,app) (count_over_time({app=~"foo|bar"} |~".+bar" [1m] offset 30s)) `}},
},
promql.Vector{
promql.Sample{
Point: promql.Point{T: 90 * 1000, V: 6},
Metric: labels.Labels{
labels.Label{Name: "app", Value: "bar"},
labels.Label{Name: "namespace", Value: "b"},
},
},
promql.Sample{
Point: promql.Point{T: 90 * 1000, V: 6},
Metric: labels.Labels{
labels.Label{Name: "app", Value: "foo"},
labels.Label{Name: "namespace", Value: "a"},
},
},
},
},
{
`label_replace(
sum(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) by (namespace,app),
Expand Down
28 changes: 14 additions & 14 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,12 @@ func NewLiteralParams(

// LiteralParams impls Params
type LiteralParams struct {
	qs         string        // raw query string
	start, end time.Time     // evaluation window bounds
	// step is the resolution between evaluation points; interval is the
	// split interval for range queries.
	step, interval time.Duration
	direction      logproto.Direction
	limit          uint32
	shards         []string
}

// Copy returns a copy of p. The value receiver already copies the struct;
// note this is shallow — the shards slice still shares its backing array.
func (p LiteralParams) Copy() LiteralParams { return p }
Expand Down Expand Up @@ -175,32 +174,32 @@ func (ev *DefaultEvaluator) StepEvaluator(
nextEv = SampleEvaluatorFunc(func(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) {
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-rangExpr.left.interval),
End: q.End(),
Start: q.Start().Add(-rangExpr.left.interval).Add(-rangExpr.left.offset),
End: q.End().Add(-rangExpr.left.offset),
Selector: e.String(), // intentionally send the the vector for reducing labels.
Shards: q.Shards(),
},
})
if err != nil {
return nil, err
}
return rangeAggEvaluator(iter.NewPeekingSampleIterator(it), rangExpr, q)
return rangeAggEvaluator(iter.NewPeekingSampleIterator(it), rangExpr, q, rangExpr.left.offset)
})
}
return vectorAggEvaluator(ctx, nextEv, e, q)
case *rangeAggregationExpr:
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-e.left.interval),
End: q.End(),
Start: q.Start().Add(-e.left.interval).Add(-e.left.offset),
End: q.End().Add(-e.left.offset),
Selector: expr.String(),
Shards: q.Shards(),
},
})
if err != nil {
return nil, err
}
return rangeAggEvaluator(iter.NewPeekingSampleIterator(it), e, q)
return rangeAggEvaluator(iter.NewPeekingSampleIterator(it), e, q, e.left.offset)
case *binOpExpr:
return binOpStepEvaluator(ctx, nextEv, e, q)
case *labelReplaceExpr:
Expand Down Expand Up @@ -407,6 +406,7 @@ func rangeAggEvaluator(
it iter.PeekingSampleIterator,
expr *rangeAggregationExpr,
q Params,
o time.Duration,
) (StepEvaluator, error) {
agg, err := expr.aggregator()
if err != nil {
Expand All @@ -416,7 +416,7 @@ func rangeAggEvaluator(
it,
expr.left.interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(),
q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
)
if expr.operation == OpRangeTypeAbsent {
return &absentRangeVectorEvaluator{
Expand Down
44 changes: 30 additions & 14 deletions pkg/logql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
JSONExpression log.JSONExpression
JSONExpressionList []log.JSONExpression
UnwrapExpr *unwrapExpr
OffsetExpr *offsetExpr
}

%start root
Expand Down Expand Up @@ -90,6 +91,7 @@ import (
%type <JSONExpressionList> jsonExpressionList
%type <UnwrapExpr> unwrapExpr
%type <UnitFilter> unitFilter
%type <OffsetExpr> offsetExpr

%token <bytes> BYTES
%token <str> IDENTIFIER STRING NUMBER
Expand All @@ -98,7 +100,7 @@ import (
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
ABSENT_OVER_TIME LABEL_REPLACE UNPACK
ABSENT_OVER_TIME LABEL_REPLACE UNPACK OFFSET

// Operators are listed with increasing precedence.
%left <binOp> OR
Expand Down Expand Up @@ -133,19 +135,31 @@ logExpr:
;

logRangeExpr:
Copy link
Member

@owen-d owen-d Mar 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love how we're duplicating all paths here. I know we've done this in the past, but it's getting out of hand (not your fault).

@cyriltovena we should consider refactoring this a bit, but I'm fine with this PR.

selector RANGE { $$ = newLogRange(newMatcherExpr($1), $2, nil) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $4, nil) }
| selector RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2 , $3) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4 , $5) }
| selector unwrapExpr RANGE { $$ = newLogRange(newMatcherExpr($1), $3, $2 ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $5, $3 ) }
| selector pipelineExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil ) }
| selector pipelineExpr unwrapExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4) }
| selector RANGE pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, nil) }
| selector RANGE pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, $4 ) }
| OPEN_PARENTHESIS logRangeExpr CLOSE_PARENTHESIS { $$ = $2 }
selector RANGE { $$ = newLogRange(newMatcherExpr($1), $2, nil, nil ) }
| selector RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($1), $2, nil, $3 ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $4, nil, nil ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($2), $4, nil, $5 ) }
| selector RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2, $3, nil ) }
| selector RANGE offsetExpr unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2, $4, $3 ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4, $5, nil ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE offsetExpr unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4, $6, $5 ) }
| selector unwrapExpr RANGE { $$ = newLogRange(newMatcherExpr($1), $3, $2, nil ) }
| selector unwrapExpr RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($1), $3, $2, $4 ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $5, $3, nil ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($2), $5, $3, $6 ) }
| selector pipelineExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil, nil ) }
| selector pipelineExpr RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil, $4 ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil, $6 ) }
| selector pipelineExpr unwrapExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3, nil ) }
| selector pipelineExpr unwrapExpr RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3, $5 ) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4, $7 ) }
| selector RANGE pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, nil, nil) }
| selector RANGE offsetExpr pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $4), $2, nil, $3 ) }
| selector RANGE pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, $4, nil ) }
| selector RANGE offsetExpr pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $4), $2, $5, $3 ) }
| OPEN_PARENTHESIS logRangeExpr CLOSE_PARENTHESIS { $$ = $2 }
| logRangeExpr error
;

Expand Down Expand Up @@ -363,6 +377,8 @@ rangeOp:
| ABSENT_OVER_TIME { $$ = OpRangeTypeAbsent }
;

/* offsetExpr parses the `offset <duration>` modifier following a range
   vector selector, producing an *offsetExpr AST node. */
offsetExpr:
	OFFSET DURATION { $$ = newOffsetExpr( $2 ) }

labels:
IDENTIFIER { $$ = []string{ $1 } }
Expand Down
Loading