From 9d892571c3d3914359a210f4ae745bba6194b4b4 Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Tue, 23 Apr 2019 16:43:08 +0100 Subject: [PATCH] Ensured index cache is best effort, refactored tests, validated edge cases. Fixes https://github.com/improbable-eng/thanos/issues/651 Signed-off-by: Bartek Plotka --- pkg/store/bucket.go | 29 ++- pkg/store/bucket_e2e_test.go | 6 +- pkg/store/cache.go | 216 ---------------------- pkg/store/cache/cache.go | 335 ++++++++++++++++++++++++++++++++++ pkg/store/cache/cache_test.go | 207 +++++++++++++++++++++ pkg/store/cache_test.go | 89 --------- 6 files changed, 566 insertions(+), 316 deletions(-) delete mode 100644 pkg/store/cache.go create mode 100644 pkg/store/cache/cache.go create mode 100644 pkg/store/cache/cache_test.go delete mode 100644 pkg/store/cache_test.go diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index b75e16e58c0..b85597df894 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -26,6 +26,7 @@ import ( "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/pool" "github.com/improbable-eng/thanos/pkg/runutil" + storecache "github.com/improbable-eng/thanos/pkg/store/cache" "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/improbable-eng/thanos/pkg/strutil" "github.com/improbable-eng/thanos/pkg/tracing" @@ -182,7 +183,7 @@ type BucketStore struct { metrics *bucketStoreMetrics bucket objstore.BucketReader dir string - indexCache *indexCache + indexCache *storecache.IndexCache chunkPool *pool.BytesPool // Sets of blocks that have the same labels. They are indexed by a hash over their label set. @@ -225,10 +226,18 @@ func NewBucketStore( return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent) } - indexCache, err := newIndexCache(reg, indexCacheSizeBytes) + // TODO(bwplotka): Add as a flag? + maxItemSizeBytes := uint64(float64(indexCacheSizeBytes) * 0.5) + + indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{ + SetTimeout: 300 * time.Millisecond, // TODO(bwplotka): Add as a flag? + MaxSizeBytes: indexCacheSizeBytes, + MaxItemSizeBytes: maxItemSizeBytes, + }) if err != nil { return nil, errors.Wrap(err, "create index cache") } + chunkPool, err := pool.NewBytesPool(2e5, 50e6, 2, maxChunkPoolBytes) if err != nil { return nil, errors.Wrap(err, "create chunk pool") @@ -1058,7 +1067,7 @@ type bucketBlock struct { bucket objstore.BucketReader meta *metadata.Meta dir string - indexCache *indexCache + indexCache *storecache.IndexCache chunkPool *pool.BytesPool indexVersion int @@ -1081,7 +1090,7 @@ func newBucketBlock( bkt objstore.BucketReader, id ulid.ULID, dir string, - indexCache *indexCache, + indexCache *storecache.IndexCache, chunkPool *pool.BytesPool, p partitioner, ) (b *bucketBlock, err error) { @@ -1241,13 +1250,13 @@ type bucketIndexReader struct { block *bucketBlock dec *index.Decoder stats *queryStats - cache *indexCache + cache *storecache.IndexCache mtx sync.Mutex loadedSeries map[uint64][]byte } -func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache *indexCache) *bucketIndexReader { +func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketBlock, cache *storecache.IndexCache) *bucketIndexReader { r := &bucketIndexReader{ logger: logger, ctx: ctx, @@ -1415,7 +1424,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error { for i, g := range groups { for j, key := range g.keys { // Get postings for the given key from cache first. - if b, ok := r.cache.postings(r.block.meta.ULID, key); ok { + if b, ok := r.cache.Postings(r.block.meta.ULID, key); ok { r.stats.postingsTouched++ r.stats.postingsTouchedSizeSum += len(b) @@ -1487,7 +1496,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error { // Return postings and fill LRU cache. groups[p.groupID].Fill(p.keyID, fetchedPostings) - r.cache.setPostings(r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c) + r.cache.SetPostings(ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c) // If we just fetched it we still have to update the stats for touched postings. r.stats.postingsTouched++ @@ -1510,7 +1519,7 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error { var newIDs []uint64 for _, id := range ids { - if b, ok := r.cache.series(r.block.meta.ULID, id); ok { + if b, ok := r.cache.Series(r.block.meta.ULID, id); ok { r.loadedSeries[id] = b continue } @@ -1567,7 +1576,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, } c = c[n : n+int(l)] r.loadedSeries[id] = c - r.cache.setSeries(r.block.meta.ULID, id, c) + r.cache.SetSeries(ctx, r.block.meta.ULID, id, c) } return nil } diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index bc8e55e359e..02ad79781a5 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -15,6 +15,7 @@ import ( "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/objstore/objtesting" "github.com/improbable-eng/thanos/pkg/runutil" + storecache "github.com/improbable-eng/thanos/pkg/store/cache" "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/pkg/errors" @@ -310,7 +311,10 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { t.Log("Run ", i) // Always clean cache before each test. - s.store.indexCache, err = newIndexCache(nil, 100) + s.store.indexCache, err = storecache.NewIndexCache(log.NewNopLogger(), nil, storecache.Opts{ + MaxSizeBytes: 100, + MaxItemSizeBytes: 100, + }) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) diff --git a/pkg/store/cache.go b/pkg/store/cache.go deleted file mode 100644 index 3c391fe3c14..00000000000 --- a/pkg/store/cache.go +++ /dev/null @@ -1,216 +0,0 @@ -package store - -import ( - "sync" - - lru "github.com/hashicorp/golang-lru/simplelru" - "github.com/oklog/ulid" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/tsdb/labels" -) - -const ( - cacheTypePostings = "postings" - cacheTypeSeries = "series" -) - -type cacheItem struct { - block ulid.ULID - key interface{} -} - -func (c cacheItem) keyType() string { - switch c.key.(type) { - case cacheKeyPostings: - return cacheTypePostings - case cacheKeySeries: - return cacheTypeSeries - } - return "" -} - -type cacheKeyPostings labels.Label -type cacheKeySeries uint64 - -type indexCache struct { - mtx sync.Mutex - lru *lru.LRU - maxSize uint64 - curSize uint64 - - requests *prometheus.CounterVec - hits *prometheus.CounterVec - added *prometheus.CounterVec - current *prometheus.GaugeVec - currentSize *prometheus.GaugeVec - overflow *prometheus.CounterVec -} - -// newIndexCache creates a new LRU cache for index entries and ensures the total cache -// size approximately does not exceed maxBytes. -func newIndexCache(reg prometheus.Registerer, maxBytes uint64) (*indexCache, error) { - c := &indexCache{ - maxSize: maxBytes, - } - evicted := prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_index_cache_items_evicted_total", - Help: "Total number of items that were evicted from the index cache.", - }, []string{"item_type"}) - evicted.WithLabelValues(cacheTypePostings) - evicted.WithLabelValues(cacheTypeSeries) - - c.added = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_index_cache_items_added_total", - Help: "Total number of items that were added to the index cache.", - }, []string{"item_type"}) - c.added.WithLabelValues(cacheTypePostings) - c.added.WithLabelValues(cacheTypeSeries) - - c.requests = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_index_cache_requests_total", - Help: "Total number of requests to the cache.", - }, []string{"item_type"}) - c.requests.WithLabelValues(cacheTypePostings) - c.requests.WithLabelValues(cacheTypeSeries) - - c.overflow = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_index_cache_items_overflowed_total", - Help: "Total number of items that could not be added to the cache due to being too big.", - }, []string{"item_type"}) - c.overflow.WithLabelValues(cacheTypePostings) - c.overflow.WithLabelValues(cacheTypeSeries) - - c.hits = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_store_index_cache_hits_total", - Help: "Total number of requests to the cache that were a hit.", - }, []string{"item_type"}) - c.hits.WithLabelValues(cacheTypePostings) - c.hits.WithLabelValues(cacheTypeSeries) - - c.current = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "thanos_store_index_cache_items", - Help: "Current number of items in the index cache.", - }, []string{"item_type"}) - c.current.WithLabelValues(cacheTypePostings) - c.current.WithLabelValues(cacheTypeSeries) - - c.currentSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: "thanos_store_index_cache_items_size_bytes", - Help: "Current byte size of items in the index cache.", - }, []string{"item_type"}) - c.currentSize.WithLabelValues(cacheTypePostings) - c.currentSize.WithLabelValues(cacheTypeSeries) - - // Initialize LRU cache with a high size limit since we will manage evictions ourselves - // based on stored size. - onEvict := func(key, val interface{}) { - k := key.(cacheItem).keyType() - v := val.([]byte) - - evicted.WithLabelValues(k).Inc() - c.current.WithLabelValues(k).Dec() - c.currentSize.WithLabelValues(k).Sub(float64(len(v))) - - c.curSize -= uint64(len(v)) - } - l, err := lru.NewLRU(1e12, onEvict) - if err != nil { - return nil, err - } - c.lru = l - - if reg != nil { - reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "thanos_store_index_cache_max_size_bytes", - Help: "Maximum number of bytes to be held in the index cache.", - }, func() float64 { - return float64(maxBytes) - })) - reg.MustRegister(c.requests, c.hits, c.added, evicted, c.current, c.currentSize, c.overflow) - } - return c, nil -} - -// ensureFits tries to make sure that the passed slice will fit into the LRU cache. -// Returns true if it will fit. -func (c *indexCache) ensureFits(b []byte) bool { - if uint64(len(b)) > c.maxSize { - return false - } - for c.curSize > c.maxSize-uint64(len(b)) { - c.lru.RemoveOldest() - } - return true -} - -func (c *indexCache) setPostings(b ulid.ULID, l labels.Label, v []byte) { - c.added.WithLabelValues(cacheTypePostings).Inc() - - c.mtx.Lock() - defer c.mtx.Unlock() - - if !c.ensureFits(v) { - c.overflow.WithLabelValues(cacheTypePostings).Inc() - return - } - - // The caller may be passing in a sub-slice of a huge array. Copy the data - // to ensure we don't waste huge amounts of space for something small. - cv := make([]byte, len(v)) - copy(cv, v) - c.lru.Add(cacheItem{b, cacheKeyPostings(l)}, cv) - - c.currentSize.WithLabelValues(cacheTypePostings).Add(float64(len(v))) - c.current.WithLabelValues(cacheTypePostings).Inc() - c.curSize += uint64(len(v)) -} - -func (c *indexCache) postings(b ulid.ULID, l labels.Label) ([]byte, bool) { - c.requests.WithLabelValues(cacheTypePostings).Inc() - - c.mtx.Lock() - defer c.mtx.Unlock() - - v, ok := c.lru.Get(cacheItem{b, cacheKeyPostings(l)}) - if !ok { - return nil, false - } - c.hits.WithLabelValues(cacheTypePostings).Inc() - return v.([]byte), true -} - -func (c *indexCache) setSeries(b ulid.ULID, id uint64, v []byte) { - c.added.WithLabelValues(cacheTypeSeries).Inc() - - c.mtx.Lock() - defer c.mtx.Unlock() - - if !c.ensureFits(v) { - c.overflow.WithLabelValues(cacheTypeSeries).Inc() - return - } - - // The caller may be passing in a sub-slice of a huge array. Copy the data - // to ensure we don't waste huge amounts of space for something small. - cv := make([]byte, len(v)) - copy(cv, v) - c.lru.Add(cacheItem{b, cacheKeySeries(id)}, cv) - - c.currentSize.WithLabelValues(cacheTypeSeries).Add(float64(len(v))) - c.current.WithLabelValues(cacheTypeSeries).Inc() - c.curSize += uint64(len(v)) -} - -func (c *indexCache) series(b ulid.ULID, id uint64) ([]byte, bool) { - c.requests.WithLabelValues(cacheTypeSeries).Inc() - - c.mtx.Lock() - defer c.mtx.Unlock() - - v, ok := c.lru.Get(cacheItem{b, cacheKeySeries(id)}) - if !ok { - return nil, false - } - c.hits.WithLabelValues(cacheTypeSeries).Inc() - return v.([]byte), true -} diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go new file mode 100644 index 00000000000..965028bc31b --- /dev/null +++ b/pkg/store/cache/cache.go @@ -0,0 +1,335 @@ +package storecache + +import ( + "context" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + lru "github.com/hashicorp/golang-lru/simplelru" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/labels" +) + +const ( + cacheTypePostings string = "Postings" + cacheTypeSeries string = "Series" +) + +type cacheItem struct { + block ulid.ULID + key interface{} +} + +func (c cacheItem) keyType() string { + switch c.key.(type) { + case cacheKeyPostings: + return cacheTypePostings + case cacheKeySeries: + return cacheTypeSeries + } + return "" +} + +type cacheKeyPostings labels.Label +type cacheKeySeries uint64 + +type IndexCache struct { + mtx sync.Mutex + + logger log.Logger + lru *lru.LRU + maxSizeBytes uint64 + maxItemSizeBytes uint64 + setTimeout time.Duration + + curSize uint64 + + evicted *prometheus.CounterVec + requests *prometheus.CounterVec + hits *prometheus.CounterVec + added *prometheus.CounterVec + current *prometheus.GaugeVec + currentSize *prometheus.GaugeVec + overflow *prometheus.CounterVec + cancelled *prometheus.CounterVec +} + +type Opts struct { + // SetTimeout specifies max time available for SetPosting or SetSeries. + // On timeout "cancelled" metric is incremented and item is omitted from cache. + SetTimeout time.Duration + // MaxSizeBytes represents overall maximum number of bytes cache can contain. + MaxSizeBytes uint64 + // MaxItemSizeBytes represents maximum size of single item. + MaxItemSizeBytes uint64 +} + +// NewIndexCache creates a new thread-safe LRU cache for index entries and ensures the total cache +// size approximately does not exceed maxBytes. +func NewIndexCache(logger log.Logger, reg prometheus.Registerer, opts Opts) (*IndexCache, error) { + if opts.MaxItemSizeBytes > opts.MaxSizeBytes { + return nil, errors.Errorf("max item size (%v) cannot be bigger than overall cache size (%v)", opts.MaxItemSizeBytes, opts.MaxSizeBytes) + } + + c := &IndexCache{ + logger: logger, + maxSizeBytes: opts.MaxSizeBytes, + maxItemSizeBytes: opts.MaxItemSizeBytes, + setTimeout: opts.SetTimeout, + } + + c.evicted = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_items_evicted_total", + Help: "Total number of items that were evicted from the index cache.", + }, []string{"item_type"}) + c.evicted.WithLabelValues(cacheTypePostings) + c.evicted.WithLabelValues(cacheTypeSeries) + + c.added = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_items_added_total", + Help: "Total number of items that were added to the index cache.", + }, []string{"item_type"}) + c.added.WithLabelValues(cacheTypePostings) + c.added.WithLabelValues(cacheTypeSeries) + + c.requests = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_requests_total", + Help: "Total number of requests to the cache.", + }, []string{"item_type"}) + c.requests.WithLabelValues(cacheTypePostings) + c.requests.WithLabelValues(cacheTypeSeries) + + c.overflow = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_items_overflowed_total", + Help: "Total number of items that could not be added to the cache due to being too big.", + }, []string{"item_type"}) + c.overflow.WithLabelValues(cacheTypePostings) + c.overflow.WithLabelValues(cacheTypeSeries) + + c.hits = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_hits_total", + Help: "Total number of requests to the cache that were a hit.", + }, []string{"item_type"}) + c.hits.WithLabelValues(cacheTypePostings) + c.hits.WithLabelValues(cacheTypeSeries) + + c.current = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "thanos_store_index_cache_items", + Help: "Current number of items in the index cache.", + }, []string{"item_type"}) + c.current.WithLabelValues(cacheTypePostings) + c.current.WithLabelValues(cacheTypeSeries) + + c.currentSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "thanos_store_index_cache_items_size_bytes", + Help: "Current byte size of items in the index cache.", + }, []string{"item_type"}) + c.currentSize.WithLabelValues(cacheTypePostings) + c.currentSize.WithLabelValues(cacheTypeSeries) + + c.cancelled = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_set_cancels_total", + Help: "Number of requests that timed out or cancelled before being set to cache. Might indicate slow cache", + }, []string{"item_type"}) + c.cancelled.WithLabelValues(cacheTypePostings) + c.cancelled.WithLabelValues(cacheTypeSeries) + + if reg != nil { + reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "thanos_store_index_cache_max_size_bytes", + Help: "Maximum number of bytes to be held in the index cache.", + }, func() float64 { + return float64(c.maxSizeBytes) + })) + reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "thanos_store_index_cache_max_item_size_bytes", + Help: "Maximum number of bytes for single entry to be held in the index cache.", + }, func() float64 { + return float64(c.maxItemSizeBytes) + })) + reg.MustRegister(c.requests, c.hits, c.added, c.evicted, c.current, c.currentSize, c.overflow, c.cancelled) + } + + if opts.MaxSizeBytes > 1e12 { + return nil, errors.Errorf("max size (%v) cannot overflowing internal LRU hardcoded max size of 1e12", opts.MaxSizeBytes) + } + + // We don't use NewLRU auto eviction logic as it reserves space first then checks for size, that's why + // we do eviction. + l, err := lru.NewLRU(1e12, c.onEvict) + if err != nil { + return nil, err + } + c.lru = l + + level.Info(logger).Log( + "msg", "created index cache", + "maxItemSizeBytes", c.maxItemSizeBytes, + "maxSizeBytes", c.maxSizeBytes, + "setTimeout", c.setTimeout.String(), + ) + return c, nil +} + +// Initialize LRU cache with a high size limit since we will manage evictions ourselves +// based on stored size. +func (c *IndexCache) onEvict(key, val interface{}) { + k := key.(cacheItem).keyType() + v := val.([]byte) + + c.evicted.WithLabelValues(string(k)).Inc() + c.current.WithLabelValues(string(k)).Dec() + c.currentSize.WithLabelValues(string(k)).Sub(float64(len(v))) + + c.curSize -= uint64(len(v)) +} + +// ensureFits tries to make sure that the passed slice will fit into the LRU cache. +// Returns true if it will fit. +func (c *IndexCache) ensureFits(ctx context.Context, size uint64, typ string) bool { + if size > c.maxItemSizeBytes { + level.Debug(c.logger).Log( + "msg", "item bigger than maxItemSizeBytes. Ignoring..", + "maxItemSizeBytes", c.maxItemSizeBytes, + "maxSizeBytes", c.maxSizeBytes, + "curSize", c.curSize, + "itemSize", size, + "cacheType", typ, + ) + c.overflow.WithLabelValues(string(typ)).Inc() + return false + } + + for c.curSize+size > c.maxSizeBytes { + if ctx.Err() != nil { + c.cancelled.WithLabelValues(string(typ)).Inc() + return false + } + + _, _, ok := c.lru.RemoveOldest() + if !ok { + level.Error(c.logger).Log( + "msg", "LRU has nothing more to evict, but we still cannot allocate the item. Ignoring item.", + "maxItemSizeBytes", c.maxItemSizeBytes, + "maxSizeBytes", c.maxSizeBytes, + "curSize", c.curSize, + "itemSize", size, + "cacheType", typ, + ) + c.overflow.WithLabelValues(string(typ)).Inc() + return false + } + } + return true +} + +func (c *IndexCache) SetPostings(ctx context.Context, b ulid.ULID, l labels.Label, v []byte) { + entrySize := uint64(len(v)) + if entrySize == 0 { + level.Debug(c.logger).Log( + "msg", "found empty posting. Ignoring.", + "maxItemSizeBytes", c.maxItemSizeBytes, + "maxSizeBytes", c.maxSizeBytes, + "curSize", c.curSize, + "id", b.String(), + "lname", l.Name, + "lvalue", l.Value, + ) + return + } + + if c.setTimeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, c.setTimeout) + defer cancel() + } + + c.mtx.Lock() + defer c.mtx.Unlock() + + if !c.ensureFits(ctx, entrySize, cacheTypePostings) { + return + } + + // The caller may be passing in a sub-slice of a huge array. Copy the data + // to ensure we don't waste huge amounts of space for something small. + cv := make([]byte, entrySize) + copy(cv, v) + c.lru.Add(cacheItem{b, cacheKeyPostings(l)}, cv) + + c.added.WithLabelValues(cacheTypePostings).Inc() + c.currentSize.WithLabelValues(cacheTypePostings).Add(float64(entrySize)) + c.current.WithLabelValues(cacheTypePostings).Inc() + c.curSize += entrySize +} + +func (c *IndexCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) { + c.requests.WithLabelValues(cacheTypePostings).Inc() + + c.mtx.Lock() + defer c.mtx.Unlock() + + v, ok := c.lru.Get(cacheItem{b, cacheKeyPostings(l)}) + if !ok { + return nil, false + } + c.hits.WithLabelValues(cacheTypePostings).Inc() + return v.([]byte), true +} + +func (c *IndexCache) SetSeries(ctx context.Context, b ulid.ULID, id uint64, v []byte) { + entrySize := uint64(len(v)) + if entrySize == 0 { + level.Debug(c.logger).Log( + "msg", "found empty series. Ignoring.", + "maxItemSizeBytes", c.maxItemSizeBytes, + "maxSizeBytes", c.maxSizeBytes, + "curSize", c.curSize, + "id", b.String(), + "seriesID", id, + ) + return + } + + if c.setTimeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, c.setTimeout) + defer cancel() + } + + c.mtx.Lock() + defer c.mtx.Unlock() + + if !c.ensureFits(ctx, entrySize, cacheTypeSeries) { + return + } + + // The caller may be passing in a sub-slice of a huge array. Copy the data + // to ensure we don't waste huge amounts of space for something small. + cv := make([]byte, entrySize) + copy(cv, v) + c.lru.Add(cacheItem{b, cacheKeySeries(id)}, cv) + + c.added.WithLabelValues(cacheTypeSeries).Inc() + c.currentSize.WithLabelValues(cacheTypeSeries).Add(float64(entrySize)) + c.current.WithLabelValues(cacheTypeSeries).Inc() + c.curSize += entrySize +} + +func (c *IndexCache) Series(b ulid.ULID, id uint64) ([]byte, bool) { + c.requests.WithLabelValues(cacheTypeSeries).Inc() + + c.mtx.Lock() + defer c.mtx.Unlock() + + v, ok := c.lru.Get(cacheItem{b, cacheKeySeries(id)}) + if !ok { + return nil, false + } + c.hits.WithLabelValues(cacheTypeSeries).Inc() + return v.([]byte), true +} diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go new file mode 100644 index 00000000000..cd1d191a6d9 --- /dev/null +++ b/pkg/store/cache/cache_test.go @@ -0,0 +1,207 @@ +// Tests out the index cache implementation. +package storecache + +import ( + "context" + "testing" + "time" + + "github.com/fortytw2/leaktest" + "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/tsdb/labels" +) + +func TestIndexCache_Cancellable(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + ctx := context.Background() + + metrics := prometheus.NewRegistry() + cache, err := NewIndexCache(log.NewNopLogger(), metrics, Opts{ + MaxItemSizeBytes: 5, + MaxSizeBytes: 5, + }) + testutil.Ok(t, err) + + cache.SetPostings(ctx, ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11}) + cancelledCtx, cancel := context.WithCancel(ctx) + cancel() + + cache.SetPostings(cancelledCtx, ulid.MustNew(0, nil), labels.Label{Name: "test", Value: "124"}, []byte{42, 33}) + cache.SetSeries(cancelledCtx, ulid.MustNew(0, nil),123, []byte{42, 33}) + + testutil.Equals(t, uint64(5), cache.curSize) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(5), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.requests.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.requests.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.hits.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.hits.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.cancelled.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.cancelled.WithLabelValues(cacheTypeSeries))) +} + +func TestIndexCache_Eviction_WithMetrics(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + ctx := context.Background() + + metrics := prometheus.NewRegistry() + cache, err := NewIndexCache(log.NewNopLogger(), metrics, Opts{ + MaxItemSizeBytes: 5, + MaxSizeBytes: 5, + }) + testutil.Ok(t, err) + + id := ulid.MustNew(0, nil) + lbls := labels.Label{Name: "test", Value: "123"} + + _, ok := cache.Postings(id, lbls) + testutil.Assert(t, !ok, "no such key") + + // Add 2 bytes. (2/5) + cache.SetPostings(ctx, id, lbls, []byte{42, 33}) + testutil.Equals(t, uint64(2), cache.curSize) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(2), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) + + p, ok := cache.Postings(id, lbls) + testutil.Assert(t, ok, "key exists") + testutil.Equals(t, []byte{42, 33}, p) + + _, ok = cache.Postings(ulid.MustNew(1, nil), lbls) + testutil.Assert(t, !ok, "no such key") + _, ok = cache.Postings(id, labels.Label{Name: "test", Value: "124"}) + testutil.Assert(t, !ok, "no such key") + + // Add 3 more bytes. (5/5) + cache.SetSeries(ctx, id, 1234, []byte{222, 223, 224}) + testutil.Equals(t, uint64(5), cache.curSize) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(2), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(3), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) + + p, ok = cache.Series(id, 1234) + testutil.Assert(t, ok, "key exists") + testutil.Equals(t, []byte{222, 223, 224}, p) + + lbls2 := labels.Label{Name: "test", Value: "124"} + + // Add 5 bytes, should evict 2 last items. (10/5) + cache.SetPostings(ctx, id, lbls2, []byte{42, 33, 14, 67, 11}) + + testutil.Equals(t, uint64(5), cache.curSize) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(5), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) // Eviction. + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) // Eviction. + + // Evicted. + _, ok = cache.Postings(id, lbls) + testutil.Assert(t, !ok, "no such key") + _, ok = cache.Series(id, 1234) + testutil.Assert(t, !ok, "no such key") + + p, ok = cache.Postings(id, lbls2) + testutil.Assert(t, ok, "key exists") + testutil.Equals(t, []byte{42, 33, 14, 67, 11}, p) + + // Add same item again. + // NOTE: In our caller code, we always check first hit, then we claim miss and set posting so this should not happen. + // That's why this case is not optimized and we evict + re add the item. + cache.SetPostings(ctx, id, lbls2, []byte{42, 33, 14, 67, 11}) + + testutil.Equals(t, uint64(5), cache.curSize) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(5), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) // Eviction. + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) + + p, ok = cache.Postings(id, lbls2) + testutil.Assert(t, ok, "key exists") + testutil.Equals(t, []byte{42, 33, 14, 67, 11}, p) + + // Add too big item. + cache.SetPostings(ctx, id, labels.Label{Name: "test", Value: "toobig"}, []byte{5, 42, 33, 14, 67, 11}) + testutil.Equals(t, uint64(5), cache.curSize) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(5), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) // Overflow. + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) + + _, _, ok = cache.lru.RemoveOldest() + testutil.Assert(t, ok, "something to remove") + + testutil.Equals(t, uint64(0), cache.curSize) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(3), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) + + _, _, ok = cache.lru.RemoveOldest() + testutil.Assert(t, !ok, "nothing to remove") + + lbls3 := labels.Label{Name: "test", Value: "124"} + + // Add empty value. It should not be added to cache. + cache.SetPostings(ctx, id, lbls3, []byte{}) + + testutil.Equals(t, uint64(0), cache.curSize) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.current.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.currentSize.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(3), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries))) + + // Other metrics. + testutil.Equals(t, float64(3), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(7), promtest.ToFloat64(cache.requests.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(2), promtest.ToFloat64(cache.requests.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(3), promtest.ToFloat64(cache.hits.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(1), promtest.ToFloat64(cache.hits.WithLabelValues(cacheTypeSeries))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.cancelled.WithLabelValues(cacheTypePostings))) + testutil.Equals(t, float64(0), promtest.ToFloat64(cache.cancelled.WithLabelValues(cacheTypeSeries))) +} diff --git a/pkg/store/cache_test.go b/pkg/store/cache_test.go deleted file mode 100644 index b33931fd7a3..00000000000 --- a/pkg/store/cache_test.go +++ /dev/null @@ -1,89 +0,0 @@ -// Tests out the index cache implementation. -package store - -import ( - "testing" - "time" - - "github.com/fortytw2/leaktest" - "github.com/improbable-eng/thanos/pkg/testutil" - "github.com/oklog/ulid" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/tsdb/labels" -) - -// TestIndexCacheEdge tests the index cache edge cases. -func TestIndexCacheEdge(t *testing.T) { - metrics := prometheus.NewRegistry() - cache, err := newIndexCache(metrics, 1) - testutil.Ok(t, err) - - fits := cache.ensureFits([]byte{42, 24}) - testutil.Equals(t, fits, false) - - fits = cache.ensureFits([]byte{42}) - testutil.Equals(t, fits, true) - - fits = cache.ensureFits([]byte{}) - testutil.Equals(t, fits, true) - - fits = cache.ensureFits([]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}) - testutil.Equals(t, fits, false) - - metrics = prometheus.NewRegistry() - cache, err = newIndexCache(metrics, 0) - testutil.Ok(t, err) - - fits = cache.ensureFits([]byte{42, 24}) - testutil.Equals(t, fits, false) - - fits = cache.ensureFits([]byte{42}) - testutil.Equals(t, fits, false) - - fits = cache.ensureFits([]byte{}) - testutil.Equals(t, fits, true) - - fits = cache.ensureFits([]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}) - testutil.Equals(t, fits, false) -} - -// TestIndexCacheSmoke runs the smoke tests for the index cache. -func TestIndexCacheSmoke(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() - - metrics := prometheus.NewRegistry() - cache, err := newIndexCache(metrics, 20) - testutil.Ok(t, err) - - blid := ulid.ULID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) - labels := labels.Label{Name: "test", Value: "123"} - - cache.setPostings(blid, labels, []byte{42}) - - p, ok := cache.postings(blid, labels) - testutil.Equals(t, ok, true) - testutil.Equals(t, p, []byte{42}) - testutil.Equals(t, cache.curSize, uint64(1)) - - cache.setSeries(blid, 1234, []byte{42, 42}) - - s, ok := cache.series(blid, 1234) - testutil.Equals(t, ok, true) - testutil.Equals(t, s, []byte{42, 42}) - testutil.Equals(t, cache.curSize, uint64(3)) - - cache.lru.RemoveOldest() - testutil.Equals(t, cache.curSize, uint64(2)) - - cache.lru.RemoveOldest() - testutil.Equals(t, cache.curSize, uint64(0)) - - cache.lru.RemoveOldest() - testutil.Equals(t, cache.curSize, uint64(0)) - - cache.setSeries(blid, 1234, []byte{1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1}) - cache.setSeries(blid, 1237, []byte{1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1}) - cache.setSeries(blid, 1235, []byte{1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 5}) - testutil.Equals(t, cache.curSize, uint64(20)) - -}