Skip to content

Commit

Permalink
fix(share/eds): dagstore shard restore reflection workaround (celesti…
Browse files Browse the repository at this point in the history
…aorg#2559)

Turns out `dagstore` has bug in its restore shard code. For each shard
it attempts to create deep copy of registered type, but copies only
basic struct fields of the mount type. In out case it means, that it
copies a pointer. That causes all shards to point to the same mount
(same pointer).
Before `dagstore` is fixed to take into account possibility of
pointers/interface (and any other reference type) in mount type struct
fields, we will use direct copy of `mount.FileMount` instead of
`mount.Mount` interface.

(cherry picked from commit 003c2c4)
  • Loading branch information
walldiss committed Sep 22, 2023
1 parent 33ac989 commit 4d33e51
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 27 deletions.
92 changes: 70 additions & 22 deletions nodebuilder/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import (
"context"
"strconv"
"testing"
"time"

"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -68,30 +67,13 @@ func BenchmarkStore(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)

tmpDir := b.TempDir()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
edsStore, err := eds.NewStore(tmpDir, ds)
require.NoError(b, err)
err = edsStore.Start(ctx)
require.NoError(b, err)

// BenchmarkStore/bench_read_128-10 14 78970661 ns/op (~70ms)
b.Run("bench put 128", func(b *testing.B) {
b.ResetTimer()
dir := b.TempDir()

err := Init(*DefaultConfig(node.Full), dir, node.Full)
require.NoError(b, err)

store, err := OpenStore(dir, nil)
require.NoError(b, err)
ds, err := store.Datastore()
require.NoError(b, err)
edsStore, err := eds.NewStore(dir, ds)
require.NoError(b, err)
err = edsStore.Start(ctx)
require.NoError(b, err)

store := newStore(ctx, b, dir)
size := 128
b.Run("enabled eds proof caching", func(b *testing.B) {
b.StopTimer()
Expand All @@ -111,7 +93,7 @@ func BenchmarkStore(b *testing.B) {
ctx := ipld.CtxWithProofsAdder(ctx, adder)

b.StartTimer()
err = edsStore.Put(ctx, dah.Hash(), eds)
err = store.edsStore.Put(ctx, dah.Hash(), eds)
b.StopTimer()
require.NoError(b, err)
}
Expand All @@ -126,10 +108,76 @@ func BenchmarkStore(b *testing.B) {
require.NoError(b, err)

b.StartTimer()
err = edsStore.Put(ctx, dah.Hash(), eds)
err = store.edsStore.Put(ctx, dah.Hash(), eds)
b.StopTimer()
require.NoError(b, err)
}
})
})
}

func TestStoreRestart(t *testing.T) {
const (
blocks = 5
size = 32
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
t.Cleanup(cancel)

dir := t.TempDir()
err := Init(*DefaultConfig(node.Full), dir, node.Full)
require.NoError(t, err)

store := newStore(ctx, t, dir)

hashes := make([][]byte, blocks)
for i := range hashes {
edss := edstest.RandEDS(t, size)
require.NoError(t, err)
dah, err := da.NewDataAvailabilityHeader(edss)
require.NoError(t, err)
err = store.edsStore.Put(ctx, dah.Hash(), edss)
require.NoError(t, err)

// store hashes for read loop later
hashes[i] = dah.Hash()
}

// restart store
store.stop(ctx, t)
store = newStore(ctx, t, dir)

for _, h := range hashes {
edsReader, err := store.edsStore.GetCAR(ctx, h)
require.NoError(t, err)
odsReader, err := eds.ODSReader(edsReader)
require.NoError(t, err)
_, err = eds.ReadEDS(ctx, odsReader, h)
require.NoError(t, err)
}
}

type store struct {
s Store
edsStore *eds.Store
}

func newStore(ctx context.Context, t require.TestingT, dir string) store {
s, err := OpenStore(dir, nil)
require.NoError(t, err)
ds, err := s.Datastore()
require.NoError(t, err)
edsStore, err := eds.NewStore(dir, ds)
require.NoError(t, err)
err = edsStore.Start(ctx)
require.NoError(t, err)
return store{
s: s,
edsStore: edsStore,
}
}

func (s *store) stop(ctx context.Context, t *testing.T) {
require.NoError(t, s.edsStore.Stop(ctx))
require.NoError(t, s.s.Close())
}
10 changes: 5 additions & 5 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
}

r := mount.NewRegistry()
err = r.Register("fs", &inMemoryOnceMount{Mount: &mount.FileMount{}})
err = r.Register("fs", &inMemoryOnceMount{})
if err != nil {
return nil, fmt.Errorf("failed to register memory mount on the registry: %w", err)
}
Expand Down Expand Up @@ -212,8 +212,8 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
// save encoded eds into buffer
mount := &inMemoryOnceMount{
// TODO: buffer could be pre-allocated with capacity calculated based on eds size.
buf: bytes.NewBuffer(nil),
Mount: &mount.FileMount{Path: s.basepath + blocksPath + key},
buf: bytes.NewBuffer(nil),
FileMount: mount.FileMount{Path: s.basepath + blocksPath + key},
}
err = WriteEDS(ctx, square, mount)
if err != nil {
Expand Down Expand Up @@ -565,7 +565,7 @@ type inMemoryOnceMount struct {
buf *bytes.Buffer

readOnce atomic.Bool
mount.Mount
mount.FileMount
}

func (m *inMemoryOnceMount) Fetch(ctx context.Context) (mount.Reader, error) {
Expand All @@ -575,7 +575,7 @@ func (m *inMemoryOnceMount) Fetch(ctx context.Context) (mount.Reader, error) {
m.buf = nil
return reader, nil
}
return m.Mount.Fetch(ctx)
return m.FileMount.Fetch(ctx)
}

func (m *inMemoryOnceMount) Write(b []byte) (int, error) {
Expand Down

0 comments on commit 4d33e51

Please sign in to comment.