Improve logql query statistics collection. #1573

Merged · 9 commits · Jan 24, 2020
Changes shown are from 6 of the 9 commits.
42 changes: 0 additions & 42 deletions pkg/chunkenc/decompression/context.go

This file was deleted.

27 changes: 14 additions & 13 deletions pkg/chunkenc/memchunk.go
@@ -13,10 +13,10 @@ import (

"github.com/pkg/errors"

"github.com/grafana/loki/pkg/chunkenc/decompression"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
)

const (
@@ -477,7 +477,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
}

if !c.head.isEmpty() {
- its = append(its, c.head.iterator(mint, maxt, filter))
+ its = append(its, c.head.iterator(ctx, mint, maxt, filter))
}

iterForward := iter.NewTimeRangedIterator(
@@ -500,18 +500,21 @@ func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.Filte
return newBufferedIterator(ctx, pool, b.b, filter)
}

- func (hb *headBlock) iterator(mint, maxt int64, filter logql.Filter) iter.EntryIterator {
+ func (hb *headBlock) iterator(ctx context.Context, mint, maxt int64, filter logql.Filter) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return emptyIterator
}

+ chunkStats := stats.GetChunkData(ctx)
+
// We copy the entries every time because b.entries could change completely.
// The alternative would be to allocate a new b.entries every time we cut a block,
// but the tradeoff is that queries against near-real-time data would be much
// slower than the cost of cutting blocks.

+ chunkStats.LinesUncompressed += int64(len(hb.entries))
entries := make([]entry, 0, len(hb.entries))
for _, e := range hb.entries {
+ chunkStats.BytesUncompressed += int64(len(e.s))
if filter == nil || filter([]byte(e.s)) {
entries = append(entries, e)
}
@@ -558,9 +561,8 @@ func (li *listIterator) Close() error { return nil }
func (li *listIterator) Labels() string { return "" }

type bufferedIterator struct {
- origBytes []byte
- rootCtx context.Context
- bytesDecompressed int64
+ origBytes []byte
+ stats *stats.ChunkData

bufReader *bufio.Reader
reader io.Reader
@@ -579,8 +581,10 @@ type bufferedIterator struct {
}

func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, filter logql.Filter) *bufferedIterator {
+ chunkStats := stats.GetChunkData(ctx)
+ chunkStats.BytesCompressed += int64(len(b))
return &bufferedIterator{
- rootCtx: ctx,
+ stats: chunkStats,
origBytes: b,
reader: nil, // will be initialized later
bufReader: nil, // will be initialized later
@@ -604,7 +608,8 @@ func (si *bufferedIterator) Next() bool {
return false
}
// we always decode the line length and ts as varints
- si.bytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64
+ si.stats.BytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64
+ si.stats.LinesDecompressed++
if si.filter != nil && !si.filter(line) {
continue
}
@@ -682,10 +687,6 @@ func (si *bufferedIterator) Close() error {
}

func (si *bufferedIterator) close() {
- decompression.Mutate(si.rootCtx, func(current *decompression.Stats) {
- current.BytesDecompressed += si.bytesDecompressed
- current.BytesCompressed += int64(len(si.origBytes))
- })
if si.reader != nil {
si.pool.PutReader(si.reader)
si.reader = nil
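
Aside: the diff above replaces the old decompression.Mutate callback with a *stats.ChunkData that is fetched once from the request context and mutated directly by the iterators. Below is a minimal sketch of how such a context-scoped collector can be built. The field set mirrors the counters touched in this diff, but the real pkg/logql/stats implementation may differ (for instance, it may need atomic counters if iterators ever run concurrently).

package stats

import "context"

// ChunkData mirrors the counters incremented in the diff above.
type ChunkData struct {
	BytesCompressed   int64
	BytesDecompressed int64
	LinesDecompressed int64
	BytesUncompressed int64
	LinesUncompressed int64
	TotalDuplicates   int64
}

type ctxKey int

const chunkKey ctxKey = iota

// NewContext stores a fresh ChunkData in the context so that every
// iterator created for a query increments the same counters.
func NewContext(ctx context.Context) context.Context {
	return context.WithValue(ctx, chunkKey, &ChunkData{})
}

// GetChunkData returns the per-query ChunkData. Callers get a throwaway
// value when the context was not prepared by NewContext, so the counters
// are always safe to increment.
func GetChunkData(ctx context.Context) *ChunkData {
	if d, ok := ctx.Value(chunkKey).(*ChunkData); ok {
		return d
	}
	return &ChunkData{}
}
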
2 changes: 1 addition & 1 deletion pkg/chunkenc/memchunk_test.go
@@ -539,7 +539,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
- iter := h.iterator(0, math.MaxInt64, nil)
+ iter := h.iterator(context.Background(), 0, math.MaxInt64, nil)

for iter.Next() {
_ = iter.Entry()
3 changes: 3 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -11,6 +11,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/chunk"
@@ -252,6 +253,8 @@ type mockQuerierServer struct {
grpc.ServerStream
}

+ func (*mockQuerierServer) SetTrailer(metadata.MD) {}
+
func (m *mockQuerierServer) Send(resp *logproto.QueryResponse) error {
m.resps = append(m.resps, resp)
return nil
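
The mock gains SetTrailer because instance.Query (below) now defers stats.SendAsTrailer, which attaches the collected statistics to the query stream as a gRPC trailer. A hedged sketch of what such a helper could look like follows; the metadata keys and string encoding are assumptions for illustration, not the actual Loki wire format.

package stats

import (
	"context"
	"strconv"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

// IngesterData mirrors the counters used in pkg/ingester/instance.go.
type IngesterData struct {
	TotalChunksMatched int64
	TotalBatches       int64
	TotalLinesSent     int64
}

// GetIngesterData would be analogous to GetChunkData in the earlier sketch.
func GetIngesterData(ctx context.Context) *IngesterData { return &IngesterData{} }

// SendAsTrailer attaches the collected counters to the stream as a trailer,
// which gRPC delivers to the client after the last message.
func SendAsTrailer(ctx context.Context, stream grpc.ServerStream) {
	ing := GetIngesterData(ctx)
	md := metadata.MD{}
	md.Set("total-chunks-matched", strconv.FormatInt(ing.TotalChunksMatched, 10))
	md.Set("total-batches", strconv.FormatInt(ing.TotalBatches, 10))
	md.Set("total-lines-sent", strconv.FormatInt(ing.TotalLinesSent, 10))
	stream.SetTrailer(md)
}
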
55 changes: 32 additions & 23 deletions pkg/ingester/instance.go
@@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util"
)

@@ -186,6 +187,10 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels
}

func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
+ // initialize stats collection for ingester queries and send them back as a gRPC trailer.
+ ctx := stats.NewContext(queryServer.Context())
+ defer stats.SendAsTrailer(ctx, queryServer)
+
expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector()
if err != nil {
return err
@@ -195,12 +200,13 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
return err
}

+ ingStats := stats.GetIngesterData(ctx)
var iters []iter.EntryIterator

err = i.forMatchingStreams(
expr.Matchers(),
func(stream *stream) error {
- iter, err := stream.Iterator(queryServer.Context(), req.Start, req.End, req.Direction, filter)
+ ingStats.TotalChunksMatched += int64(len(stream.chunks))
+ iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, filter)
if err != nil {
return err
}
@@ -212,10 +218,10 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
return err
}

- iter := iter.NewHeapIterator(queryServer.Context(), iters, req.Direction)
+ iter := iter.NewHeapIterator(ctx, iters, req.Direction)
defer helpers.LogError("closing iterator", iter.Close)

- return sendBatches(iter, queryServer, req.Limit)
+ return sendBatches(ctx, iter, queryServer, req.Limit)
}

func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
@@ -359,10 +365,28 @@ func isDone(ctx context.Context) bool {
}
}

- func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
+ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
+ ingStats := stats.GetIngesterData(ctx)
if limit == 0 {
- return sendAllBatches(i, queryServer)
+ // send all batches.
+ for !isDone(ctx) {
+ batch, size, err := iter.ReadBatch(i, queryBatchSize)
+ if err != nil {
+ return err
+ }
+ if len(batch.Streams) == 0 {
+ return nil
+ }
+
+ if err := queryServer.Send(batch); err != nil {
+ return err
+ }
+ ingStats.TotalLinesSent += int64(size)
+ ingStats.TotalBatches++
+ }
+ return nil
}
+ // send until the limit is reached.
sent := uint32(0)
for sent < limit && !isDone(queryServer.Context()) {
batch, batchSize, err := iter.ReadBatch(i, helpers.MinUint32(queryBatchSize, limit-sent))
@@ -378,23 +402,8 @@ func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer,
if err := queryServer.Send(batch); err != nil {
return err
}
- }
- return nil
- }
-
- func sendAllBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer) error {
- for !isDone(queryServer.Context()) {
- batch, _, err := iter.ReadBatch(i, queryBatchSize)
- if err != nil {
- return err
- }
- if len(batch.Streams) == 0 {
- return nil
- }
-
- if err := queryServer.Send(batch); err != nil {
- return err
- }
+ ingStats.TotalLinesSent += int64(batchSize)
+ ingStats.TotalBatches++
}
return nil
}
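
For context, a hypothetical querier-side counterpart: a gRPC trailer only becomes readable after the stream is drained, so a client must consume every batch before inspecting it. The helper name drainQuery is illustrative; only logproto.QuerierClient and the standard ClientStream.Trailer method come from real APIs.

import (
	"context"
	"io"

	"google.golang.org/grpc/metadata"

	"github.com/grafana/loki/pkg/logproto"
)

// drainQuery consumes the whole stream, then returns the trailer that
// stats.SendAsTrailer set on the server side.
func drainQuery(ctx context.Context, client logproto.QuerierClient, req *logproto.QueryRequest) (metadata.MD, error) {
	stream, err := client.Query(ctx, req)
	if err != nil {
		return nil, err
	}
	for {
		if _, err := stream.Recv(); err != nil {
			if err == io.EOF {
				break // server finished; the trailer is now available
			}
			return nil, err
		}
	}
	return stream.Trailer(), nil
}
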
20 changes: 8 additions & 12 deletions pkg/iter/iterator.go
@@ -7,9 +7,9 @@ import (
"io"
"time"

"github.com/grafana/loki/pkg/chunkenc/decompression"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
)

// EntryIterator iterates over entries in time-order.
@@ -132,19 +132,18 @@ type heapIterator struct {
}
is []EntryIterator
prefetched bool
- ctx context.Context
+ stats *stats.ChunkData

- tuples []tuple
- currEntry logproto.Entry
- currLabels string
- errs []error
- linesDuplicate int64
+ tuples []tuple
+ currEntry logproto.Entry
+ currLabels string
+ errs []error
}

// NewHeapIterator returns a new iterator which uses a heap to merge together
// entries from multiple iterators.
func NewHeapIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator {
- result := &heapIterator{is: is, ctx: ctx}
+ result := &heapIterator{is: is, stats: stats.GetChunkData(ctx)}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorMaxHeap{}
@@ -241,7 +240,7 @@ func (i *heapIterator) Next() bool {
i.requeue(i.tuples[j].EntryIterator, true)
continue
}
- i.linesDuplicate++
+ i.stats.TotalDuplicates++
i.requeue(i.tuples[j].EntryIterator, false)
}
i.tuples = i.tuples[:0]
@@ -311,9 +310,6 @@ func (i *heapIterator) Error() error {
}

func (i *heapIterator) Close() error {
- decompression.Mutate(i.ctx, func(m *decompression.Stats) {
- m.TotalDuplicates += i.linesDuplicate
- })
for i.heap.Len() > 0 {
if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
return err
2 changes: 1 addition & 1 deletion pkg/logproto/logproto.proto
@@ -119,4 +119,4 @@ message Chunk {

message TransferChunksResponse {

- }
+ }
17 changes: 5 additions & 12 deletions pkg/logql/engine.go
@@ -8,11 +8,10 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/chunkenc/decompression"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -164,19 +163,13 @@ func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) {
return nil, err
}

- ctx = decompression.NewContext(ctx)
+ ctx = stats.NewContext(ctx)
start := time.Now()
defer func() {
- stats := decompression.GetStats(ctx)
- level.Debug(log).Log(
- "Time Fetching chunk (ms)", stats.TimeFetching.Nanoseconds()/int64(time.Millisecond),
- "Total Duplicates", stats.TotalDuplicates,
- "Fetched chunks", stats.FetchedChunks,
- "Total bytes compressed (MB)", stats.BytesCompressed/1024/1024,
- "Total bytes uncompressed (MB)", stats.BytesDecompressed/1024/1024,
- "Total exec time (ms)", time.Since(start).Nanoseconds()/int64(time.Millisecond),
- )
+ resultStats := stats.Snapshot(ctx, time.Since(start))
+ stats.Log(log, resultStats)
}()

switch e := expr.(type) {
case SampleExpr:
if err := ng.setupIterators(ctx, e, q); err != nil {
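
The engine now delegates to stats.Snapshot and stats.Log instead of hand-rolling level.Debug key-values. A sketch of what those two helpers could look like, under the assumption that Result simply aggregates the context-scoped counters plus execution time (the real stats.Result in Loki carries more, such as ingester data and fetch timings):

package stats

import (
	"context"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

// Result aggregates the per-query counters plus total execution time.
type Result struct {
	BytesCompressed   int64
	BytesDecompressed int64
	TotalDuplicates   int64
	ExecTime          time.Duration
}

// Snapshot folds the context-scoped counters into a Result.
func Snapshot(ctx context.Context, execTime time.Duration) Result {
	c := GetChunkData(ctx) // from the earlier ChunkData sketch
	return Result{
		BytesCompressed:   c.BytesCompressed,
		BytesDecompressed: c.BytesDecompressed,
		TotalDuplicates:   c.TotalDuplicates,
		ExecTime:          execTime,
	}
}

// Log emits the result as structured debug key-values, replacing the
// ad-hoc logging removed from engine.go above.
func Log(logger log.Logger, r Result) {
	level.Debug(logger).Log(
		"bytes_compressed", r.BytesCompressed,
		"bytes_decompressed", r.BytesDecompressed,
		"total_duplicates", r.TotalDuplicates,
		"exec_time_ms", r.ExecTime.Milliseconds(),
	)
}
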