Skip to content

Commit

Permalink
refactor(share): GetShare -> GetSamples
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Oct 28, 2024
1 parent 08ca0ed commit 6a8bad4
Show file tree
Hide file tree
Showing 21 changed files with 246 additions and 153 deletions.
7 changes: 3 additions & 4 deletions blob/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,10 +902,9 @@ func createService(ctx context.Context, t testing.TB, shares []libshare.Share) *
nd, err := eds.NamespaceData(ctx, accessor, ns)
return nd, err
})
shareGetter.EXPECT().GetShare(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
DoAndReturn(func(ctx context.Context, h *header.ExtendedHeader, row, col int) (libshare.Share, error) {
s, err := accessor.Sample(ctx, row, col)
return s.Share, err
shareGetter.EXPECT().GetSamples(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
DoAndReturn(func(ctx context.Context, h *header.ExtendedHeader, indices []shwap.SampleIndex) ([]shwap.Sample, error) {
return smpls, nil

Check failure on line 907 in blob/service_test.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

undefined: smpls (typecheck)

Check failure on line 907 in blob/service_test.go

View workflow job for this annotation

GitHub Actions / go-ci / Unit Tests Coverage (ubuntu-latest)

undefined: smpls

Check failure on line 907 in blob/service_test.go

View workflow job for this annotation

GitHub Actions / go-ci / Unit Tests Coverage (macos-14)

undefined: smpls
})

// create header and put it into the store
Expand Down
14 changes: 13 additions & 1 deletion nodebuilder/share/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,24 @@ type module struct {
hs headerServ.Module
}

// TODO(@Wondertan): break
func (m module) GetShare(ctx context.Context, height uint64, row, col int) (libshare.Share, error) {
header, err := m.hs.GetByHeight(ctx, height)
if err != nil {
return libshare.Share{}, err
}
return m.getter.GetShare(ctx, header, row, col)

idx, err := shwap.SampleIndexFromCoordinates(row, col, len(header.DAH.RowRoots))
if err != nil {
return libshare.Share{}, err
}

smpls, err := m.getter.GetSamples(ctx, header, []shwap.SampleIndex{idx})
if err != nil {
return libshare.Share{}, err
}

return smpls[0].Share, nil
}

func (m module) GetEDS(ctx context.Context, height uint64) (*rsmt2d.ExtendedDataSquare, error) {
Expand Down
40 changes: 20 additions & 20 deletions share/availability/light/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,34 +100,34 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, header *header
return err
}

var (
failedSamplesLock sync.Mutex
failedSamples []Sample
)

log.Debugw("starting sampling session", "root", dah.String())
var wg sync.WaitGroup
for _, s := range samples {
wg.Add(1)
go func(s Sample) {
defer wg.Done()
// check if the sample is available
_, err := la.getter.GetShare(ctx, header, int(s.Row), int(s.Col))
if err != nil {
log.Debugw("error fetching share", "root", dah.String(), "row", s.Row, "col", s.Col)
failedSamplesLock.Lock()
failedSamples = append(failedSamples, s)
failedSamplesLock.Unlock()
}
}(s)

idxs := make([]shwap.SampleIndex, len(samples))
for i, s := range samples {
idx, err := shwap.SampleIndexFromCoordinates(int(s.Row), int(s.Col), len(dah.RowRoots))
if err != nil {
return err
}

idxs[i] = idx
}
wg.Wait()

smpls, err := la.getter.GetSamples(ctx, header, idxs)
if errors.Is(ctx.Err(), context.Canceled) {
// Availability did not complete due to context cancellation, return context error instead of
// share.ErrNotAvailable
return ctx.Err()
}
if len(smpls) == 0 {
return share.ErrNotAvailable
}

var failedSamples []Sample
for i, smpl := range smpls {
if smpl.IsEmpty() {
failedSamples = append(failedSamples, samples[i])
}
}

// store the result of the sampling session
bs := encodeSamples(failedSamples)
Expand Down
59 changes: 40 additions & 19 deletions share/availability/light/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"github.com/stretchr/testify/require"

libshare "github.com/celestiaorg/go-square/v2/share"
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
"github.com/celestiaorg/celestia-node/share/shwap"
"github.com/celestiaorg/celestia-node/share/shwap/getters/mock"
Expand All @@ -26,22 +28,32 @@ func TestSharesAvailableCaches(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eds := edstest.RandEDS(t, 16)
roots, err := share.NewAxisRoots(eds)
square := edstest.RandEDS(t, 16)
roots, err := share.NewAxisRoots(square)
require.NoError(t, err)
eh := headertest.RandExtendedHeaderWithRoot(t, roots)

getter := mock.NewMockGetter(gomock.NewController(t))
getter.EXPECT().
GetShare(gomock.Any(), eh, gomock.Any(), gomock.Any()).
GetSamples(gomock.Any(), eh, gomock.Any()).
DoAndReturn(
func(_ context.Context, _ *header.ExtendedHeader, row, col int) (libshare.Share, error) {
rawSh := eds.GetCell(uint(row), uint(col))
sh, err := libshare.NewShare(rawSh)
if err != nil {
return libshare.Share{}, err
func(_ context.Context, hdr *header.ExtendedHeader, indices []shwap.SampleIndex) ([]shwap.Sample, error) {
acc := eds.Rsmt2D{ExtendedDataSquare: square}
smpls := make([]shwap.Sample, len(indices))
for i, idx := range indices {
rowIdx, colIdx, err := idx.Coordinates(len(hdr.DAH.RowRoots))
if err != nil {
return nil, err
}

smpl, err := acc.Sample(ctx, rowIdx, colIdx)
if err != nil {
return nil, err
}

smpls[i] = smpl
}
return *sh, nil
return smpls, nil
}).
AnyTimes()

Expand Down Expand Up @@ -71,8 +83,8 @@ func TestSharesAvailableHitsCache(t *testing.T) {
// create getter that always return ErrNotFound
getter := mock.NewMockGetter(gomock.NewController(t))
getter.EXPECT().
GetShare(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(libshare.Share{}, shrex.ErrNotFound).
GetSamples(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, shrex.ErrNotFound).
AnyTimes()

ds := datastore.NewMapDatastore()
Expand Down Expand Up @@ -125,8 +137,8 @@ func TestSharesAvailableFailed(t *testing.T) {

// getter doesn't have the eds, so it should fail
getter.EXPECT().
GetShare(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(libshare.Share{}, shrex.ErrNotFound).
GetSamples(gomock.Any(), gomock.Any(), gomock.Any()).
Return(make([]shwap.Sample, avail.params.SampleAmount), shrex.ErrNotFound).
AnyTimes()
err = avail.SharesAvailable(ctx, eh)
require.ErrorIs(t, err, share.ErrNotAvailable)
Expand Down Expand Up @@ -175,15 +187,24 @@ func (m onceGetter) AddSamples(samples []Sample) {
}
}

func (m onceGetter) GetShare(_ context.Context, _ *header.ExtendedHeader, row, col int) (libshare.Share, error) {
func (m onceGetter) GetSamples(_ context.Context, hdr *header.ExtendedHeader, indices []shwap.SampleIndex) ([]shwap.Sample, error) {
m.Lock()
defer m.Unlock()
s := Sample{Row: uint16(row), Col: uint16(col)}
if _, ok := m.available[s]; ok {
delete(m.available, s)
return libshare.Share{}, nil

smpls := make([]shwap.Sample, 0, len(indices))
for _, idx := range indices {
rowIdx, colIdx, err := idx.Coordinates(len(hdr.DAH.RowRoots))
if err != nil {
return nil, err
}

s := Sample{Row: uint16(rowIdx), Col: uint16(colIdx)}
if _, ok := m.available[s]; ok {
delete(m.available, s)
smpls = append(smpls, shwap.Sample{Proof: &nmt.Proof{}})
}
}
return libshare.Share{}, share.ErrNotAvailable
return smpls, nil
}

func (m onceGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) {
Expand Down
1 change: 1 addition & 0 deletions share/eds/accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Accessor interface {
// Sample returns share and corresponding proof for row and column indices. Implementation can
// choose which axis to use for proof. Chosen axis for proof should be indicated in the returned
// Sample.
// TODO(@Wondertan): change to SampleIndex
Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error)
// AxisHalf returns half of shares axis of the given type and index. Side is determined by
// implementation. Implementations should indicate the side in the returned AxisHalf.
Expand Down
7 changes: 6 additions & 1 deletion share/eds/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ func (f validation) Size(ctx context.Context) int {
}

func (f validation) Sample(ctx context.Context, rowIdx, colIdx int) (shwap.Sample, error) {
_, err := shwap.NewSampleID(1, rowIdx, colIdx, f.Size(ctx))
idx, err := shwap.SampleIndexFromCoordinates(rowIdx, colIdx, f.Size(ctx))
if err != nil {
return shwap.Sample{}, err
}

_, err = shwap.NewSampleID(1, idx, f.Size(ctx))
if err != nil {
return shwap.Sample{}, fmt.Errorf("sample validation: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions share/shwap/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ var (
//
//go:generate mockgen -destination=getters/mock/getter.go -package=mock . Getter
type Getter interface {
// GetShare gets a Share by coordinates in EDS.
GetShare(ctx context.Context, header *header.ExtendedHeader, row, col int) (libshare.Share, error)
// GetSamples gets samples by their indices.
// Returns Sample slice with requested number of samples in the requested order.
// May return partial response with some samples being empty if they weren't found.
GetSamples(ctx context.Context, header *header.ExtendedHeader, indices []SampleIndex) ([]Sample, error)

// GetEDS gets the full EDS identified by the given extended header.
GetEDS(context.Context, *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error)
Expand Down
21 changes: 6 additions & 15 deletions share/shwap/getters/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,15 @@ func NewCascadeGetter(getters []shwap.Getter) *CascadeGetter {
}
}

// GetShare gets a share from any of registered shwap.Getters in cascading order.
func (cg *CascadeGetter) GetShare(
ctx context.Context, header *header.ExtendedHeader, row, col int,
) (libshare.Share, error) {
ctx, span := tracer.Start(ctx, "cascade/get-share", trace.WithAttributes(
attribute.Int("row", row),
attribute.Int("col", col),
// GetSamples gets samples from any of registered shwap.Getters in cascading order.
func (cg *CascadeGetter) GetSamples(ctx context.Context, hdr *header.ExtendedHeader, indices []shwap.SampleIndex) ([]shwap.Sample, error) {
ctx, span := tracer.Start(ctx, "cascade/get-samples", trace.WithAttributes(
attribute.Int("amount", len(indices)),
))
defer span.End()

upperBound := len(header.DAH.RowRoots)
if row >= upperBound || col >= upperBound {
err := shwap.ErrOutOfBounds
span.RecordError(err)
return libshare.Share{}, err
}
get := func(ctx context.Context, get shwap.Getter) (libshare.Share, error) {
return get.GetShare(ctx, header, row, col)
get := func(ctx context.Context, get shwap.Getter) ([]shwap.Sample, error) {
return get.GetSamples(ctx, hdr, indices)
}

return cascadeGetters(ctx, cg.getters, get)
Expand Down
2 changes: 1 addition & 1 deletion share/shwap/getters/cascade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestCascadeGetter(t *testing.T) {
getter := NewCascadeGetter(getters)
t.Run("GetShare", func(t *testing.T) {
for _, eh := range headers {
sh, err := getter.GetShare(ctx, eh, 0, 0)
sh, err := getter.GetSamples(ctx, eh, []shwap.SampleIndex{0})
assert.NoError(t, err)
assert.NotEmpty(t, sh)
}
Expand Down
14 changes: 7 additions & 7 deletions share/shwap/getters/mock/getter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 27 additions & 19 deletions share/shwap/getters/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,51 @@ import (
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
"github.com/celestiaorg/celestia-node/share/shwap"
)

// TestGetter provides a testing SingleEDSGetter and the root of the EDS it holds.
func TestGetter(t *testing.T) (shwap.Getter, *header.ExtendedHeader) {
eds := edstest.RandEDS(t, 8)
roots, err := share.NewAxisRoots(eds)
square := edstest.RandEDS(t, 8)
roots, err := share.NewAxisRoots(square)
eh := headertest.RandExtendedHeaderWithRoot(t, roots)
require.NoError(t, err)
return &SingleEDSGetter{
EDS: eds,
EDS: eds.Rsmt2D{ExtendedDataSquare: square},
}, eh
}

// SingleEDSGetter contains a single EDS where data is retrieved from.
// Its primary use is testing, and GetSharesByNamespace is not supported.
type SingleEDSGetter struct {
EDS *rsmt2d.ExtendedDataSquare
EDS eds.Rsmt2D
}

// GetShare gets a share from a kept EDS if exist and if the correct root is given.
func (seg *SingleEDSGetter) GetShare(
_ context.Context,
header *header.ExtendedHeader,
row, col int,
) (libshare.Share, error) {
err := seg.checkRoots(header.DAH)
// GetSamples get samples from a kept EDS if exist and if the correct root is given.
func (seg *SingleEDSGetter) GetSamples(ctx context.Context, hdr *header.ExtendedHeader, indices []shwap.SampleIndex) ([]shwap.Sample, error) {
err := seg.checkRoots(hdr.DAH)
if err != nil {
return libshare.Share{}, err
return nil, err
}
rawSh := seg.EDS.GetCell(uint(row), uint(col))
sh, err := libshare.NewShare(rawSh)
if err != nil {
return libshare.Share{}, err

smpls := make([]shwap.Sample, len(indices))
for i, idx := range indices {
rowIdx, colIdx, err := idx.Coordinates(len(hdr.DAH.RowRoots))
if err != nil {
return nil, err
}

smpl, err := seg.EDS.Sample(ctx, rowIdx, colIdx)
if err != nil {
return nil, err
}

smpls[i] = smpl
}
return *sh, nil

return smpls, nil
}

// GetEDS returns a kept EDS if the correct root is given.
Expand All @@ -62,7 +70,7 @@ func (seg *SingleEDSGetter) GetEDS(
if err != nil {
return nil, err
}
return seg.EDS, nil
return seg.EDS.ExtendedDataSquare, nil
}

// GetSharesByNamespace returns NamespacedShares from a kept EDS if the correct root is given.
Expand All @@ -72,7 +80,7 @@ func (seg *SingleEDSGetter) GetSharesByNamespace(context.Context, *header.Extend
}

func (seg *SingleEDSGetter) checkRoots(roots *share.AxisRoots) error {
dah, err := da.NewDataAvailabilityHeader(seg.EDS)
dah, err := da.NewDataAvailabilityHeader(seg.EDS.ExtendedDataSquare)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 6a8bad4

Please sign in to comment.