Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(share/eds): skip concurrent writes on edsstore Put #2613

Merged
merged 9 commits into from
Sep 6, 2023
Merged
8 changes: 8 additions & 0 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"os"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -64,6 +65,9 @@ type Store struct {
// lastGCResult is only stored on the store for testing purposes.
lastGCResult atomic.Pointer[dagstore.GCResult]

// stripedLocks is used to synchronize parallel operations
stripedLocks [256]sync.Mutex

metrics *metrics
}

Expand Down Expand Up @@ -197,6 +201,10 @@ func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
}

func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.ExtendedDataSquare) (err error) {
lk := &s.stripedLocks[root[len(root)-1]]
lk.Lock()
defer lk.Unlock()

Wondertan marked this conversation as resolved.
Show resolved Hide resolved
// if root already exists, short-circuit
if has, _ := s.Has(ctx, root); has {
return dagstore.ErrShardExists
Expand Down
26 changes: 26 additions & 0 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package eds
import (
"context"
"os"
"sync"
"testing"
"time"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
Expand Down Expand Up @@ -171,6 +173,30 @@ func TestEDSStore(t *testing.T) {
assert.Contains(t, hashesOut, hash)
}
})

t.Run("Parallel put", func(t *testing.T) {
const amount = 20
eds, dah := randomEDS(t)

wg := sync.WaitGroup{}
for i := 1; i < amount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := edsStore.Put(ctx, dah.Hash(), eds)
if err != nil {
require.ErrorIs(t, err, dagstore.ErrShardExists)
}
}()
}
wg.Wait()

eds, err := edsStore.Get(ctx, dah.Hash())
require.NoError(t, err)
newDah, err := da.NewDataAvailabilityHeader(eds)
require.NoError(t, err)
require.Equal(t, dah.Hash(), newDah.Hash())
})
}

// TestEDSStore_GC verifies that unused transient shards are collected by the GC periodically.
Expand Down
Loading