Skip to content

Commit

Permalink
Remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
AnkushinDaniil committed Dec 11, 2024
1 parent 8bf9be9 commit 1a5462f
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 1,161 deletions.
101 changes: 2 additions & 99 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"math/rand"
"strings"
"sync"
"time"

"github.com/Masterminds/semver/v3"
Expand All @@ -21,7 +20,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/crypto/pb"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -43,10 +41,8 @@ type Service struct {
handler *starknet.Handler
log utils.SimpleLogger

dht *dht.IpfsDHT
pubsub *pubsub.PubSub
topics map[string]*pubsub.Topic
topicsLock sync.RWMutex
dht *dht.IpfsDHT
pubsub *pubsub.PubSub

synchroniser *syncService
gossipTracer *gossipTracer
Expand Down Expand Up @@ -157,7 +153,6 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai
network: snNetwork,
dht: p2pdht,
feederNode: feederNode,
topics: make(map[string]*pubsub.Topic),
handler: starknet.NewHandler(bc, log),
database: database,
}
Expand Down Expand Up @@ -204,34 +199,6 @@ func privateKey(privKeyStr string) (crypto.PrivKey, error) {
return prvKey, nil
}

func (s *Service) SubscribePeerConnectednessChanged(ctx context.Context) (<-chan event.EvtPeerConnectednessChanged, error) {
ch := make(chan event.EvtPeerConnectednessChanged)
sub, err := s.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
if err != nil {
return nil, err
}

go func() {
for {
select {
case <-ctx.Done():
if err = sub.Close(); err != nil {
s.log.Warnw("Failed to close subscription", "err", err)
}
close(ch)
return
case evnt := <-sub.Out():
typedEvnt := evnt.(event.EvtPeerConnectednessChanged)
if typedEvnt.Connectedness == network.Connected {
ch <- typedEvnt
}
}
}
}()

return ch, nil
}

