diff --git a/nodebuilder/share/config.go b/nodebuilder/share/config.go index cd9514fb75..de47b756a2 100644 --- a/nodebuilder/share/config.go +++ b/nodebuilder/share/config.go @@ -14,6 +14,7 @@ import ( // TODO: some params are pointers and other are not, Let's fix this. type Config struct { UseShareExchange bool + UseIPLD bool // ShrExEDSParams sets shrexeds client and server configuration parameters ShrExEDSParams *shrexeds.Parameters // ShrExNDParams sets shrexnd client and server configuration parameters @@ -31,6 +32,7 @@ func DefaultConfig(tp node.Type) Config { ShrExEDSParams: shrexeds.DefaultParameters(), ShrExNDParams: shrexnd.DefaultParameters(), UseShareExchange: true, + UseIPLD: true, PeerManagerParams: peers.DefaultParameters(), } diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index a3fe691798..84b7087f33 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -69,7 +69,9 @@ func lightGetter( if cfg.UseShareExchange { cascade = append(cascade, shrexGetter) } - cascade = append(cascade, ipldGetter) + if cfg.UseIPLD { + cascade = append(cascade, ipldGetter) + } return getters.NewCascadeGetter(cascade) } @@ -85,6 +87,9 @@ func fullGetter( if cfg.UseShareExchange { cascade = append(cascade, shrexGetter) } + if cfg.UseIPLD { + cascade = append(cascade, ipldGetter) + } cascade = append(cascade, ipldGetter) return getters.NewTeeGetter( diff --git a/nodebuilder/tests/nd_test.go b/nodebuilder/tests/nd_test.go new file mode 100644 index 0000000000..45338f04ef --- /dev/null +++ b/nodebuilder/tests/nd_test.go @@ -0,0 +1,203 @@ +package tests + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + "go.uber.org/fx" + + "github.com/celestiaorg/celestia-node/nodebuilder" + "github.com/celestiaorg/celestia-node/nodebuilder/node" + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" + "github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp" + "github.com/celestiaorg/celestia-node/share/eds" + "github.com/celestiaorg/celestia-node/share/getters" + "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" +) + +func TestShrexNDFromLights(t *testing.T) { + const ( + blocks = 5 + btime = time.Millisecond * 300 + bsize = 16 + ) + + ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout) + + t.Cleanup(cancel) + sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime)) + fillDn := swamp.FillBlocks(ctx, sw.ClientContext, sw.Accounts, bsize, blocks) + + bridge := sw.NewBridgeNode() + addrsBridge, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(bridge.Host)) + require.NoError(t, err) + + os.Setenv(p2p.EnvKeyCelestiaBootstrapper, "true") + const defaultTimeInterval = time.Second * 5 + + light := newLightNode(sw, addrsBridge[0].String(), defaultTimeInterval, 1) + + require.NoError(t, bridge.Start(ctx)) + require.NoError(t, startLightNodes(ctx, light)) + require.NoError(t, <-fillDn) + + for i := 1; i < blocks; i++ { + h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i)) + require.NoError(t, err) + + nID := h.DAH.RowsRoots[0][:8] + reqCtx, cancel := context.WithTimeout(ctx, time.Second*5) + sh, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, nID) + cancel() + require.NoError(t, err) + require.True(t, len(sh[0].Shares) > 0) + } +} + +func TestShrexNDFromLightsWithBadFulls(t *testing.T) { + const ( + blocks = 5 + btime = time.Millisecond * 300 + bsize = 16 + amountOfFulls = 25 + defaultTimeInterval = time.Second * 5 + ) + + ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout) + t.Cleanup(cancel) + + sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime)) + fillDn := swamp.FillBlocks(ctx, sw.ClientContext, sw.Accounts, bsize, blocks) + + bridge := sw.NewBridgeNode() + addrsBridge, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(bridge.Host)) + require.NoError(t, err) + + os.Setenv(p2p.EnvKeyCelestiaBootstrapper, "true") + + ndHandler := func(stream network.Stream) { + _ = stream.Reset() + } + + // create full nodes with basic reset handler + fulls := make([]*nodebuilder.Node, 0, amountOfFulls) + for i := 0; i < amountOfFulls; i++ { + fulls = append(fulls, newFullNodeWithNDHandler(sw, addrsBridge[0].String(), defaultTimeInterval, ndHandler)) + } + + light := newLightNode(sw, addrsBridge[0].String(), defaultTimeInterval, amountOfFulls+1) + + require.NoError(t, bridge.Start(ctx)) + require.NoError(t, startFullNodes(ctx, fulls...)) + require.NoError(t, startLightNodes(ctx, light)) + require.NoError(t, <-fillDn) + + for i := 1; i < blocks; i++ { + h, err := bridge.HeaderServ.GetByHeight(ctx, uint64(i)) + require.NoError(t, err) + + nID := h.DAH.RowsRoots[0][:8] + reqCtx, cancel := context.WithTimeout(ctx, time.Second*5) + sh, err := light.ShareServ.GetSharesByNamespace(reqCtx, h.DAH, nID) + cancel() + require.NoError(t, err) + require.True(t, len(sh[0].Shares) > 0) + } +} + +func newLightNode( + sw *swamp.Swamp, + bootstarpperAddr string, + defaultTimeInterval time.Duration, + amountOfFulls int, +) *nodebuilder.Node { + lnConfig := nodebuilder.DefaultConfig(node.Light) + lnConfig.Share.UseIPLD = false + lnConfig.Share.Discovery.PeersLimit = uint(amountOfFulls) + setTimeInterval(lnConfig, defaultTimeInterval) + lnConfig.Header.TrustedPeers = append(lnConfig.Header.TrustedPeers, bootstarpperAddr) + return sw.NewNodeWithConfig(node.Light, lnConfig) +} + +func startLightNodes(ctx context.Context, nodes ...*nodebuilder.Node) error { + for _, node := range nodes { + sub, err := node.Host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{}) + if err != nil { + return err + } + + if err = node.Start(ctx); err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-sub.Out(): + if err = sub.Close(); err != nil { + return err + } + } + } + return nil +} + +func newFullNodeWithNDHandler( + sw *swamp.Swamp, + bootstarpperAddr string, + defaultTimeInterval time.Duration, + handler network.StreamHandler, +) *nodebuilder.Node { + cfg := nodebuilder.DefaultConfig(node.Full) + cfg.Share.UseIPLD = false + setTimeInterval(cfg, defaultTimeInterval) + cfg.Header.TrustedPeers = []string{ + "/ip4/1.2.3.4/tcp/12345/p2p/12D3KooWNaJ1y1Yio3fFJEXCZyd1Cat3jmrPdgkYCrHfKD3Ce21p", + } + cfg.Header.TrustedPeers = append(cfg.Header.TrustedPeers, bootstarpperAddr) + + return sw.NewNodeWithConfig(node.Full, cfg, replaceNDServer(cfg, handler)) +} + +func startFullNodes(ctx context.Context, fulls ...*nodebuilder.Node) error { + for i, full := range fulls { + if full == nil { + panic("asdads" + fmt.Sprintf("%v", i)) + } + err := full.Start(ctx) + if err != nil { + return err + } + } + return nil +} + +func replaceNDServer(cfg *nodebuilder.Config, handler network.StreamHandler) fx.Option { + return fx.Decorate(fx.Annotate( + func( + host host.Host, + store *eds.Store, + getter *getters.StoreGetter, + network p2p.Network, + ) (*shrexnd.Server, error) { + cfg.Share.ShrExNDParams.WithNetworkID(network.String()) + return shrexnd.NewServer(cfg.Share.ShrExNDParams, host, store, getter) + }, + fx.OnStart(func(ctx context.Context, server *shrexnd.Server) error { + // replace handler for server + server.SetHandler(handler) + return server.Start(ctx) + }), + fx.OnStop(func(ctx context.Context, server *shrexnd.Server) error { + return server.Start(ctx) + }), + )) +} diff --git a/share/p2p/shrexnd/server.go b/share/p2p/shrexnd/server.go index 5c1fe80f52..16d62e9de3 100644 --- a/share/p2p/shrexnd/server.go +++ b/share/p2p/shrexnd/server.go @@ -29,8 +29,9 @@ type Server struct { host host.Host protocolID protocol.ID - getter share.Getter - store *eds.Store + handler network.StreamHandler + getter share.Getter + store *eds.Store params *Parameters middleware *p2p.Middleware @@ -52,18 +53,18 @@ func NewServer(params *Parameters, host host.Host, store *eds.Store, getter shar middleware: p2p.NewMiddleware(params.ConcurrencyLimit), } + ctx, cancel := context.WithCancel(context.Background()) + srv.cancel = cancel + srv.handler = func(s network.Stream) { + srv.handleNamespacedData(ctx, s) + } + return srv, nil } // Start starts the server func (srv *Server) Start(context.Context) error { - ctx, cancel := context.WithCancel(context.Background()) - srv.cancel = cancel - - handler := func(s network.Stream) { - srv.handleNamespacedData(ctx, s) - } - srv.host.SetStreamHandler(srv.protocolID, srv.middleware.RateLimitHandler(handler)) + srv.host.SetStreamHandler(srv.protocolID, srv.handler) return nil } @@ -74,6 +75,11 @@ func (srv *Server) Stop(context.Context) error { return nil } +// SetHandler sets server handler +func (srv *Server) SetHandler(handler network.StreamHandler) { + srv.handler = handler +} + func (srv *Server) observeRateLimitedRequests() { numRateLimited := srv.middleware.DrainCounter() if numRateLimited > 0 {