Skip to content

Commit

Permalink
feat: add expiration support to index backed blockstore (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun authored Feb 9, 2023
1 parent ac9cc42 commit 6d71d48
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 24 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/filecoin-project/dagstore
go 1.16

require (
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.4.0
github.com/ipfs/go-cid v0.2.0
Expand All @@ -20,6 +19,7 @@ require (
github.com/ipfs/go-merkledag v0.6.0
github.com/ipfs/go-unixfs v0.3.1
github.com/ipld/go-car/v2 v2.4.1
github.com/jellydator/ttlcache/v2 v2.11.1
github.com/libp2p/go-libp2p-core v0.9.0 // indirect
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multicodec v0.5.0
Expand Down
8 changes: 7 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZl
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v2 v2.11.1 h1:AZGME43Eh2Vv3giG6GeqeLeFXxwxn1/qHItqWZl6U64=
github.com/jellydator/ttlcache/v2 v2.11.1/go.mod h1:RtE5Snf0/57e+2cLWFYWCCsLas2Hy3c5Z4n14XmSvTI=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand Down Expand Up @@ -952,6 +954,8 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down Expand Up @@ -1008,8 +1012,9 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20201217150744-e6ae53a27f4f/go.mod h1:skQtrUTUwhdJvXM/2KKJzY8pDgNr9I/FOMqDVRPBUS4=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
Expand Down Expand Up @@ -1160,6 +1165,7 @@ golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
35 changes: 18 additions & 17 deletions indexbs/indexbacked_bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/jellydator/ttlcache/v2"
)

var logbs = logging.Logger("dagstore/idxbs")
Expand Down Expand Up @@ -48,30 +49,29 @@ type IndexBackedBlockstore struct {

// caches the blockstore for a given shard for shard read affinity
// i.e. further reads will likely be from the same shard. Maps (shard key -> blockstore).
blockstoreCache *lru.Cache
blockstoreCache *ttlcache.Cache
// used to manage concurrent acquisition of shards by multiple threads
stripedLock [256]sync.Mutex
}

func NewIndexBackedBlockstore(ctx context.Context, d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int) (blockstore.Blockstore, error) {
// instantiate the blockstore cache
bslru, err := lru.NewWithEvict(maxCacheSize, func(_ interface{}, val interface{}) {
func NewIndexBackedBlockstore(ctx context.Context, d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int, cacheExpire time.Duration) (blockstore.Blockstore, error) {
cache := ttlcache.NewCache()
cache.SetTTL(cacheExpire)
cache.SetCacheSizeLimit(maxCacheSize)
cache.SetExpirationReasonCallback(func(_ string, _ ttlcache.EvictionReason, val interface{}) {
// Ensure we close the blockstore for a shard when it's evicted from
// the cache so dagstore can gc it.
// TODO: add reference counting mechanism so that the blockstore does
// not get closed while there is an operation still in progress against it
abs := val.(*accessorWithBlockstore)
abs.sa.Close()
})
if err != nil {
return nil, fmt.Errorf("failed to create lru cache for read only blockstores")
}

return &IndexBackedBlockstore{
ctx: ctx,
d: d,
shardSelectF: shardSelector,
blockstoreCache: bslru,
blockstoreCache: cache,
}, nil
}

Expand Down Expand Up @@ -140,16 +140,17 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block
// If so, call op on the cached blockstore.
for _, sk := range shards {
// Get the shard's blockstore from the cache
val, ok := ro.blockstoreCache.Get(sk)
if ok {
val, err := ro.blockstoreCache.Get(sk.String())
if err != nil {
continue
}

if val != nil {
accessor := val.(*accessorWithBlockstore)
res, err := execOpOnBlockstore(ctx, c, sk, accessor.bs, op)
if err != nil {
return nil, err
}

// Found a cached blockstore containing the required block,
// and successfully called the blockstore op
return res, nil
}
}
Expand Down Expand Up @@ -181,8 +182,8 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block

// Check if the blockstore was created by another thread while this
// thread was waiting to enter the lock
val, ok := ro.blockstoreCache.Get(sk)
if ok {
val, err := ro.blockstoreCache.Get(sk.String())
if err == nil && val != nil {
return val.(*accessorWithBlockstore).bs, nil
}

Expand All @@ -208,7 +209,7 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block
}

// Add the blockstore to the cache
ro.blockstoreCache.Add(sk, &accessorWithBlockstore{sa, bs})
ro.blockstoreCache.Set(sk.String(), &accessorWithBlockstore{sa, bs})

logbs.Debugw("Added new blockstore to cache", "cid", c, "shard", sk)

Expand Down
10 changes: 5 additions & 5 deletions indexbs/indexbacked_bs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
res := <-ch
require.NoError(t, res.Error)

rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10)
rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10, time.Minute)
require.NoError(t, err)

// iterate over the CARV2 Index for the given CARv2 file and ensure the readonly blockstore
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
return shard.Key{}, rejectedErr
}

rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10)
rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10, time.Minute)
require.NoError(t, err)
it.ForEach(func(mh multihash.Multihash, u uint64) error {
c := cid.NewCidV1(cid.Raw, mh)
Expand All @@ -137,7 +137,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
return shard.Key{}, ErrNoShardSelected
}

rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10)
rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10, time.Minute)
require.NoError(t, err)
it.ForEach(func(mh multihash.Multihash, u uint64) error {
c := cid.NewCidV1(cid.Raw, mh)
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
notFoundCid, err := cid.Parse("bafzbeigai3eoy2ccc7ybwjfz5r3rdxqrinwi4rwytly24tdbh6yk7zslrm")
require.NoError(t, err)

rbs, err = NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10)
rbs, err = NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10, time.Minute)
require.NoError(t, err)

// Has should return false
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestIndexBackedBlockstoreFuzz(t *testing.T) {
sks = append(sks, sk)
}

rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 3)
rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 3, time.Minute)
require.NoError(t, err)

var errg errgroup.Group
Expand Down

0 comments on commit 6d71d48

Please sign in to comment.