From f7bd65cba932118880bf5cb157d1aa4fa896bbf5 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 2 Feb 2021 17:24:25 +0100 Subject: [PATCH] Fixes logfmt parser hints. Recently introduce a bug where label where tested for extraction against non sanitized version. Signed-off-by: Cyril Tovena --- pkg/logql/log/parser.go | 2 +- pkg/logql/log/parser_hints_test.go | 47 ++++++++++++++++++++++++++++-- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index fe2fe030cb1e6..79360b1d25a82 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -244,7 +244,7 @@ func (l *LogfmtParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { } l.dec.Reset(line) for l.dec.ScanKeyval() { - if !lbs.ParserLabelHints().ShouldExtract(string(l.dec.Key())) { + if !lbs.ParserLabelHints().ShouldExtract(sanitizeLabelKey(string(l.dec.Key()), true)) { continue } key := string(l.dec.Key()) diff --git a/pkg/logql/log/parser_hints_test.go b/pkg/logql/log/parser_hints_test.go index bef9238908c8a..733aff77e02de 100644 --- a/pkg/logql/log/parser_hints_test.go +++ b/pkg/logql/log/parser_hints_test.go @@ -10,7 +10,8 @@ import ( "github.com/grafana/loki/pkg/logql" ) -var jsonLine = []byte(`{ +var ( + jsonLine = []byte(`{ "remote_user": "foo", "upstream_addr": "10.0.0.1:80", "protocol": "HTTP/2.0", @@ -27,76 +28,118 @@ var jsonLine = []byte(`{ } }`) + logfmtLine = []byte(`ts=2021-02-02T14:35:05.983992774Z caller=spanlogger.go:79 org_id=3677 traceID=2e5c7234b8640997 Ingester.TotalReached=15 Ingester.TotalChunksMatched=0 Ingester.TotalBatches=0`) +) + func Test_ParserHints(t *testing.T) { lbs := labels.Labels{{Name: "app", Value: "nginx"}} t.Parallel() for _, tt := range []struct { expr string + line []byte expectOk bool expectVal float64 expectLbs string }{ { `rate({app="nginx"} | json | response_status = 204 [1m])`, + jsonLine, true, 1.0, `{app="nginx", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`, }, { `sum without (request_host,app) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`, + jsonLine, true, 1.0, `{protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`, }, { `sum by (request_host,app) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`, + jsonLine, true, 1.0, `{app="nginx", request_host="foo.grafana.net"}`, }, { `sum(rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`, + jsonLine, true, 1.0, `{}`, }, { `sum(rate({app="nginx"} | json [1m]))`, + jsonLine, true, 1.0, `{}`, }, { `sum(rate({app="nginx"} | json | unwrap response_latency_seconds [1m]))`, + jsonLine, true, 30.001, `{}`, }, { `sum(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`, + jsonLine, true, 30.001, `{}`, }, { `sum by (request_host,app)(rate({app="nginx"} | json | response_status = 204 and remote_user = "foo" | unwrap response_latency_seconds [1m]))`, + jsonLine, true, 30.001, `{app="nginx", request_host="foo.grafana.net"}`, }, { `rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m])`, + jsonLine, true, 30.001, `{app="nginx", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`, }, { `sum without (request_host,app)(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`, + jsonLine, true, 30.001, `{protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`, }, + { + `sum(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`, + logfmtLine, + true, + 15.0, + `{}`, + }, + { + `sum by (org_id,app) (rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`, + logfmtLine, + true, + 15.0, + `{app="nginx", org_id="3677"}`, + }, + { + `rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m])`, + logfmtLine, + true, + 15.0, + `{Ingester_TotalBatches="0", Ingester_TotalChunksMatched="0", app="nginx", caller="spanlogger.go:79", org_id="3677", traceID="2e5c7234b8640997", ts="2021-02-02T14:35:05.983992774Z"}`, + }, + { + `sum without (org_id,app)(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`, + logfmtLine, + true, + 15.0, + `{Ingester_TotalBatches="0", Ingester_TotalChunksMatched="0", caller="spanlogger.go:79", traceID="2e5c7234b8640997", ts="2021-02-02T14:35:05.983992774Z"}`, + }, } { tt := tt t.Run(tt.expr, func(t *testing.T) { @@ -105,7 +148,7 @@ func Test_ParserHints(t *testing.T) { require.NoError(t, err) ex, err := expr.Extractor() require.NoError(t, err) - v, lbsRes, ok := ex.ForStream(lbs).Process(jsonLine) + v, lbsRes, ok := ex.ForStream(lbs).Process(tt.line) var lbsResString string if lbsRes != nil { lbsResString = lbsRes.String()