Skip to content

Commit

Permalink
Labels computation LogQLv2 (#2875)
Browse files Browse the repository at this point in the history
* Adding a benchmark to compare before and after.

Signed-off-by: Cyril Tovena <[email protected]>

* Improves labels management in logql v2.

Signed-off-by: Cyril Tovena <[email protected]>

* Cache grouped result when no changes has occured.

Signed-off-by: Cyril Tovena <[email protected]>

* Removes unused methods.

Signed-off-by: Cyril Tovena <[email protected]>

* Adds docs and tests.

Signed-off-by: Cyril Tovena <[email protected]>

* Uncomment tests.

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena authored Nov 5, 2020
1 parent bc202ae commit 53f4aa4
Show file tree
Hide file tree
Showing 31 changed files with 797 additions and 358 deletions.
8 changes: 3 additions & 5 deletions pkg/chunkenc/dumb_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"sort"
"time"

"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
)

const (
Expand Down Expand Up @@ -72,7 +70,7 @@ func (c *dumbChunk) Encoding() Encoding { return EncNone }

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ labels.Labels, _ logql.Pipeline) (iter.EntryIterator, error) {
func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline) (iter.EntryIterator, error) {
i := sort.Search(len(c.entries), func(i int) bool {
return !from.After(c.entries[i].Timestamp)
})
Expand All @@ -97,7 +95,7 @@ func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, directi
}, nil
}

func (c *dumbChunk) SampleIterator(_ context.Context, from, through time.Time, _ labels.Labels, _ logql.SampleExtractor) iter.SampleIterator {
func (c *dumbChunk) SampleIterator(_ context.Context, from, through time.Time, _ log.StreamSampleExtractor) iter.SampleIterator {
return nil
}

Expand Down
12 changes: 5 additions & 7 deletions pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"strings"
"time"

"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
)

// Errors returned by the chunk interface.
Expand Down Expand Up @@ -100,8 +98,8 @@ type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Append(*logproto.Entry) error
Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, lbs labels.Labels, pipeline logql.Pipeline) (iter.EntryIterator, error)
SampleIterator(ctx context.Context, from, through time.Time, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator
Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
// Returns the list of blocks in the chunks.
Blocks(mintT, maxtT time.Time) []Block
Size() int
Expand All @@ -126,7 +124,7 @@ type Block interface {
// Entries is the amount of entries in the block.
Entries() int
// Iterator returns an entry iterator for the block.
Iterator(ctx context.Context, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator
Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
// SampleIterator returns a sample iterator for the block.
SampleIterator(ctx context.Context, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator
SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
}
56 changes: 26 additions & 30 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logql/stats"
)

Expand Down Expand Up @@ -471,19 +470,19 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) {
}

// Iterator implements Chunk.
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, lbs labels.Labels, pipeline logql.Pipeline) (iter.EntryIterator, error) {
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) {
mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
its := make([]iter.EntryIterator, 0, len(c.blocks)+1)

for _, b := range c.blocks {
if maxt < b.mint || b.maxt < mint {
continue
}
its = append(its, encBlock{c.encoding, b}.Iterator(ctx, lbs, pipeline))
its = append(its, encBlock{c.encoding, b}.Iterator(ctx, pipeline))
}

if !c.head.isEmpty() {
its = append(its, c.head.iterator(ctx, direction, mint, maxt, lbs, pipeline))
its = append(its, c.head.iterator(ctx, direction, mint, maxt, pipeline))
}

if direction == logproto.FORWARD {
Expand Down Expand Up @@ -513,19 +512,19 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
}

// Iterator implements Chunk.
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator {
mint, maxt := from.UnixNano(), through.UnixNano()
its := make([]iter.SampleIterator, 0, len(c.blocks)+1)

for _, b := range c.blocks {
if maxt < b.mint || b.maxt < mint {
continue
}
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, lbs, extractor))
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor))
}

if !c.head.isEmpty() {
its = append(its, c.head.sampleIterator(ctx, mint, maxt, lbs, extractor))
its = append(its, c.head.sampleIterator(ctx, mint, maxt, extractor))
}

