Skip to content

Commit

Permalink
add swamp test for shrex nd
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed May 19, 2023
1 parent bd70494 commit 2b8689d
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 10 deletions.
2 changes: 2 additions & 0 deletions nodebuilder/share/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// TODO: some params are pointers and other are not, Let's fix this.
type Config struct {
UseShareExchange bool
UseIPLD bool
// ShrExEDSParams sets shrexeds client and server configuration parameters
ShrExEDSParams *shrexeds.Parameters
// ShrExNDParams sets shrexnd client and server configuration parameters
Expand All @@ -31,6 +32,7 @@ func DefaultConfig(tp node.Type) Config {
ShrExEDSParams: shrexeds.DefaultParameters(),
ShrExNDParams: shrexnd.DefaultParameters(),
UseShareExchange: true,
UseIPLD: true,
PeerManagerParams: peers.DefaultParameters(),
}

Expand Down
7 changes: 6 additions & 1 deletion nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func lightGetter(
if cfg.UseShareExchange {
cascade = append(cascade, shrexGetter)
}
cascade = append(cascade, ipldGetter)
if cfg.UseIPLD {
cascade = append(cascade, ipldGetter)
}
return getters.NewCascadeGetter(cascade)
}

Expand All @@ -85,6 +87,9 @@ func fullGetter(
if cfg.UseShareExchange {
cascade = append(cascade, shrexGetter)
}
if cfg.UseIPLD {
cascade = append(cascade, ipldGetter)
}
cascade = append(cascade, ipldGetter)

return getters.NewTeeGetter(
Expand Down
203 changes: 203 additions & 0 deletions nodebuilder/tests/nd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package tests

import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/nodebuilder"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
)

func TestShrexNDFromLights(t *testing.T) {
const (
blocks = 5
btime = time.Millisecond * 300
bsize = 16
)

ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)

t.Cleanup(cancel)
sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime))
fillDn := swamp.FillBlocks(ctx, sw.ClientContext, sw.Accounts, bsize, blocks)

bridge := sw.NewBridgeNode()
addrsBridge, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(bridge.Host))
require.NoError(t, err)

os.Setenv(p2p.EnvKeyCelestiaBootstrapper, "true")
const defaultTimeInterval = time.Second * 5

light := newLightNode(sw, addrsBridge[0].String(), defaultTimeInterval, 1)

require.NoError(t, bridge.Start(ctx))
require.NoError(t, startLightNodes(ctx, light))
require.NoError(t, <-fillDn)

for i := 1; i < blocks; i++ {
h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i))
require.NoError(t, err)

nID := h.DAH.RowsRoots[0][:8]
reqCtx, cancel := context.WithTimeout(ctx, time.Second*5)
sh, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, nID)
cancel()
require.NoError(t, err)
require.True(t, len(sh[0].Shares) > 0)
}
}

func TestShrexNDFromLightsWithBadFulls(t *testing.T) {
const (
blocks = 5
btime = time.Millisecond * 300
bsize = 16
amountOfFulls = 25
defaultTimeInterval = time.Second * 5
)

ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
t.Cleanup(cancel)

sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime))
fillDn := swamp.FillBlocks(ctx, sw.ClientContext, sw.Accounts, bsize, blocks)

bridge := sw.NewBridgeNode()
addrsBridge, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(bridge.Host))
require.NoError(t, err)

os.Setenv(p2p.EnvKeyCelestiaBootstrapper, "true")

ndHandler := func(stream network.Stream) {
_ = stream.Reset()
}

// create full nodes with basic reset handler
fulls := make([]*nodebuilder.Node, 0, amountOfFulls)
for i := 0; i < amountOfFulls; i++ {
fulls = append(fulls, newFullNodeWithNDHandler(sw, addrsBridge[0].String(), defaultTimeInterval, ndHandler))
}

light := newLightNode(sw, addrsBridge[0].String(), defaultTimeInterval, amountOfFulls+1)

require.NoError(t, bridge.Start(ctx))
require.NoError(t, startFullNodes(ctx, fulls...))
require.NoError(t, startLightNodes(ctx, light))
require.NoError(t, <-fillDn)

for i := 1; i < blocks; i++ {
h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i))
require.NoError(t, err)

nID := h.DAH.RowsRoots[0][:8]
reqCtx, cancel := context.WithTimeout(ctx, time.Second*5)
sh, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, nID)
cancel()
require.NoError(t, err)
require.True(t, len(sh[0].Shares) > 0)
}
}

