Skip to content

Commit

Permalink
Merge pull request #147 from filecoin-project/fix/idx-backed-bstore
Browse files Browse the repository at this point in the history
fix index backed blockstore panic by simplifying locking
  • Loading branch information
dirkmc authored Jan 27, 2023
2 parents f9e7b7b + d4ca560 commit ac9cc42
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 50 deletions.
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ go 1.16
require (
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.4.0 // indirect
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-blockservice v0.4.0
github.com/ipfs/go-cid v0.2.0
github.com/ipfs/go-cidutil v0.1.0
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ipfs-blockstore v1.2.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.3.0
github.com/ipfs/go-ipfs-files v0.0.3
github.com/ipfs/go-ipld-format v0.3.0
github.com/ipfs/go-log/v2 v2.3.0
github.com/ipfs/go-merkledag v0.6.0
github.com/ipfs/go-unixfs v0.3.1
github.com/ipld/go-car/v2 v2.4.1
github.com/libp2p/go-libp2p-core v0.9.0 // indirect
github.com/mr-tron/base58 v1.2.0
Expand Down
10 changes: 9 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a h1:E/8AP5dFtMhl5KPJz66Kt9G0n+7Sn41Fy1wv9/jHOrc=
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
Expand Down Expand Up @@ -278,8 +279,11 @@ github.com/ipfs/go-cid v0.0.4/go.mod h1:4LLaPOQwmk5z9LBgQnpkivrx8BJjUyGwTXCd5Xfj
github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog=
github.com/ipfs/go-cid v0.0.6/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-cid v0.1.0 h1:YN33LQulcRHjfom/i25yoOZR4Telp1Hr/2RU3d0PnC0=
github.com/ipfs/go-cid v0.1.0/go.mod h1:rH5/Xv83Rfy8Rw6xG+id3DYAMUVmem1MowoKwdXmN2o=
github.com/ipfs/go-cid v0.2.0 h1:01JTiihFq9en9Vz0lc0VDWvZe/uBonGpzo4THP0vcQ0=
github.com/ipfs/go-cid v0.2.0/go.mod h1:P+HXFDF4CVhaVayiEb4wkAy7zBHxBwsJyt0Y5U6MLro=
github.com/ipfs/go-cidutil v0.1.0 h1:RW5hO7Vcf16dplUU60Hs0AKDkQAVPVplr7lk97CFL+Q=
github.com/ipfs/go-cidutil v0.1.0/go.mod h1:e7OEVBMIv9JaOxt9zaGEmAoSlXW9jdFZ5lP/0PwcfpA=
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
github.com/ipfs/go-datastore v0.4.0/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
Expand Down Expand Up @@ -320,7 +324,9 @@ github.com/ipfs/go-ipfs-exchange-offline v0.1.1/go.mod h1:vTiBRIbzSwDD0OWm+i3xeT
github.com/ipfs/go-ipfs-exchange-offline v0.2.0/go.mod h1:HjwBeW0dvZvfOMwDP0TSKXIHf2s+ksdP4E3MLDRtLKY=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0/go.mod h1:MOdJ9DChbb5u37M1IcbrRB02e++Z7521fMxqCNRrz9s=
github.com/ipfs/go-ipfs-files v0.0.3 h1:ME+QnC3uOyla1ciRPezDW0ynQYK2ikOh9OCKAEg4uUA=
github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4=
github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs=
github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
Expand Down Expand Up @@ -357,6 +363,7 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.7.0 h1:VyO6G4sbzX80K58N60cCaHsSsypbUNs1GjO5seGNsQ0=
github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipfs/go-unixfs v0.3.1 h1:LrfED0OGfG98ZEegO4/xiprx2O+yS+krCMQSp7zLVv8=
github.com/ipfs/go-unixfs v0.3.1/go.mod h1:h4qfQYzghiIc8ZNFKiLMFWOTzrWIAtzYQ59W/pCFf1o=
github.com/ipfs/go-unixfsnode v1.4.0 h1:9BUxHBXrbNi8mWHc6j+5C580WJqtVw9uoeEKn4tMhwA=
github.com/ipfs/go-unixfsnode v1.4.0/go.mod h1:qc7YFFZ8tABc58p62HnIYbUMwj9chhUuFWmxSokfePo=
Expand Down Expand Up @@ -698,6 +705,7 @@ github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPw
github.com/multiformats/go-multicodec v0.3.0/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ=
github.com/multiformats/go-multicodec v0.3.1-0.20211210143421-a526f306ed2c/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ=
github.com/multiformats/go-multicodec v0.4.1/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ=
github.com/multiformats/go-multicodec v0.5.0 h1:EgU6cBe/D7WRwQb1KmnBvU7lrcFGMggZVTPtOW9dDHs=
github.com/multiformats/go-multicodec v0.5.0/go.mod h1:DiY2HFaEp5EhEXb/iYzVAunmyX/aSFMxq2KMKfWEues=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
Expand Down
84 changes: 41 additions & 43 deletions indexbs/indexbacked_bs.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type IndexBackedBlockstore struct {
// i.e. further reads will likely be from the same shard. Maps (shard key -> blockstore).
blockstoreCache *lru.Cache
// used to manage concurrent acquisition of shards by multiple threads
bsAcquireByShard sync.Map
stripedLock [256]sync.Mutex
}

func NewIndexBackedBlockstore(ctx context.Context, d dagstore.Interface, shardSelector ShardSelectorF, maxCacheSize int) (blockstore.Blockstore, error) {
Expand Down Expand Up @@ -169,59 +169,57 @@ func (ro *IndexBackedBlockstore) execOp(ctx context.Context, c cid.Cid, op Block

// Some retrieval patterns will result in multiple threads fetching blocks
// from the same piece concurrently. In that case many threads may attempt
// to create a blockstore over the same piece. Use a sync.Once to ensure
// to create a blockstore over the same piece. Use a striped lock to ensure
// that the blockstore is only created once for all threads waiting on the
// same shard.
bsAcquireI, _ := ro.bsAcquireByShard.LoadOrStore(sk, &blockstoreAcquire{})
bsAcquire := bsAcquireI.(*blockstoreAcquire)
bsAcquire.once.Do(func() {
bsAcquire.bs, bsAcquire.err = func() (dagstore.ReadBlockstore, error) {
// Check if the blockstore was created by another thread while this
// thread was waiting to enter the sync.Once
val, ok := ro.blockstoreCache.Get(sk)
if ok {
return val.(*accessorWithBlockstore).bs, nil
}

// Acquire the blockstore for the selected shard
resch := make(chan dagstore.ShardResult, 1)
if err := ro.d.AcquireShard(ro.ctx, sk, resch, dagstore.AcquireOpts{}); err != nil {
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, err)
}
var shres dagstore.ShardResult
select {
case <-ctx.Done():
return nil, ctx.Err()
case shres = <-resch:
if shres.Error != nil {
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, shres.Error)
}
}
bs, err := func() (dagstore.ReadBlockstore, error) {
// Derive the striped lock index from the shard key and acquire the lock
skstr := sk.String()
lockIdx := skstr[len(skstr)-1]
ro.stripedLock[lockIdx].Lock()
defer ro.stripedLock[lockIdx].Unlock()

// Check if the blockstore was created by another thread while this
// thread was waiting to enter the lock
val, ok := ro.blockstoreCache.Get(sk)
if ok {
return val.(*accessorWithBlockstore).bs, nil
}

sa := shres.Accessor
bs, err := sa.Blockstore()
if err != nil {
return nil, fmt.Errorf("failed to load read-only blockstore for shard %s: %w", sk, err)
// Acquire the blockstore for the selected shard
resch := make(chan dagstore.ShardResult, 1)
if err := ro.d.AcquireShard(ro.ctx, sk, resch, dagstore.AcquireOpts{}); err != nil {
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, err)
}
var shres dagstore.ShardResult
select {
case <-ctx.Done():
return nil, ctx.Err()
case shres = <-resch:
if shres.Error != nil {
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, shres.Error)
}
}

// Add the blockstore to the cache
ro.blockstoreCache.Add(sk, &accessorWithBlockstore{sa, bs})
sa := shres.Accessor
bs, err := sa.Blockstore()
if err != nil {
return nil, fmt.Errorf("failed to load read-only blockstore for shard %s: %w", sk, err)
}

logbs.Debugw("Added new blockstore to cache", "cid", c, "shard", sk)
// Add the blockstore to the cache
ro.blockstoreCache.Add(sk, &accessorWithBlockstore{sa, bs})

return bs, nil
}()
logbs.Debugw("Added new blockstore to cache", "cid", c, "shard", sk)

// The sync.Once has completed so clean up the acquire entry for this shard
ro.bsAcquireByShard.Delete(sk)
})

if bsAcquire.err != nil {
return nil, bsAcquire.err
return bs, nil
}()
if err != nil {
return nil, err
}

// Call the operation on the blockstore
return execOpOnBlockstore(ctx, c, sk, bsAcquire.bs, op)
return execOpOnBlockstore(ctx, c, sk, bs, op)
}

