diff --git a/share/eds/store.go b/share/eds/store.go index fa9a7f7c7e..19cbd83057 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "os" + "sync" "sync/atomic" "time" @@ -63,6 +64,8 @@ type Store struct { // lastGCResult is only stored on the store for testing purposes. lastGCResult atomic.Pointer[dagstore.GCResult] + // stripedLocks is used to synchronize parallel operations + stripedLocks [256]sync.Mutex shardFailures chan dagstore.ShardResult metrics *metrics @@ -221,6 +224,10 @@ func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext } func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.ExtendedDataSquare) (err error) { + lk := &s.stripedLocks[root[len(root)-1]] + lk.Lock() + defer lk.Unlock() + // if root already exists, short-circuit if has, _ := s.Has(ctx, root); has { return dagstore.ErrShardExists diff --git a/share/eds/store_test.go b/share/eds/store_test.go index 4f1d7f4c8b..616e5c2874 100644 --- a/share/eds/store_test.go +++ b/share/eds/store_test.go @@ -3,9 +3,11 @@ package eds import ( "context" "os" + "sync" "testing" "time" + "github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore/shard" "github.com/ipfs/go-datastore" ds_sync "github.com/ipfs/go-datastore/sync" @@ -209,6 +211,30 @@ func TestEDSStore(t *testing.T) { assert.Contains(t, hashesOut, hash) } }) + + t.Run("Parallel put", func(t *testing.T) { + const amount = 20 + eds, dah := randomEDS(t) + + wg := sync.WaitGroup{} + for i := 1; i < amount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := edsStore.Put(ctx, dah.Hash(), eds) + if err != nil { + require.ErrorIs(t, err, dagstore.ErrShardExists) + } + }() + } + wg.Wait() + + eds, err := edsStore.Get(ctx, dah.Hash()) + require.NoError(t, err) + newDah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) + require.Equal(t, dah.Hash(), newDah.Hash()) + }) } // TestEDSStore_GC verifies that unused transient shards are collected by the GC periodically.