Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing index cache #6954

Merged
merged 2 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6891](https://github.com/thanos-io/thanos/pull/6891) Objstore: Bump `objstore` which adds support for Azure Workload Identity.
- [#6453](https://github.com/thanos-io/thanos/pull/6453) Sidecar: Added `--reloader.method` to support configuration reloads via SIHUP signal.
- [#6925](https://github.com/thanos-io/thanos/pull/6925) Store Gateway: Support float native histogram.
- [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs.

### Changed

Expand Down
1 change: 1 addition & 0 deletions pkg/store/cache/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type))
}

cache = NewTracingIndexCache(string(cacheConfig.Type), cache)
if len(cacheConfig.EnabledItems) > 0 {
if err = ValidateEnabledItems(cacheConfig.EnabledItems); err != nil {
return nil, err
Expand Down
92 changes: 92 additions & 0 deletions pkg/store/cache/tracing_index_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storecache

import (
"context"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/tracing"
)

type TracingIndexCache struct {
name string
cache IndexCache
}

// NewTracingIndexCache creates an index cache wrapper with traces instrumentation.
func NewTracingIndexCache(name string, cache IndexCache) *TracingIndexCache {
return &TracingIndexCache{name: name, cache: cache}
}

// StorePostings stores postings for a single series.
func (c *TracingIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
c.cache.StorePostings(blockID, l, v, tenant)
}

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
func (c *TracingIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
span, newCtx := tracing.StartSpan(ctx, "fetch_multi_postings", tracing.Tags{
"name": c.name,
"block.id": blockID.String(),
"requested": len(keys),
})
defer span.Finish()
hits, misses = c.cache.FetchMultiPostings(newCtx, blockID, keys, tenant)
span.SetTag("hits", len(hits))
dataBytes := 0
for _, v := range hits {
dataBytes += len(v)
}
span.SetTag("bytes", dataBytes)
return hits, misses
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
func (c *TracingIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
c.cache.StoreExpandedPostings(blockID, matchers, v, tenant)
}

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *TracingIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) (data []byte, exists bool) {
span, newCtx := tracing.StartSpan(ctx, "fetch_expanded_postings", tracing.Tags{
"name": c.name,
"block.id": blockID.String(),
})
defer span.Finish()
data, exists = c.cache.FetchExpandedPostings(newCtx, blockID, matchers, tenant)
if exists {
span.SetTag("bytes", len(data))
}
return data, exists
}

// StoreSeries stores a single series. Skip intrumenting this method
// excessive spans as a single request can store millions of serieses.
func (c *TracingIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
c.cache.StoreSeries(blockID, id, v, tenant)
}

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
func (c *TracingIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
span, newCtx := tracing.StartSpan(ctx, "fetch_multi_series", tracing.Tags{
"name": c.name,
"block.id": blockID.String(),
"requested": len(ids),
})
defer span.Finish()
hits, misses = c.cache.FetchMultiSeries(newCtx, blockID, ids, tenant)
span.SetTag("hits", len(hits))
dataBytes := 0
for _, v := range hits {
dataBytes += len(v)
}
span.SetTag("bytes", dataBytes)
return hits, misses
}
Loading