adds batch based metrics #2510

Merged (2 commits) on Aug 19, 2020
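
Adds batch-based metrics to the chunk store: counters for chunk refs before and after time-bounds filtering (loki_index_chunk_refs_pre_filtering_total, loki_index_chunk_refs_post_filtering_total), a counter for downloaded chunks partitioned by match status (loki_store_chunks_downloaded_total), and a histogram of post-filtering batch sizes (loki_store_chunks_per_batch_post_filtering).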
Changes from all commits
79 changes: 74 additions & 5 deletions pkg/storage/batch.go
@@ -11,6 +11,8 @@ import (
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
@@ -22,6 +24,55 @@ import (
"github.com/grafana/loki/pkg/logql/stats"
)

type ChunkMetrics struct {
refs prometheus.Counter
filteredRefs prometheus.Counter
chunks *prometheus.CounterVec
batches prometheus.Histogram
}

const (
statusFiltered = "filtered"
statusMatched = "matched"
)

func NewChunkMetrics(r prometheus.Registerer, maxBatchSize int) *ChunkMetrics {
buckets := 5
if maxBatchSize < buckets {
maxBatchSize = buckets
}

return &ChunkMetrics{
refs: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Subsystem: "index",
Name: "chunk_refs_pre_filtering_total",
Help: "Number of chunks refs downloaded.",
}),
filteredRefs: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Subsystem: "index",
Name: "chunk_refs_post_filtering_total",
Help: "Number of chunks refs downloaded whose bounds intersect the query bounds.",
}),
chunks: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Subsystem: "store",
Name: "chunks_downloaded_total",
Help: "Number of chunks downloaded, partitioned by if they satisfy matchers.",
}, []string{"status"}),
batches: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Subsystem: "store",
Name: "chunks_per_batch_post_filtering",
Help: "The post-matching chunk batch size.",

// split buckets evenly across 0->maxBatchSize
Buckets: prometheus.LinearBuckets(0, float64(maxBatchSize/buckets), buckets),
}),
}
}
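
As an aside on the bucket layout: here is a minimal runnable sketch (not part of this diff; the maxBatchSize of 50 is hypothetical) of what the LinearBuckets call above produces.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Mirrors NewChunkMetrics: 5 buckets spread evenly across
	// 0 -> maxBatchSize, using a hypothetical maxBatchSize of 50.
	maxBatchSize, buckets := 50, 5
	fmt.Println(prometheus.LinearBuckets(0, float64(maxBatchSize/buckets), buckets))
	// Prints: [0 10 20 30 40]
}
```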

type genericIterator interface {
Next() bool
Labels() string
@@ -297,13 +348,15 @@ type logBatchIterator struct {
*batchChunkIterator

ctx context.Context
metrics *ChunkMetrics
matchers []*labels.Matcher
filter logql.LineFilter
labels labelCache
}

func newLogBatchIterator(
ctx context.Context,
metrics *ChunkMetrics,
chunks []*LazyChunk,
batchSize int,
matchers []*labels.Matcher,
@@ -319,6 +372,7 @@ func newLogBatchIterator(
labels: map[model.Fingerprint]string{},
matchers: matchers,
filter: filter,
metrics: metrics,
ctx: ctx,
}

@@ -336,7 +390,7 @@ func (it *logBatchIterator) Entry() logproto.Entry {

// newChunksIterator creates an iterator over a set of lazychunks.
func (it *logBatchIterator) newChunksIterator(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (genericIterator, error) {
chksBySeries, err := fetchChunkBySeries(it.ctx, chunks, it.matchers)
chksBySeries, err := fetchChunkBySeries(it.ctx, it.metrics, chunks, it.matchers)
if err != nil {
return nil, err
}
@@ -396,6 +450,7 @@ type sampleBatchIterator struct {
*batchChunkIterator

ctx context.Context
metrics *ChunkMetrics
matchers []*labels.Matcher
filter logql.LineFilter
extractor logql.SampleExtractor
@@ -404,6 +459,7 @@ type sampleBatchIterator struct {

func newSampleBatchIterator(
ctx context.Context,
metrics *ChunkMetrics,
chunks []*LazyChunk,
batchSize int,
matchers []*labels.Matcher,
@@ -421,6 +477,7 @@ func newSampleBatchIterator(
matchers: matchers,
filter: filter,
extractor: extractor,
metrics: metrics,
ctx: ctx,
}
batch := newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, samplebatch.newChunksIterator)
@@ -438,7 +495,7 @@ func (it *sampleBatchIterator) Sample() logproto.Sample {

// newChunksIterator creates an iterator over a set of lazychunks.
func (it *sampleBatchIterator) newChunksIterator(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (genericIterator, error) {
chksBySeries, err := fetchChunkBySeries(it.ctx, chunks, it.matchers)
chksBySeries, err := fetchChunkBySeries(it.ctx, it.metrics, chunks, it.matchers)
if err != nil {
return nil, err
}
@@ -499,7 +556,7 @@ func removeMatchersByName(matchers []*labels.Matcher, names ...string) []*labels
return matchers
}

func fetchChunkBySeries(ctx context.Context, chunks []*LazyChunk, matchers []*labels.Matcher) (map[model.Fingerprint][][]*LazyChunk, error) {
func fetchChunkBySeries(ctx context.Context, metrics *ChunkMetrics, chunks []*LazyChunk, matchers []*labels.Matcher) (map[model.Fingerprint][][]*LazyChunk, error) {
chksBySeries := partitionBySeriesChunks(chunks)

// Make sure the initial chunks are loaded. This is not one chunk
@@ -510,7 +567,7 @@ func fetchChunkBySeries(ctx context.Context, chunks []*LazyChunk, matchers []*la

// Now that we have the first chunk for each series loaded,
// we can proceed to filter the series that don't match.
chksBySeries = filterSeriesByMatchers(chksBySeries, matchers)
chksBySeries = filterSeriesByMatchers(chksBySeries, matchers, metrics)

var allChunks []*LazyChunk
for _, series := range chksBySeries {
Collaborator commented:
Wondering what a good way would be to also track how many series there were.

I'm not sure this is a fully vetted thought yet but part of me thinks we will want to know something about the number of series involved too.
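
One way to do that (a hypothetical sketch, not part of this PR; every name below is assumed for illustration): give ChunkMetrics a histogram for series-per-query and observe len(chksBySeries) in fetchChunkBySeries once filterSeriesByMatchers has run.

```go
package storage

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// seriesMetrics is a hypothetical companion to ChunkMetrics.
type seriesMetrics struct {
	perQuery prometheus.Histogram
}

func newSeriesMetrics(r prometheus.Registerer) *seriesMetrics {
	return &seriesMetrics{
		perQuery: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
			Namespace: "loki",
			Subsystem: "store",
			Name:      "series_per_query", // assumed name
			Help:      "Number of series referenced per query, post-filtering.",
			Buckets:   prometheus.ExponentialBuckets(1, 2, 10),
		}),
	}
}

// In fetchChunkBySeries, after filterSeriesByMatchers, one would record:
//
//	m.perQuery.Observe(float64(len(chksBySeries)))
```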

@@ -523,19 +580,31 @@ func fetchChunkBySeries(ctx context.Context, chunks []*LazyChunk, matchers []*la
if err := fetchLazyChunks(ctx, allChunks); err != nil {
return nil, err
}
metrics.chunks.WithLabelValues(statusMatched).Add(float64(len(allChunks)))
metrics.batches.Observe(float64(len(allChunks)))

return chksBySeries, nil
}

func filterSeriesByMatchers(chks map[model.Fingerprint][][]*LazyChunk, matchers []*labels.Matcher) map[model.Fingerprint][][]*LazyChunk {
func filterSeriesByMatchers(
chks map[model.Fingerprint][][]*LazyChunk,
matchers []*labels.Matcher,
metrics *ChunkMetrics,
) map[model.Fingerprint][][]*LazyChunk {
var filtered int // Number of chunks downloaded to check labels, but filtered out after.
outer:
for fp, chunks := range chks {
for _, matcher := range matchers {
if !matcher.Matches(chunks[0][0].Chunk.Metric.Get(matcher.Name)) {
delete(chks, fp)
filtered++
continue outer
}
}
}
metrics.chunks.WithLabelValues(statusFiltered).Add(float64(filtered))
return chks
}

6 changes: 4 additions & 2 deletions pkg/storage/batch_test.go
@@ -22,6 +22,8 @@ import (
"github.com/grafana/loki/pkg/logql/stats"
)

var NilMetrics = NewChunkMetrics(nil, 0)
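(Passing a nil Registerer works because promauto.With(nil) returns a factory that creates but never registers its metrics, so one inert NilMetrics value can be shared across tests.)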

func Test_batchIterSafeStart(t *testing.T) {
stream := logproto.Stream{
Labels: fooLabelsWithName,
@@ -955,7 +957,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
for name, tt := range tests {
tt := tt
t.Run(name, func(t *testing.T) {
it, err := newLogBatchIterator(context.Background(), tt.chunks, tt.batchSize, newMatchers(tt.matchers), nil, tt.direction, tt.start, tt.end)
it, err := newLogBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), nil, tt.direction, tt.start, tt.end)
require.NoError(t, err)
streams, _, err := iter.ReadBatch(it, 1000)
_ = it.Close()
Expand Down Expand Up @@ -1240,7 +1242,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
for name, tt := range tests {
tt := tt
t.Run(name, func(t *testing.T) {
it, err := newSampleBatchIterator(context.Background(), tt.chunks, tt.batchSize, newMatchers(tt.matchers), nil, logql.ExtractCount, tt.start, tt.end)
it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), nil, logql.ExtractCount, tt.start, tt.end)
require.NoError(t, err)
series, _, err := iter.ReadSampleBatch(it, 1000)
_ = it.Close()
23 changes: 15 additions & 8 deletions pkg/storage/store.go
@@ -76,7 +76,8 @@ type Store interface {

type store struct {
chunk.Store
cfg Config
cfg Config
chunkMetrics *ChunkMetrics
}

// NewStore creates a new Loki Store using configuration supplied.
@@ -86,8 +87,9 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg SchemaConfig, li
return nil, err
}
return &store{
Store: s,
cfg: cfg,
Store: s,
cfg: cfg,
chunkMetrics: NewChunkMetrics(registerer, cfg.MaxChunkBatchSize),
}, nil
}

@@ -162,14 +164,19 @@ func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from
return nil, err
}

var totalChunks int
var prefilter int
var filtered int
for i := range chks {
prefilter += len(chks[i])
storeStats.TotalChunksRef += int64(len(chks[i]))
chks[i] = filterChunksByTime(from, through, chks[i])
totalChunks += len(chks[i])
filtered += len(chks[i])
}
s.chunkMetrics.refs.Add(float64(prefilter))
s.chunkMetrics.filteredRefs.Add(float64(filtered))

// creates lazychunks with chunks ref.
lazyChunks := make([]*LazyChunk, 0, totalChunks)
lazyChunks := make([]*LazyChunk, 0, filtered)
for i := range chks {
for _, c := range chks[i] {
lazyChunks = append(lazyChunks, &LazyChunk{Chunk: c, Fetcher: fetchers[i]})
@@ -275,7 +282,7 @@ func (s *store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter
return iter.NoopIterator, nil
}

return newLogBatchIterator(ctx, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, req.Direction, req.Start, req.End)
return newLogBatchIterator(ctx, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, req.Direction, req.Start, req.End)

}

Expand Down Expand Up @@ -303,7 +310,7 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams)
if len(lazyChunks) == 0 {
return iter.NoopIterator, nil
}
return newSampleBatchIterator(ctx, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, extractor, req.Start, req.End)
return newSampleBatchIterator(ctx, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, extractor, req.Start, req.End)
}

func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.Chunk {
3 changes: 3 additions & 0 deletions pkg/storage/store_test.go
@@ -382,6 +382,7 @@ func Test_store_SelectLogs(t *testing.T) {
cfg: Config{
MaxChunkBatchSize: 10,
},
chunkMetrics: NilMetrics,
}

ctx = user.InjectOrgID(context.Background(), "test-user")
@@ -591,6 +592,7 @@ func Test_store_SelectSample(t *testing.T) {
cfg: Config{
MaxChunkBatchSize: 10,
},
chunkMetrics: NilMetrics,
}

ctx = user.InjectOrgID(context.Background(), "test-user")
@@ -660,6 +662,7 @@ func Test_store_GetSeries(t *testing.T) {
cfg: Config{
MaxChunkBatchSize: tt.batchSize,
},
chunkMetrics: NilMetrics,
}
ctx = user.InjectOrgID(context.Background(), "test-user")
out, err := s.GetSeries(ctx, logql.SelectLogParams{QueryRequest: tt.req})