func execOpOnBlockstore(ctx context.Context, c cid.Cid, sk shard.Key, bs dagstore.ReadBlockstore, op BlockstoreOp) (*opRes, error) {
Expand Down
20 changes: 16 additions & 4 deletions indexbs/indexbacked_bs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"testing"
"time"

"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -184,6 +186,7 @@ func TestIndexBackedBlockstore(t *testing.T) {

func TestIndexBackedBlockstoreFuzz(t *testing.T) {
ctx := context.Background()
tempdir := t.TempDir()
store := dssync.MutexWrap(datastore.NewMapDatastore())
dagst, err := dagstore.NewDAGStore(dagstore.Config{
MountRegistry: testRegistry(t),
Expand All @@ -197,17 +200,26 @@ func TestIndexBackedBlockstoreFuzz(t *testing.T) {

// register some shards
var sks []shard.Key
for i := 0; i < 3; i++ {
for i := 0; i < 10; i++ {
ch := make(chan dagstore.ShardResult, 1)
sk := shard.KeyFromString(fmt.Sprintf("test%d", i))
err = dagst.RegisterShard(context.Background(), sk, carv2mnt, ch, dagstore.RegisterOpts{})

rseed := time.Now().Nanosecond()
randomFilepath, err := testdata.CreateRandomFile(tempdir, rseed, 256*1024)
require.NoError(t, err)
_, carFilepath, err := testdata.CreateDenseCARv2(tempdir, randomFilepath)
require.NoError(t, err)
carBytes, err := ioutil.ReadFile(carFilepath)
require.NoError(t, err)
mnt := &mount.BytesMount{Bytes: carBytes}
err = dagst.RegisterShard(context.Background(), sk, mnt, ch, dagstore.RegisterOpts{})
require.NoError(t, err)
res := <-ch
require.NoError(t, res.Error)
sks = append(sks, sk)
}

rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 10)
rbs, err := NewIndexBackedBlockstore(ctx, dagst, noOpSelector, 3)
require.NoError(t, err)

var errg errgroup.Group
Expand All @@ -219,7 +231,7 @@ func TestIndexBackedBlockstoreFuzz(t *testing.T) {
return err
}

for i := 0; i < 3; i++ {
for i := 0; i < 10; i++ {
var skerrg errgroup.Group
it.ForEach(func(mh multihash.Multihash, _ uint64) error {
mhs := mh
Expand Down
Loading

0 comments on commit ac9cc42

Please sign in to comment.