return iter.NewTimeRangedSampleIterator(
Expand Down Expand Up @@ -557,18 +556,18 @@ type encBlock struct {
block
}

func (b encBlock) Iterator(ctx context.Context, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator {
func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator {
if len(b.b) == 0 {
return iter.NoopIterator
}
return newEntryIterator(ctx, getReaderPool(b.enc), b.b, lbs, pipeline)
return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline)
}

func (b encBlock) SampleIterator(ctx context.Context, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator {
if len(b.b) == 0 {
return iter.NoopIterator
}
return newSampleIterator(ctx, getReaderPool(b.enc), b.b, lbs, extractor)
return newSampleIterator(ctx, getReaderPool(b.enc), b.b, extractor)
}

func (b block) Offset() int {
Expand All @@ -585,7 +584,7 @@ func (b block) MaxTime() int64 {
return b.maxt
}

func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator {
func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}
Expand All @@ -601,7 +600,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
for _, e := range hb.entries {
chunkStats.HeadChunkBytes += int64(len(e.s))
line := []byte(e.s)
newLine, parsedLbs, ok := pipeline.Process(line, lbs)
newLine, parsedLbs, ok := pipeline.Process(line)
if !ok {
continue
}
Expand Down Expand Up @@ -630,7 +629,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
return iter.NewStreamsIterator(ctx, streamsResult, direction)
}

func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}
Expand All @@ -640,7 +639,7 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs l
for _, e := range hb.entries {
chunkStats.HeadChunkBytes += int64(len(e.s))
line := []byte(e.s)
value, parsedLabels, ok := extractor.Process(line, lbs)
value, parsedLabels, ok := extractor.Process(line)
if !ok {
continue
}
Expand Down Expand Up @@ -687,11 +686,9 @@ type bufferedIterator struct {
currTs int64

closed bool

baseLbs labels.Labels
}

func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels) *bufferedIterator {
func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *bufferedIterator {
chunkStats := stats.GetChunkData(ctx)
chunkStats.CompressedBytes += int64(len(b))
return &bufferedIterator{
Expand All @@ -701,7 +698,6 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, lbs lab
bufReader: nil, // will be initialized later
pool: pool,
decBuf: make([]byte, binary.MaxVarintLen64),
baseLbs: lbs,
}
}

Expand Down Expand Up @@ -806,19 +802,19 @@ func (si *bufferedIterator) close() {
si.decBuf = nil
}

func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator {
func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline) iter.EntryIterator {
return &entryBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, lbs),
bufferedIterator: newBufferedIterator(ctx, pool, b),
pipeline: pipeline,
}
}

type entryBufferedIterator struct {
*bufferedIterator
pipeline logql.Pipeline
pipeline log.StreamPipeline

cur logproto.Entry
currLabels labels.Labels
currLabels log.LabelsResult
}

func (e *entryBufferedIterator) Entry() logproto.Entry {
Expand All @@ -829,7 +825,7 @@ func (e *entryBufferedIterator) Labels() string { return e.currLabels.String() }

func (e *entryBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
newLine, lbs, ok := e.pipeline.Process(e.currLine, e.baseLbs)
newLine, lbs, ok := e.pipeline.Process(e.currLine)
if !ok {
continue
}
Expand All @@ -841,9 +837,9 @@ func (e *entryBufferedIterator) Next() bool {
return false
}

func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, extractor log.StreamSampleExtractor) iter.SampleIterator {
it := &sampleBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, lbs),
bufferedIterator: newBufferedIterator(ctx, pool, b),
extractor: extractor,
}
return it
Expand All @@ -852,15 +848,15 @@ func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, lbs label
type sampleBufferedIterator struct {
*bufferedIterator

extractor logql.SampleExtractor
extractor log.StreamSampleExtractor

cur logproto.Sample
currLabels labels.Labels
currLabels log.LabelsResult
}

func (e *sampleBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
val, labels, ok := e.extractor.Process(e.currLine, e.baseLbs)
val, labels, ok := e.extractor.Process(e.currLine)
if !ok {
continue
}
Expand Down
Loading

0 comments on commit 53f4aa4

Please sign in to comment.