// Run starts the p2p service. Calling any other function before run is undefined behaviour
func (s *Service) Run(ctx context.Context) error {
defer func() {
Expand Down Expand Up @@ -336,70 +303,6 @@ func (s *Service) NewStream(ctx context.Context, pids ...protocol.ID) (network.S
}
}

func (s *Service) joinTopic(topic string) (*pubsub.Topic, error) {
existingTopic := func() *pubsub.Topic {
s.topicsLock.RLock()
defer s.topicsLock.RUnlock()
if t, found := s.topics[topic]; found {
return t
}
return nil
}()

if existingTopic != nil {
return existingTopic, nil
}

newTopic, err := s.pubsub.Join(topic)
if err != nil {
return nil, err
}

s.topicsLock.Lock()
defer s.topicsLock.Unlock()
s.topics[topic] = newTopic
return newTopic, nil
}

func (s *Service) SubscribeToTopic(topic string) (chan []byte, func(), error) {
t, joinErr := s.joinTopic(topic)
if joinErr != nil {
return nil, nil, joinErr
}

sub, subErr := t.Subscribe()
if subErr != nil {
return nil, nil, subErr
}

const bufferSize = 16
ch := make(chan []byte, bufferSize)
// go func() {
// for {
// msg, err := sub.Next(s.runCtx)
// if err != nil {
// close(ch)
// return
// }
// only forward messages delivered by others
// if msg.ReceivedFrom == s.host.ID() {
// continue
// }
//
// select {
// case ch <- msg.GetData():
// case <-s.runCtx.Done():
// }
// }
// }()
return ch, sub.Cancel, nil
}

func (s *Service) PublishOnTopic(topic string) error {
_, err := s.joinTopic(topic)
return err
}

func (s *Service) SetProtocolHandler(pid protocol.ID, handler func(network.Stream)) {
s.host.SetStreamHandler(pid, handler)
}
Expand Down
142 changes: 0 additions & 142 deletions p2p/p2p_test.go
Original file line number Diff line number Diff line change
@@ -1,141 +1,17 @@
package p2p_test

import (
"context"
"io"
"strings"
"sync"
"testing"
"time"

"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/db/pebble"
"github.com/NethermindEth/juno/p2p"
"github.com/NethermindEth/juno/utils"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

func TestService(t *testing.T) {
t.Skip("TestService")
net, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
peerHosts := net.Hosts()
require.Len(t, peerHosts, 2)

timeout := time.Second
testCtx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
peerA, err := p2p.NewWithHost(
peerHosts[0],
"",
false,
nil,
&utils.Integration,
utils.NewNopZapLogger(),
nil,
)
require.NoError(t, err)

events, err := peerA.SubscribePeerConnectednessChanged(testCtx)
require.NoError(t, err)

peerAddrs, err := peerA.ListenAddrs()
require.NoError(t, err)

peerAddrsString := make([]string, 0, len(peerAddrs))
for _, addr := range peerAddrs {
peerAddrsString = append(peerAddrsString, addr.String())
}

peerB, err := p2p.NewWithHost(
peerHosts[1],
strings.Join(peerAddrsString, ","),
true,
nil,
&utils.Integration,
utils.NewNopZapLogger(),
nil,
)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
require.NoError(t, peerA.Run(testCtx))
}()
go func() {
defer wg.Done()
require.NoError(t, peerB.Run(testCtx))
}()

select {
case evt := <-events:
require.Equal(t, network.Connected, evt.Connectedness)
case <-time.After(timeout):
require.True(t, false, "no events were emitted")
}

t.Run("gossip", func(t *testing.T) {
t.Skip() // todo: flaky test
topic := "coolTopic"
ch, closer, err := peerA.SubscribeToTopic(topic)
require.NoError(t, err)
t.Cleanup(closer)

maxRetries := 4
RetryLoop:
for i := 0; i < maxRetries; i++ {
gossipedMessage := []byte(`veryImportantMessage`)
require.NoError(t, peerB.PublishOnTopic(topic))

select {
case <-time.After(time.Second):
if i == maxRetries-1 {
require.Fail(t, "timeout: never received the message")
}
case msg := <-ch:
require.Equal(t, gossipedMessage, msg)
break RetryLoop
}
}
})

t.Run("protocol handler", func(t *testing.T) {
ch := make(chan []byte)

superSecretProtocol := protocol.ID("superSecretProtocol")
peerA.SetProtocolHandler(superSecretProtocol, func(stream network.Stream) {
read, err := io.ReadAll(stream)
require.NoError(t, err)
ch <- read
})

peerAStream, err := peerB.NewStream(testCtx, superSecretProtocol)
require.NoError(t, err)

superSecretMessage := []byte(`superSecretMessage`)
_, err = peerAStream.Write(superSecretMessage)
require.NoError(t, err)
require.NoError(t, peerAStream.Close())

select {
case <-time.After(timeout):
require.Equal(t, true, false)
case msg := <-ch:
require.Equal(t, superSecretMessage, msg)
}
})

cancel()
wg.Wait()
}

func TestInvalidKey(t *testing.T) {
_, err := p2p.New(
"/ip4/127.0.0.1/tcp/30301",
Expand All @@ -153,24 +29,6 @@ func TestInvalidKey(t *testing.T) {
require.Error(t, err)
}

func TestValidKey(t *testing.T) {
t.Skip("TestValidKey")
_, err := p2p.New(
"/ip4/127.0.0.1/tcp/30301",
"",
"peerA",
"",
"08011240333b4a433f16d7ca225c0e99d0d8c437b835cb74a98d9279c561977690c80f681b25ccf3fa45e2f2de260149c112fa516b69057dd3b0151a879416c0cb12d9b3",
false,
nil,
&utils.Integration,
utils.NewNopZapLogger(),
nil,
)

require.NoError(t, err)
}

func TestLoadAndPersistPeers(t *testing.T) {
testDB := pebble.NewMemTest(t)

Expand Down
10 changes: 5 additions & 5 deletions p2p/starknet/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,23 @@ func streamHandler[ReqT proto.Message](ctx context.Context, wg *sync.WaitGroup,
}

func (h *Handler) HeadersHandler(stream network.Stream) {
streamHandler[*spec.BlockHeadersRequest](h.ctx, &h.wg, stream, h.onHeadersRequest, h.log)
streamHandler(h.ctx, &h.wg, stream, h.onHeadersRequest, h.log)
}

func (h *Handler) EventsHandler(stream network.Stream) {
streamHandler[*spec.EventsRequest](h.ctx, &h.wg, stream, h.onEventsRequest, h.log)
streamHandler(h.ctx, &h.wg, stream, h.onEventsRequest, h.log)
}

func (h *Handler) TransactionsHandler(stream network.Stream) {
streamHandler[*spec.TransactionsRequest](h.ctx, &h.wg, stream, h.onTransactionsRequest, h.log)
streamHandler(h.ctx, &h.wg, stream, h.onTransactionsRequest, h.log)
}

func (h *Handler) ClassesHandler(stream network.Stream) {
streamHandler[*spec.ClassesRequest](h.ctx, &h.wg, stream, h.onClassesRequest, h.log)
streamHandler(h.ctx, &h.wg, stream, h.onClassesRequest, h.log)
}

func (h *Handler) StateDiffHandler(stream network.Stream) {
streamHandler[*spec.StateDiffsRequest](h.ctx, &h.wg, stream, h.onStateDiffRequest, h.log)
streamHandler(h.ctx, &h.wg, stream, h.onStateDiffRequest, h.log)
}

func (h *Handler) onHeadersRequest(req *spec.BlockHeadersRequest) (iter.Seq[proto.Message], error) {
Expand Down
Loading

0 comments on commit 1a5462f

Please sign in to comment.