Skip to content

Commit

Permalink
feat(eds/store): add eds blockstore cache metrics (celestiaorg#2567)
Browse files Browse the repository at this point in the history
Add metrics allowing to see blockstore cache size and eviction pressure

(cherry picked from commit 323f603)
  • Loading branch information
walldiss committed Sep 22, 2023
1 parent e973b4a commit eab309a
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 6 deletions.
63 changes: 57 additions & 6 deletions share/eds/accessor_cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eds

import (
"context"
"errors"
"fmt"
"reflect"
Expand All @@ -9,6 +10,8 @@ import (
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
lru "github.com/hashicorp/golang-lru"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

var (
Expand All @@ -32,11 +35,23 @@ type blockstoreCache struct {
// caches the blockstore for a given shard for shard read affinity i.e.
// further reads will likely be from the same shard. Maps (shard key -> blockstore).
cache *lru.Cache

metrics *cacheMetrics
}

func newBlockstoreCache(cacheSize int) (*blockstoreCache, error) {
bc := &blockstoreCache{}
// instantiate the blockstore cache
bslru, err := lru.NewWithEvict(cacheSize, func(_ interface{}, val interface{}) {
bslru, err := lru.NewWithEvict(cacheSize, bc.evictFn())
if err != nil {
return nil, fmt.Errorf("failed to instantiate blockstore cache: %w", err)
}
bc.cache = bslru
return bc, nil
}

func (bc *blockstoreCache) evictFn() func(_ interface{}, val interface{}) {
return func(_ interface{}, val interface{}) {
// ensure we close the blockstore for a shard when it's evicted so dagstore can gc it.
abs, ok := val.(*accessorWithBlockstore)
if !ok {
Expand All @@ -46,14 +61,12 @@ func newBlockstoreCache(cacheSize int) (*blockstoreCache, error) {
))
}

if err := abs.sa.Close(); err != nil {
err := abs.sa.Close()
if err != nil {
log.Errorf("couldn't close accessor after cache eviction: %s", err)
}
})
if err != nil {
return nil, fmt.Errorf("failed to instantiate blockstore cache: %w", err)
bc.metrics.observeEvicted(err != nil)
}
return &blockstoreCache{cache: bslru}, nil
}

// Get retrieves the blockstore for a given shard key from the cache. If the blockstore is not in
Expand Down Expand Up @@ -117,3 +130,41 @@ func (bc *blockstoreCache) unsafeAdd(
func shardKeyToStriped(sk shard.Key) byte {
return sk.String()[len(sk.String())-1]
}

type cacheMetrics struct {
evictedCounter metric.Int64Counter
}

func (bc *blockstoreCache) withMetrics() error {
evictedCounter, err := meter.Int64Counter("eds_blockstore_cache_evicted_counter",
metric.WithDescription("eds blockstore cache evicted event counter"))
if err != nil {
return err
}

cacheSize, err := meter.Int64ObservableGauge("eds_blockstore_cache_size",
metric.WithDescription("total amount of items in blockstore cache"),
)
if err != nil {
return err
}

callback := func(ctx context.Context, observer metric.Observer) error {
observer.ObserveInt64(cacheSize, int64(bc.cache.Len()))
return nil
}
_, err = meter.RegisterCallback(callback, cacheSize)
if err != nil {
return err
}
bc.metrics = &cacheMetrics{evictedCounter: evictedCounter}
return nil
}

func (m *cacheMetrics) observeEvicted(failed bool) {
if m == nil {
return
}
m.evictedCounter.Add(context.Background(), 1, metric.WithAttributes(
attribute.Bool(failedKey, failed)))
}
3 changes: 3 additions & 0 deletions share/eds/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ func (s *Store) WithMetrics() error {
return err
}

if err = s.cache.withMetrics(); err != nil {
return err
}
s.metrics = &metrics{
putTime: putTime,
getCARTime: getCARTime,
Expand Down

0 comments on commit eab309a

Please sign in to comment.