This logs queries with latency tag when recording stats. (#1733)
* This logs queries with a latency tag when recording stats.

Useful for building a dashboard of slow queries.

Signed-off-by: Cyril Tovena <[email protected]>

* mod-check.

Signed-off-by: Cyril Tovena <[email protected]>
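
For context, the logfmt line this change emits can be reconstructed from the expectation in pkg/logql/metrics_test.go below; the values here are illustrative and the trace_id placeholder stands in for the span ID injected by the tracer:

level=info org_id=foo trace_id=<span id> latency=slow query="{foo=\"bar\"} |= \"buzz\"" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 throughput_mb=0.01 total_bytes_mb=0.01

A slow-query dashboard can then be built by filtering these lines on latency=slow.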
cyriltovena authored Feb 24, 2020
1 parent 8b8d3a4 commit 9be21a7
Showing 10 changed files with 124 additions and 46 deletions.
1 change: 1 addition & 0 deletions go.mod
@@ -47,6 +47,7 @@ require (
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd
github.com/stretchr/testify v1.4.0
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20200201141823-27e183090ab1
go.etcd.io/etcd v0.0.0-20190815204525-8f85f0dc2607 // indirect
4 changes: 2 additions & 2 deletions pkg/logql/engine.go
@@ -133,7 +133,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
status = "400"
}
}
RecordMetrics(status, q.String(), rangeType, statResult)
RecordMetrics(ctx, q, status, statResult)

return Result{
Data: data,
@@ -181,7 +181,7 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
defer cancel()

qs := q.String()
qs := q.Query()

expr, err := ParseExpr(qs)
if err != nil {
4 changes: 2 additions & 2 deletions pkg/logql/evaluator.go
@@ -24,7 +24,7 @@ var (

// Params details the parameters associated with a loki request
type Params interface {
String() string
Query() string
Start() time.Time
End() time.Time
Step() time.Duration
@@ -42,7 +42,7 @@ type LiteralParams struct {
}

// String impls Params
func (p LiteralParams) String() string { return p.qs }
func (p LiteralParams) Query() string { return p.qs }

// Start impls Params
func (p LiteralParams) Start() time.Time { return p.start }
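
Only part of the Params interface is visible in this hunk. Piecing it together with the call sites in pkg/querier/queryrange/stats_test.go further down (p.Direction() and p.Limit()), the interface after this change plausibly reads roughly as below; the Direction and Limit signatures are inferred from those tests rather than quoted from the file:

// Params details the parameters associated with a loki request.
// Sketch reconstructed from this diff plus test usage; Limit and Direction are inferred.
type Params interface {
	Query() string
	Start() time.Time
	End() time.Time
	Step() time.Duration
	Limit() uint32
	Direction() logproto.Direction
}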
26 changes: 23 additions & 3 deletions pkg/logql/metrics.go
@@ -1,6 +1,9 @@
package logql

import (
"context"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
@@ -59,12 +62,12 @@ var (
})
)

func RecordMetrics(status, query string, rangeType QueryRangeType, stats stats.Result) {
queryType, err := QueryType(query)
func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result) {
queryType, err := QueryType(p.Query())
if err != nil {
level.Warn(util.Logger).Log("msg", "error parsing query type", "err", err)
}
rt := string(rangeType)
rt := string(GetRangeType(p))

// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
@@ -73,6 +76,23 @@ func RecordMetrics(status, query string, rangeType QueryRangeType, stats stats.R
latencyType = latencyTypeSlow
}

// we also log queries, useful for troubleshooting slow queries.
level.Info(
// ensure we have traceID & orgId
util.WithContext(ctx, util.Logger),
).Log(
"latency", latencyType, // this can be used to filter log lines.
"query", p.Query(),
"query_type", queryType,
"range_type", rt,
"length", p.End().Sub(p.Start()),
"step", p.Step(),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"throughput_mb", float64(stats.Summary.BytesProcessedPerSeconds)/10e6,
"total_bytes_mb", float64(stats.Summary.TotalBytesProcessed)/10e6,
)

bytesPerSeconds.WithLabelValues(status, queryType, rt, latencyType).
Observe(float64(stats.Summary.BytesProcessedPerSeconds))
execLatency.WithLabelValues(status, queryType, rt).
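
The comparison that assigns latencyType is collapsed in the hunk above. Based on the comment about a fast/slow threshold, the classification presumably looks something like the sketch below; slowQueryThreshold is an assumed name and value standing in for whatever the file actually defines, and latencyTypeFast is inferred as the counterpart of latencyTypeSlow:

// Minimal sketch of the latency classification, not the file's actual code.
const slowQueryThreshold = 10 * time.Second // assumed threshold value

latencyType := latencyTypeFast
if stats.Summary.ExecTime > slowQueryThreshold.Seconds() { // ExecTime is in seconds
	latencyType = latencyTypeSlow
}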
46 changes: 46 additions & 0 deletions pkg/logql/metrics_test.go
@@ -1,7 +1,21 @@
package logql

import (
"bytes"
"context"
"fmt"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
)

func TestQueryType(t *testing.T) {
@@ -32,3 +46,35 @@ func TestQueryType(t *testing.T) {
})
}
}

func TestLogSlowQuery(t *testing.T) {
buf := bytes.NewBufferString("")
util.Logger = log.NewLogfmtLogger(buf)
tr, c := jaeger.NewTracer("foo", jaeger.NewConstSampler(true), jaeger.NewInMemoryReporter())
defer c.Close()
opentracing.SetGlobalTracer(tr)
sp := opentracing.StartSpan("")
ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp)
now := time.Now()
RecordMetrics(ctx, LiteralParams{
qs: `{foo="bar"} |= "buzz"`,
direction: logproto.BACKWARD,
end: now,
start: now.Add(-1 * time.Hour),
limit: 1000,
step: time.Minute,
}, "200", stats.Result{
Summary: stats.Summary{
BytesProcessedPerSeconds: 100000,
ExecTime: 25.25,
TotalBytesProcessed: 100000,
},
})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo trace_id=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 throughput_mb=0.01 total_bytes_mb=0.01\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
util.Logger = log.NewNopLogger()
}
2 changes: 1 addition & 1 deletion pkg/logql/stats/stats.pb.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/logql/stats/stats.proto
@@ -26,7 +26,7 @@ message Summary {
int64 totalBytesProcessed = 3 [(gogoproto.jsontag) = "totalBytesProcessed"];
// Total lines processed.
int64 totalLinesProcessed = 4 [(gogoproto.jsontag) = "totalLinesProcessed"];
// Execution time in nanoseconds.
// Execution time in seconds.
double execTime = 5 [(gogoproto.jsontag) = "execTime"];
}

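
The corrected comment matches the duration field in the log line added in pkg/logql/metrics.go: execTime is a float64 count of seconds, so converting it to a time.Duration uses the expression below (restated standalone, assuming a stats.Summary value named summary):

// execTime is seconds as float64; scale by time.Second to get a Duration.
d := time.Duration(int64(summary.ExecTime * float64(time.Second)))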
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/codec.go
@@ -379,8 +379,8 @@ func paramsFromRequest(req queryrange.Request) *paramsWrapper {
}
}

func (p paramsWrapper) String() string {
return p.Query
func (p paramsWrapper) Query() string {
return p.LokiRequest.Query
}
func (p paramsWrapper) Start() time.Time {
return p.StartTs
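
paramsFromRequest adapts a queryrange request into a logql.Params, which is what the stats middleware now stores; the test further down exercises it roughly as in this trimmed restatement (no new API is introduced here):

p := paramsFromRequest(&LokiRequest{
	Query:     "foo",
	Direction: logproto.BACKWARD,
	Limit:     100,
})
// p.Query(), p.Direction() and p.Limit() surface the request fields for RecordMetrics.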
25 changes: 12 additions & 13 deletions pkg/querier/queryrange/stats.go
@@ -24,27 +24,26 @@ type ctxKeyType string
const ctxKey ctxKeyType = "stats"

var (
defaultMetricRecorder = metricRecorderFn(func(status, query string, rangeType logql.QueryRangeType, stats stats.Result) {
logql.RecordMetrics(status, query, rangeType, stats)
defaultMetricRecorder = metricRecorderFn(func(ctx context.Context, p logql.Params, status string, stats stats.Result) {
logql.RecordMetrics(ctx, p, status, stats)
})
// StatsHTTPMiddleware is an http middleware to record stats for query_range filter.
StatsHTTPMiddleware middleware.Interface = statsHTTPMiddleware(defaultMetricRecorder)
)

type metricRecorder interface {
Record(status, query string, rangeType logql.QueryRangeType, stats stats.Result)
Record(ctx context.Context, p logql.Params, status string, stats stats.Result)
}

type metricRecorderFn func(status, query string, rangeType logql.QueryRangeType, stats stats.Result)
type metricRecorderFn func(ctx context.Context, p logql.Params, status string, stats stats.Result)

func (m metricRecorderFn) Record(status, query string, rangeType logql.QueryRangeType, stats stats.Result) {
m(status, query, rangeType, stats)
func (m metricRecorderFn) Record(ctx context.Context, p logql.Params, status string, stats stats.Result) {
m(ctx, p, status, stats)
}

type queryData struct {
query string
params logql.Params
statistics *stats.Result
rangeType logql.QueryRangeType
recorded bool
}

@@ -53,9 +52,10 @@ func statsHTTPMiddleware(recorder metricRecorder) middleware.Interface {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := &queryData{}
interceptor := &interceptor{ResponseWriter: w, statusCode: http.StatusOK}
r = r.WithContext(context.WithValue(r.Context(), ctxKey, data))
next.ServeHTTP(
interceptor,
r.WithContext(context.WithValue(r.Context(), ctxKey, data)),
r,
)
// http middlewares runs for every http request.
// but we want only to record query_range filters.
@@ -64,9 +64,9 @@
data.statistics = &stats.Result{}
}
recorder.Record(
r.Context(),
data.params,
strconv.Itoa(interceptor.statusCode),
data.query,
data.rangeType,
*data.statistics,
)
}
@@ -104,9 +104,8 @@ func StatsCollectorMiddleware() queryrange.Middleware {
ctxValue := ctx.Value(ctxKey)
if data, ok := ctxValue.(*queryData); ok {
data.recorded = true
data.query = req.GetQuery()
data.statistics = statistics
data.rangeType = logql.GetRangeType(paramsFromRequest(req))
data.params = paramsFromRequest(req)
}
return resp, err
})
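
With the widened Record signature, a custom recorder can be plugged into the HTTP middleware the same way the test below does; a minimal in-package sketch, where next is assumed to be the http.Handler from the surrounding server wiring:

recorder := metricRecorderFn(func(ctx context.Context, p logql.Params, status string, s stats.Result) {
	// Forward to the default implementation, or to any custom sink.
	logql.RecordMetrics(ctx, p, status, s)
})
handler := statsHTTPMiddleware(recorder).Wrap(next)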
56 changes: 34 additions & 22 deletions pkg/querier/queryrange/stats_test.go
@@ -12,23 +12,27 @@ import (
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
)

func TestStatsCollectorMiddleware(t *testing.T) {
// no stats
data := &queryData{}
var (
data = &queryData{}
now = time.Now()
)
ctx := context.WithValue(context.Background(), ctxKey, data)
_, _ = StatsCollectorMiddleware().Wrap(queryrange.HandlerFunc(func(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
return nil, nil
})).Do(ctx, &LokiRequest{
Query: "foo",
StartTs: time.Now(),
StartTs: now,
})
require.Equal(t, "foo", data.query)
require.Equal(t, "foo", data.params.Query())
require.Equal(t, true, data.recorded)
require.Equal(t, logql.RangeType, data.rangeType)
require.Equal(t, now, data.params.Start())
require.Nil(t, data.statistics)

// no context.
@@ -37,7 +41,7 @@ func TestStatsCollectorMiddleware(t *testing.T) {
return nil, nil
})).Do(context.Background(), &LokiRequest{
Query: "foo",
StartTs: time.Now(),
StartTs: now,
})
require.Equal(t, false, data.recorded)

@@ -54,27 +58,27 @@
}, nil
})).Do(ctx, &LokiRequest{
Query: "foo",
StartTs: time.Now(),
StartTs: now,
})
require.Equal(t, "foo", data.query)
require.Equal(t, "foo", data.params.Query())
require.Equal(t, true, data.recorded)
require.Equal(t, logql.RangeType, data.rangeType)
require.Equal(t, now, data.params.Start())
require.Equal(t, int32(10), data.statistics.Ingester.TotalReached)
}

func Test_StatsHTTP(t *testing.T) {
for _, test := range []struct {
name string
next http.Handler
expect func(t *testing.T, status, query string, rangeType logql.QueryRangeType, s stats.Result)
expect func(t *testing.T, ctx context.Context, p logql.Params, status string, stats stats.Result)
}{
{
"should not record metric if nothing is recorded",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = false
}),
func(t *testing.T, status, query string, rangeType logql.QueryRangeType, s stats.Result) {
func(t *testing.T, ctx context.Context, p logql.Params, status string, stats stats.Result) {
t.Fail()
},
},
@@ -83,14 +87,18 @@ func Test_StatsHTTP(t *testing.T) {
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = true
data.rangeType = logql.RangeType
data.query = "foo"
data.params = paramsFromRequest(&LokiRequest{
Query: "foo",
Direction: logproto.BACKWARD,
Limit: 100,
})
data.statistics = nil
}),
func(t *testing.T, status, query string, rangeType logql.QueryRangeType, s stats.Result) {
func(t *testing.T, ctx context.Context, p logql.Params, status string, s stats.Result) {
require.Equal(t, fmt.Sprintf("%d", http.StatusOK), status)
require.Equal(t, logql.RangeType, rangeType)
require.Equal(t, "foo", query)
require.Equal(t, "foo", p.Query())
require.Equal(t, logproto.BACKWARD, p.Direction())
require.Equal(t, uint32(100), p.Limit())
require.Equal(t, stats.Result{}, s)
},
},
@@ -99,22 +107,26 @@
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = true
data.rangeType = logql.RangeType
data.query = "foo"
data.params = paramsFromRequest(&LokiRequest{
Query: "foo",
Direction: logproto.BACKWARD,
Limit: 100,
})
data.statistics = &statsResult
w.WriteHeader(http.StatusTeapot)
}),
func(t *testing.T, status, query string, rangeType logql.QueryRangeType, s stats.Result) {
func(t *testing.T, ctx context.Context, p logql.Params, status string, s stats.Result) {
require.Equal(t, fmt.Sprintf("%d", http.StatusTeapot), status)
require.Equal(t, logql.RangeType, rangeType)
require.Equal(t, "foo", query)
require.Equal(t, "foo", p.Query())
require.Equal(t, logproto.BACKWARD, p.Direction())
require.Equal(t, uint32(100), p.Limit())
require.Equal(t, statsResult, s)
},
},
} {
t.Run(test.name, func(t *testing.T) {
statsHTTPMiddleware(metricRecorderFn(func(status, query string, rangeType logql.QueryRangeType, stats stats.Result) {
test.expect(t, status, query, rangeType, stats)
statsHTTPMiddleware(metricRecorderFn(func(ctx context.Context, p logql.Params, status string, stats stats.Result) {
test.expect(t, ctx, p, status, stats)
})).Wrap(test.next).ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/foo", strings.NewReader("")))
})
}