func newLightNode(
sw *swamp.Swamp,
bootstarpperAddr string,
defaultTimeInterval time.Duration,
amountOfFulls int,
) *nodebuilder.Node {
lnConfig := nodebuilder.DefaultConfig(node.Light)
lnConfig.Share.UseIPLD = false
lnConfig.Share.Discovery.PeersLimit = uint(amountOfFulls)
setTimeInterval(lnConfig, defaultTimeInterval)
lnConfig.Header.TrustedPeers = append(lnConfig.Header.TrustedPeers, bootstarpperAddr)
return sw.NewNodeWithConfig(node.Light, lnConfig)
}

func startLightNodes(ctx context.Context, nodes ...*nodebuilder.Node) error {
for _, node := range nodes {
sub, err := node.Host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
if err != nil {
return err
}

if err = node.Start(ctx); err != nil {
return err
}

select {
case <-ctx.Done():
return ctx.Err()
case <-sub.Out():
if err = sub.Close(); err != nil {
return err
}
}
}
return nil
}

func newFullNodeWithNDHandler(
sw *swamp.Swamp,
bootstarpperAddr string,
defaultTimeInterval time.Duration,
handler network.StreamHandler,
) *nodebuilder.Node {
cfg := nodebuilder.DefaultConfig(node.Full)
cfg.Share.UseIPLD = false
setTimeInterval(cfg, defaultTimeInterval)
cfg.Header.TrustedPeers = []string{
"/ip4/1.2.3.4/tcp/12345/p2p/12D3KooWNaJ1y1Yio3fFJEXCZyd1Cat3jmrPdgkYCrHfKD3Ce21p",
}
cfg.Header.TrustedPeers = append(cfg.Header.TrustedPeers, bootstarpperAddr)

return sw.NewNodeWithConfig(node.Full, cfg, replaceNDServer(cfg, handler))
}

func startFullNodes(ctx context.Context, fulls ...*nodebuilder.Node) error {
for i, full := range fulls {
if full == nil {
panic("asdads" + fmt.Sprintf("%v", i))
}
err := full.Start(ctx)
if err != nil {
return err
}
}
return nil
}

func replaceNDServer(cfg *nodebuilder.Config, handler network.StreamHandler) fx.Option {
return fx.Decorate(fx.Annotate(
func(
host host.Host,
store *eds.Store,
getter *getters.StoreGetter,
network p2p.Network,
) (*shrexnd.Server, error) {
cfg.Share.ShrExNDParams.WithNetworkID(network.String())
return shrexnd.NewServer(cfg.Share.ShrExNDParams, host, store, getter)
},
fx.OnStart(func(ctx context.Context, server *shrexnd.Server) error {
// replace handler for server
server.SetHandler(handler)
return server.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, server *shrexnd.Server) error {
return server.Start(ctx)
}),
))
}
24 changes: 15 additions & 9 deletions share/p2p/shrexnd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type Server struct {
host host.Host
protocolID protocol.ID

getter share.Getter
store *eds.Store
handler network.StreamHandler
getter share.Getter
store *eds.Store

params *Parameters
middleware *p2p.Middleware
Expand All @@ -52,18 +53,18 @@ func NewServer(params *Parameters, host host.Host, store *eds.Store, getter shar
middleware: p2p.NewMiddleware(params.ConcurrencyLimit),
}

ctx, cancel := context.WithCancel(context.Background())
srv.cancel = cancel
srv.handler = func(s network.Stream) {
srv.handleNamespacedData(ctx, s)
}

return srv, nil
}

// Start starts the server
func (srv *Server) Start(context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
srv.cancel = cancel

handler := func(s network.Stream) {
srv.handleNamespacedData(ctx, s)
}
srv.host.SetStreamHandler(srv.protocolID, srv.middleware.RateLimitHandler(handler))
srv.host.SetStreamHandler(srv.protocolID, srv.handler)
return nil
}

Expand All @@ -74,6 +75,11 @@ func (srv *Server) Stop(context.Context) error {
return nil
}

// SetHandler sets server handler
func (srv *Server) SetHandler(handler network.StreamHandler) {
srv.handler = handler
}

func (srv *Server) observeRateLimitedRequests() {
numRateLimited := srv.middleware.DrainCounter()
if numRateLimited > 0 {
Expand Down

0 comments on commit 2b8689d

Please sign in to comment.