Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref-count blockstore acquires so as to close exactly once #142

Merged
merged 1 commit into from
Sep 15, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 112 additions & 20 deletions indexbs/indexbacked_bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
Expand All @@ -28,36 +29,26 @@ var ErrNoShardSelected = errors.New("no shard selected")
// It should return `ErrNoShardSelected` if none of the given shard is selected.
type ShardSelectorF func(c cid.Cid, shards []shard.Key) (shard.Key, error)

type accessorWithBlockstore struct {
sa *dagstore.ShardAccessor
bs dagstore.ReadBlockstore
}

// IndexBackedBlockstore is a read only blockstore over all cids across all shards in the dagstore.
type IndexBackedBlockstore struct {
d dagstore.Interface
shardSelectF ShardSelectorF

// caches the blockstore for a given shard for shard read affinity
// i.e. further reads will likely be from the same shard. Maps (shard key -> blockstore).
blockstoreCache *lru.Cache
bsCache *blockstoreCache
}

func NewIndexBackedBlockstore(d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int) (blockstore.Blockstore, error) {
// instantiate the blockstore cache
bslru, err := lru.NewWithEvict(maxCacheSize, func(_ interface{}, val interface{}) {
// ensure we close the blockstore for a shard when it's evicted from the cache so dagstore can gc it.
abs := val.(*accessorWithBlockstore)
abs.sa.Close()
})
cache, err := newBlockstoreCache(maxCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create lru cache for read only blockstores")
return nil, err
}

return &IndexBackedBlockstore{
d: d,
shardSelectF: shardSelector,
blockstoreCache: bslru,
d: d,
shardSelectF: shardSelector,
bsCache: cache,
}, nil
}

Expand Down Expand Up @@ -126,10 +117,10 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block
// If so, call op on the cached blockstore.
for _, sk := range shards {
// Get the shard's blockstore from the cache
val, ok := ro.blockstoreCache.Get(sk)
abs, ok := ro.bsCache.Get(sk)
if ok {
accessor := val.(*accessorWithBlockstore)
res, err := execOpOnBlockstore(ctx, c, sk, accessor.bs, op)
res, err := execOpOnBlockstore(ctx, c, sk, abs.bs, op)
abs.close()
if err == nil {
// Found a cached shard blockstore containing the required block,
// and successfully called the blockstore op
Expand Down Expand Up @@ -180,7 +171,9 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block
}

// Add the blockstore to the cache
ro.blockstoreCache.Add(sk, &accessorWithBlockstore{sa, bs})
abs := &accessorWithBlockstore{sa: sa, bs: bs}
ro.bsCache.Add(sk, abs)
defer abs.close()

logbs.Debugw("Added new blockstore to cache", "cid", c, "shard", sk)

Expand Down Expand Up @@ -248,3 +241,102 @@ func (ro *IndexBackedBlockstore) PutMany(context.Context, []blocks.Block) error
func (ro *IndexBackedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("unsupported operation AllKeysChan")
}

type blockstoreCache struct {
lk sync.Mutex
cache *lru.Cache
}

func newBlockstoreCache(size int) (*blockstoreCache, error) {
bslru, err := lru.NewWithEvict(size, func(_ interface{}, val interface{}) {
abs := val.(*accessorWithBlockstore)
abs.evict()
})
if err != nil {
return nil, fmt.Errorf("failed to create lru cache for read only blockstores: %w", err)
}

return &blockstoreCache{cache: bslru}, nil
}

func (bc *blockstoreCache) Get(sk shard.Key) (*accessorWithBlockstore, bool) {
bc.lk.Lock()
defer bc.lk.Unlock()

// Get the accessor from the cache
absi, ok := bc.cache.Get(sk)
if !ok {
return nil, false
}

// Increment the accessor's ref count so that the blockstore
// will not be closed until the caller is finished with it
abs := absi.(*accessorWithBlockstore)
abs.incRefCount()
return abs, true
}

func (bc *blockstoreCache) Add(sk shard.Key, abs *accessorWithBlockstore) {
bc.lk.Lock()
defer bc.lk.Unlock()

// Check if we're replacing an existing accessor with this Add
absi, ok := bc.cache.Get(sk)
if ok {
// Mark the existing accessor as evicted so that its blockstore can be
// closed once all callers are done with the blockstore
abs := absi.(*accessorWithBlockstore)
abs.evict()
}

// Add the new accessor
bc.cache.Add(sk, abs)
abs.incRefCount()
}

type accessorWithBlockstore struct {
sa *dagstore.ShardAccessor
bs dagstore.ReadBlockstore

lk sync.Mutex
evicted bool
refCount int
}

func (abs *accessorWithBlockstore) incRefCount() {
abs.lk.Lock()
defer abs.lk.Unlock()

abs.refCount++
}

func (abs *accessorWithBlockstore) close() {
abs.lk.Lock()
defer abs.lk.Unlock()

abs.refCount--
if abs.refCount == 0 && abs.evicted {
// The blockstore has already been evicted, and this was the last
// reference to it, so close the blockstore so that dagstore can GC it
err := abs.sa.Close()
if err != nil {
logbs.Warnf("error closing blockstore: %w", err)
}
}
}

func (abs *accessorWithBlockstore) evict() {
abs.lk.Lock()
defer abs.lk.Unlock()

abs.evicted = true

if abs.refCount == 0 {
// There are no more references to the blockstore; close it so that the
// dagstore can GC it
err := abs.sa.Close()
if err != nil {
logbs.Warnf("error closing blockstore: %w", err)
}
}
}