Skip to content

Commit

Permalink
Merge pull request #153 from filecoin-project/fix/idx-bstore-idx-cids
Browse files Browse the repository at this point in the history
change index-backed blockstore dagstore interface
  • Loading branch information
dirkmc authored Mar 7, 2023
2 parents d36a569 + 65fd5c9 commit 1de8e01
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
25 changes: 17 additions & 8 deletions indexbs/indexbacked_bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,15 @@ type accessorWithBlockstore struct {
bs dagstore.ReadBlockstore
}

type blockstoreAcquire struct {
once sync.Once
bs dagstore.ReadBlockstore
err error
type IdxBstoreDagstore interface {
ShardsContainingCid(ctx context.Context, c cid.Cid) ([]shard.Key, error)
AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error
}

// IndexBackedBlockstore is a read only blockstore over all cids across all shards in the dagstore.
type IndexBackedBlockstore struct {
ctx context.Context
d dagstore.Interface
d IdxBstoreDagstore
shardSelectF ShardSelectorF

// caches the blockstore for a given shard for shard read affinity
Expand All @@ -54,7 +53,7 @@ type IndexBackedBlockstore struct {
stripedLock [256]sync.Mutex
}

func NewIndexBackedBlockstore(ctx context.Context, d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int, cacheExpire time.Duration) (blockstore.Blockstore, error) {
func NewIndexBackedBlockstore(ctx context.Context, d IdxBstoreDagstore, shardSelector ShardSelectorF, maxCacheSize int, cacheExpire time.Duration) (blockstore.Blockstore, error) {
cache := ttlcache.NewCache()
cache.SetTTL(cacheExpire)
cache.SetCacheSizeLimit(maxCacheSize)
Expand Down Expand Up @@ -124,7 +123,7 @@ func (ro *IndexBackedBlockstore) execOpWithLogs(ctx context.Context, c cid.Cid,

func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op BlockstoreOp) (*opRes, error) {
// Fetch all the shards containing the multihash
shards, err := ro.d.ShardsContainingMultihash(ctx, c.Hash())
shards, err := ro.d.ShardsContainingCid(ctx, c)
if err != nil {
if errors.Is(err, datastore.ErrNotFound) {
return nil, ErrBlockNotFound
Expand Down Expand Up @@ -242,7 +241,7 @@ func (ro *IndexBackedBlockstore) Has(ctx context.Context, c cid.Cid) (bool, erro
logbs.Debugw("Has", "cid", c)

// Get shards that contain the cid's hash
shards, err := ro.d.ShardsContainingMultihash(ctx, c.Hash())
shards, err := ro.d.ShardsContainingCid(ctx, c)
if err != nil {
logbs.Debugw("Has error", "cid", c, "err", err)
return false, nil
Expand Down Expand Up @@ -283,3 +282,13 @@ func (ro *IndexBackedBlockstore) PutMany(context.Context, []blocks.Block) error
func (ro *IndexBackedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("unsupported operation AllKeysChan")
}

type IdxBstoreDagstoreFromDagstore struct {
dagstore.Interface
}

var _ IdxBstoreDagstore = (*IdxBstoreDagstoreFromDagstore)(nil)

func (d *IdxBstoreDagstoreFromDagstore) ShardsContainingCid(ctx context.Context, c cid.Cid) ([]shard.Key, error) {
return d.Interface.ShardsContainingMultihash(ctx, c.Hash())
}
12 changes: 7 additions & 5 deletions indexbs/indexbacked_bs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func TestIndexBackedBlockstore(t *testing.T) {
res := <-ch
require.NoError(t, res.Error)

rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10, time.Minute)
ibsapi := &IdxBstoreDagstoreFromDagstore{Interface: dagst}
rbs, err := NewIndexBackedBlockstore(ctx, ibsapi, noOpSelector, 10, time.Minute)
require.NoError(t, err)

// iterate over the CARV2 Index for the given CARv2 file and ensure the readonly blockstore
Expand Down Expand Up @@ -111,7 +112,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
return shard.Key{}, rejectedErr
}

rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10, time.Minute)
rbs, err = NewIndexBackedBlockstore(ctx, ibsapi, fss, 10, time.Minute)
require.NoError(t, err)
it.ForEach(func(mh multihash.Multihash, u uint64) error {
c := cid.NewCidV1(cid.Raw, mh)
Expand All @@ -137,7 +138,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
return shard.Key{}, ErrNoShardSelected
}

rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10, time.Minute)
rbs, err = NewIndexBackedBlockstore(ctx, ibsapi, fss, 10, time.Minute)
require.NoError(t, err)
it.ForEach(func(mh multihash.Multihash, u uint64) error {
c := cid.NewCidV1(cid.Raw, mh)
Expand Down Expand Up @@ -165,7 +166,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
notFoundCid, err := cid.Parse("bafzbeigai3eoy2ccc7ybwjfz5r3rdxqrinwi4rwytly24tdbh6yk7zslrm")
require.NoError(t, err)

rbs, err = NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10, time.Minute)
rbs, err = NewIndexBackedBlockstore(ctx, ibsapi, noOpSelector, 10, time.Minute)
require.NoError(t, err)

// Has should return false
Expand Down Expand Up @@ -219,7 +220,8 @@ func TestIndexBackedBlockstoreFuzz(t *testing.T) {
sks = append(sks, sk)
}

rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 3, time.Minute)
ibsapi := &IdxBstoreDagstoreFromDagstore{Interface: dagst}
rbs, err := NewIndexBackedBlockstore(ctx, ibsapi, noOpSelector, 3, time.Minute)
require.NoError(t, err)

var errg errgroup.Group
Expand Down

0 comments on commit 1de8e01

Please sign in to comment.