Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add expiration support to index backed blockstore #150

Merged
merged 4 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/filecoin-project/dagstore
go 1.16

require (
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.4.0
github.com/ipfs/go-cid v0.2.0
Expand All @@ -20,6 +19,7 @@ require (
github.com/ipfs/go-merkledag v0.6.0
github.com/ipfs/go-unixfs v0.3.1
github.com/ipld/go-car/v2 v2.4.1
github.com/jellydator/ttlcache/v2 v2.11.1
github.com/libp2p/go-libp2p-core v0.9.0 // indirect
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multicodec v0.5.0
Expand Down
8 changes: 7 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZl
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v2 v2.11.1 h1:AZGME43Eh2Vv3giG6GeqeLeFXxwxn1/qHItqWZl6U64=
github.com/jellydator/ttlcache/v2 v2.11.1/go.mod h1:RtE5Snf0/57e+2cLWFYWCCsLas2Hy3c5Z4n14XmSvTI=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand Down Expand Up @@ -952,6 +954,8 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down Expand Up @@ -1008,8 +1012,9 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20201217150744-e6ae53a27f4f/go.mod h1:skQtrUTUwhdJvXM/2KKJzY8pDgNr9I/FOMqDVRPBUS4=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
Expand Down Expand Up @@ -1160,6 +1165,7 @@ golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210112230658-8b4aab62c064/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
35 changes: 18 additions & 17 deletions indexbs/indexbacked_bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/jellydator/ttlcache/v2"
)

var logbs = logging.Logger("dagstore/idxbs")
Expand Down Expand Up @@ -48,30 +49,29 @@ type IndexBackedBlockstore struct {

// caches the blockstore for a given shard for shard read affinity
// i.e. further reads will likely be from the same shard. Maps (shard key -> blockstore).
blockstoreCache *lru.Cache
blockstoreCache *ttlcache.Cache
// used to manage concurrent acquisition of shards by multiple threads
stripedLock [256]sync.Mutex
}

func NewIndexBackedBlockstore(ctx context.Context, d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int) (blockstore.Blockstore, error) {
// instantiate the blockstore cache
bslru, err := lru.NewWithEvict(maxCacheSize, func(_ interface{}, val interface{}) {
func NewIndexBackedBlockstore(ctx context.Context, d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int, cacheExpire time.Duration) (blockstore.Blockstore, error) {
cache := ttlcache.NewCache()
cache.SetTTL(cacheExpire)
cache.SetCacheSizeLimit(maxCacheSize)
cache.SetExpirationReasonCallback(func(_ string, _ ttlcache.EvictionReason, val interface{}) {
// Ensure we close the blockstore for a shard when it's evicted from
// the cache so dagstore can gc it.
// TODO: add reference counting mechanism so that the blockstore does
// not get closed while there is an operation still in progress against it
abs := val.(*accessorWithBlockstore)
abs.sa.Close()
})
if err != nil {
return nil, fmt.Errorf("failed to create lru cache for read only blockstores")
}

return &IndexBackedBlockstore{
ctx: ctx,
d: d,
shardSelectF: shardSelector,
blockstoreCache: bslru,
blockstoreCache: cache,
}, nil
}

Expand Down Expand Up @@ -140,16 +140,17 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block
// If so, call op on the cached blockstore.
for _, sk := range shards {
// Get the shard's blockstore from the cache
val, ok := ro.blockstoreCache.Get(sk)
if ok {
val, err := ro.blockstoreCache.Get(sk.String())
if err != nil {
continue
}

if val != nil {
accessor := val.(*accessorWithBlockstore)
res, err := execOpOnBlockstore(ctx, c, sk, accessor.bs, op)
if err != nil {
return nil, err
}

// Found a cached blockstore containing the required block,
// and successfully called the blockstore op
return res, nil
}
}
Expand Down Expand Up @@ -181,8 +182,8 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block

// Check if the blockstore was created by another thread while this
// thread was waiting to enter the lock
val, ok := ro.blockstoreCache.Get(sk)
if ok {
val, err := ro.blockstoreCache.Get(sk.String())
if err == nil && val != nil {
return val.(*accessorWithBlockstore).bs, nil
}

Expand All @@ -208,7 +209,7 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block
}

// Add the blockstore to the cache
ro.blockstoreCache.Add(sk, &accessorWithBlockstore{sa, bs})
ro.blockstoreCache.Set(sk.String(), &accessorWithBlockstore{sa, bs})

logbs.Debugw("Added new blockstore to cache", "cid", c, "shard", sk)

Expand Down
10 changes: 5 additions & 5 deletions indexbs/indexbacked_bs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
res := <-ch
require.NoError(t, res.Error)

rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10)
rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10, time.Minute)
require.NoError(t, err)

// iterate over the CARV2 Index for the given CARv2 file and ensure the readonly blockstore
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
return shard.Key{}, rejectedErr
}

rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10)
rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10, time.Minute)
require.NoError(t, err)
it.ForEach(func(mh multihash.Multihash, u uint64) error {
c := cid.NewCidV1(cid.Raw, mh)
Expand All @@ -137,7 +137,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
return shard.Key{}, ErrNoShardSelected
}

rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10)
rbs, err = NewIndexBackedBlockstore(ctx, dagst, fss, 10, time.Minute)
require.NoError(t, err)
it.ForEach(func(mh multihash.Multihash, u uint64) error {
c := cid.NewCidV1(cid.Raw, mh)
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestIndexBackedBlockstore(t *testing.T) {
notFoundCid, err := cid.Parse("bafzbeigai3eoy2ccc7ybwjfz5r3rdxqrinwi4rwytly24tdbh6yk7zslrm")
require.NoError(t, err)

rbs, err = NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10)
rbs, err = NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10, time.Minute)
require.NoError(t, err)

// Has should return false
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestIndexBackedBlockstoreFuzz(t *testing.T) {
sks = append(sks, sk)
}

rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 3)
rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 3, time.Minute)
require.NoError(t, err)

var errg errgroup.Group
Expand Down