Skip to content

Commit

Permalink
feat: add expiration support to index backed blockstore (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun committed Feb 9, 2023
1 parent 1e1e854 commit 8c23211
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 23 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/filecoin-project/dagstore
go 1.16

require (
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.5.0
github.com/ipfs/go-cid v0.3.2
Expand All @@ -20,6 +19,7 @@ require (
github.com/ipfs/go-merkledag v0.6.0
github.com/ipfs/go-unixfs v0.3.1
github.com/ipld/go-car/v2 v2.4.1
github.com/jellydator/ttlcache/v2 v2.11.1
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multicodec v0.5.0
github.com/multiformats/go-multihash v0.2.1
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,8 @@ github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZl
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v2 v2.11.1 h1:AZGME43Eh2Vv3giG6GeqeLeFXxwxn1/qHItqWZl6U64=
github.com/jellydator/ttlcache/v2 v2.11.1/go.mod h1:RtE5Snf0/57e+2cLWFYWCCsLas2Hy3c5Z4n14XmSvTI=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand Down Expand Up @@ -1108,6 +1110,7 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
Expand Down Expand Up @@ -1188,6 +1191,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mobile v0.0.0-20201217150744-e6ae53a27f4f/go.mod h1:skQtrUTUwhdJvXM/2KKJzY8pDgNr9I/FOMqDVRPBUS4=
Expand Down Expand Up @@ -1440,6 +1444,7 @@ golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
Expand Down
35 changes: 18 additions & 17 deletions indexbs/indexbacked_bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/jellydator/ttlcache/v2"
)

var logbs = logging.Logger("dagstore/idxbs")
Expand Down Expand Up @@ -48,30 +49,29 @@ type IndexBackedBlockstore struct {

// caches the blockstore for a given shard for shard read affinity
// i.e. further reads will likely be from the same shard. Maps (shard key -> blockstore).
blockstoreCache *lru.Cache
blockstoreCache *ttlcache.Cache
// used to manage concurrent acquisition of shards by multiple threads
stripedLock [256]sync.Mutex
}

func NewIndexBackedBlockstore(ctx context.Context, d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int) (blockstore.Blockstore, error) {
// instantiate the blockstore cache
bslru, err := lru.NewWithEvict(maxCacheSize, func(_ interface{}, val interface{}) {
func NewIndexBackedBlockstore(ctx context.Context, d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int, cacheExpire time.Duration) (blockstore.Blockstore, error) {
cache := ttlcache.NewCache()
cache.SetTTL(cacheExpire)
cache.SetCacheSizeLimit(maxCacheSize)
cache.SetExpirationReasonCallback(func(_ string, _ ttlcache.EvictionReason, val interface{}) {
// Ensure we close the blockstore for a shard when it's evicted from
// the cache so dagstore can gc it.
// TODO: add reference counting mechanism so that the blockstore does
// not get closed while there is an operation still in progress against it
abs := val.(*accessorWithBlockstore)
abs.sa.Close()
})
if err != nil {
return nil, fmt.Errorf("failed to create lru cache for read only blockstores")
}

return &IndexBackedBlockstore{
ctx: ctx,
d: d,
shardSelectF: shardSelector,
blockstoreCache: bslru,
blockstoreCache: cache,
}, nil
}

Expand Down Expand Up @@ -140,16 +140,17 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block
// If so, call op on the cached blockstore.
for _, sk := range shards {
// Get the shard's blockstore from the cache
val, ok := ro.blockstoreCache.Get(sk)
if ok {
val, err := ro.blockstoreCache.Get(sk.String())
if err != nil {
continue
}

if val != nil {
accessor := val.(*accessorWithBlockstore)
res, err := execOpOnBlockstore(ctx, c, sk, accessor.bs, op)
if err != nil {
return nil, err
}

// Found a cached blockstore containing the required block,
// and successfully called the blockstore op
return res, nil
}
}
Expand Down Expand Up @@ -181,8 +182,8 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block

// Check if the blockstore was created by another thread while this
// thread was waiting to enter the lock
val, ok := ro.blockstoreCache.Get(sk)
if ok {
val, err := ro.blockstoreCache.Get(sk.String())
if err == nil && val != nil {
return val.(*accessorWithBlockstore).bs, nil
}

Expand All @@ -208,7 +209,7 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block
}

// Add the blockstore to the cache
ro.blockstoreCache.Add(sk, &accessorWithBlockstore{sa, bs})
ro.blockstoreCache.Set(sk.String(), &accessorWithBlockstore{sa, bs})

logbs.Debugw("Added new blockstore to cache", "cid", c, "shard", sk)

Expand Down
10 changes: 5 additions & 5 deletions indexbs/indexbacked_bs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
res := <-ch
require.NoError(t, res.Error)

rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10)
rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10, time.Minute)
require.NoError(t, err)

// iterate over the CARV2 Index for the given CARv2 file and ensure the readonly blockstore
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
return shard.Key{}, rejectedErr
}

rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10)
rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10, time.Minute)
require.NoError(t, err)
it.ForEach(func(mh multihash.Multihash, u uint64) error {
c := cid.NewCidV1(cid.Raw, mh)
Expand All @@ -137,7 +137,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
return shard.Key{}, ErrNoShardSelected
}

rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10)
rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10, time.Minute)
require.NoError(t, err)
it.ForEach(func(mh multihash.Multihash, u uint64) error {
c := cid.NewCidV1(cid.Raw, mh)
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
notFoundCid, err := cid.Parse("bafzbeigai3eoy2ccc7ybwjfz5r3rdxqrinwi4rwytly24tdbh6yk7zslrm")
require.NoError(t, err)

rbs, err = NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10)
rbs, err = NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10, time.Minute)
require.NoError(t, err)

// Has should return false
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestIndexBackedBlockstoreFuzz(t *testing.T) {
sks = append(sks, sk)
}

rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 3)
rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 3, time.Minute)
require.NoError(t, err)

var errg errgroup.Group
Expand Down

0 comments on commit 8c23211

Please sign in to comment.