Skip to content

Commit

Permalink
feat(share/shrex/nd): add swamp test for shrex nd (#2227)
Browse files Browse the repository at this point in the history
## Overview
Closes #2119

---------

Co-authored-by: rene <[email protected]>
  • Loading branch information
walldiss and renaynay authored Aug 1, 2023
1 parent 13c2dab commit 08ffb69
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 8 deletions.
199 changes: 199 additions & 0 deletions nodebuilder/tests/nd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package tests

import (
"context"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/stretchr/testify/require"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/nodebuilder"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
)

func TestShrexNDFromLights(t *testing.T) {
const (
blocks = 10
btime = time.Millisecond * 300
bsize = 16
)

ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
t.Cleanup(cancel)

sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime))
fillDn := swamp.FillBlocks(ctx, sw.ClientContext, sw.Accounts, bsize, blocks)

bridge := sw.NewBridgeNode()
sw.SetBootstrapper(t, bridge)

cfg := nodebuilder.DefaultConfig(node.Light)
cfg.Share.Discovery.PeersLimit = 1
light := sw.NewNodeWithConfig(node.Light, cfg)

err := bridge.Start(ctx)
require.NoError(t, err)
err = light.Start(ctx)
require.NoError(t, err)

// wait for chain to be filled
require.NoError(t, <-fillDn)

// first 2 blocks are not filled with data
for i := 3; i < blocks; i++ {
h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i))
require.NoError(t, err)

reqCtx, cancel := context.WithTimeout(ctx, time.Second*5)

// ensure to fetch random namespace (not the reserved namespace)
namespace := h.DAH.RowRoots[1][:share.NamespaceSize]

expected, err := bridge.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
require.NoError(t, err)
got, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
require.NoError(t, err)

require.True(t, len(got[0].Shares) > 0)
require.Equal(t, expected, got)

cancel()
}
}

func TestShrexNDFromLightsWithBadFulls(t *testing.T) {
const (
blocks = 10
btime = time.Millisecond * 300
bsize = 16
amountOfFulls = 5
testTimeout = time.Second * 10
)

ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
t.Cleanup(cancel)

sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime))
fillDn := swamp.FillBlocks(ctx, sw.ClientContext, sw.Accounts, bsize, blocks)

bridge := sw.NewBridgeNode()
sw.SetBootstrapper(t, bridge)

// create full nodes with basic stream.reset handler
ndHandler := func(stream network.Stream) {
_ = stream.Reset()
}
fulls := make([]*nodebuilder.Node, 0, amountOfFulls)
for i := 0; i < amountOfFulls; i++ {
cfg := nodebuilder.DefaultConfig(node.Full)
setTimeInterval(cfg, testTimeout)
full := sw.NewNodeWithConfig(node.Full, cfg, replaceNDServer(cfg, ndHandler), replaceShareGetter())
fulls = append(fulls, full)
}

lnConfig := nodebuilder.DefaultConfig(node.Light)
lnConfig.Share.Discovery.PeersLimit = uint(amountOfFulls)
light := sw.NewNodeWithConfig(node.Light, lnConfig)

// start all nodes
require.NoError(t, bridge.Start(ctx))
require.NoError(t, startFullNodes(ctx, fulls...))
require.NoError(t, light.Start(ctx))

// wait for chain to fill up
require.NoError(t, <-fillDn)

// first 2 blocks are not filled with data
for i := 3; i < blocks; i++ {
h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i))
require.NoError(t, err)

if len(h.DAH.RowRoots) != bsize*2 {
// fill blocks does not always fill every block to the given block
// size - this check prevents trying to fetch shares for the parity
// namespace.
continue
}

reqCtx, cancel := context.WithTimeout(ctx, time.Second*5)

// ensure to fetch random namespace (not the reserved namespace)
namespace := h.DAH.RowRoots[1][:share.NamespaceSize]

expected, err := bridge.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
require.NoError(t, err)
require.True(t, len(expected[0].Shares) > 0)

// choose a random full to test
gotFull, err := fulls[len(fulls)/2].ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
require.NoError(t, err)
require.True(t, len(gotFull[0].Shares) > 0)

gotLight, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, namespace)
require.NoError(t, err)
require.True(t, len(gotLight[0].Shares) > 0)

require.Equal(t, expected, gotFull)
require.Equal(t, expected, gotLight)

cancel()
}
}

func startFullNodes(ctx context.Context, fulls ...*nodebuilder.Node) error {
for _, full := range fulls {
err := full.Start(ctx)
if err != nil {
return err
}
}
return nil
}

func replaceNDServer(cfg *nodebuilder.Config, handler network.StreamHandler) fx.Option {
return fx.Decorate(fx.Annotate(
func(
host host.Host,
store *eds.Store,
getter *getters.StoreGetter,
network p2p.Network,
) (*shrexnd.Server, error) {
cfg.Share.ShrExNDParams.WithNetworkID(network.String())
return shrexnd.NewServer(cfg.Share.ShrExNDParams, host, store, getter)
},
fx.OnStart(func(ctx context.Context, server *shrexnd.Server) error {
// replace handler for server
server.SetHandler(handler)
return server.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, server *shrexnd.Server) error {
return server.Start(ctx)
}),
))
}

func replaceShareGetter() fx.Option {
return fx.Decorate(fx.Annotate(
func(
host host.Host,
store *eds.Store,
storeGetter *getters.StoreGetter,
shrexGetter *getters.ShrexGetter,
network p2p.Network,
) share.Getter {
cascade := make([]share.Getter, 0, 2)
cascade = append(cascade, storeGetter)
cascade = append(cascade, getters.NewTeeGetter(shrexGetter, store))
return getters.NewCascadeGetter(cascade)
},
))
}
24 changes: 16 additions & 8 deletions share/p2p/shrexnd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type Server struct {
host host.Host
protocolID protocol.ID

getter share.Getter
store *eds.Store
handler network.StreamHandler
getter share.Getter
store *eds.Store

params *Parameters
middleware *p2p.Middleware
Expand All @@ -52,18 +53,20 @@ func NewServer(params *Parameters, host host.Host, store *eds.Store, getter shar
middleware: p2p.NewMiddleware(params.ConcurrencyLimit),
}

return srv, nil
}

// Start starts the server
func (srv *Server) Start(context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
srv.cancel = cancel

handler := func(s network.Stream) {
srv.handleNamespacedData(ctx, s)
}
srv.host.SetStreamHandler(srv.protocolID, srv.middleware.RateLimitHandler(handler))
srv.handler = srv.middleware.RateLimitHandler(handler)

return srv, nil
}

// Start starts the server
func (srv *Server) Start(context.Context) error {
srv.host.SetStreamHandler(srv.protocolID, srv.handler)
return nil
}

Expand All @@ -74,6 +77,11 @@ func (srv *Server) Stop(context.Context) error {
return nil
}

// SetHandler sets server handler
func (srv *Server) SetHandler(handler network.StreamHandler) {
srv.handler = handler
}

func (srv *Server) observeRateLimitedRequests() {
numRateLimited := srv.middleware.DrainCounter()
if numRateLimited > 0 {
Expand Down

0 comments on commit 08ffb69

Please sign in to comment.