Remove unused code #2317

Merged: 1 commit, Dec 11, 2024
101 changes: 2 additions & 99 deletions p2p/p2p.go
@@ -7,7 +7,6 @@ import (
"fmt"
"math/rand"
"strings"
"sync"
"time"

"github.com/Masterminds/semver/v3"
@@ -21,7 +20,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/crypto/pb"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@@ -43,10 +41,8 @@ type Service struct {
handler *starknet.Handler
log utils.SimpleLogger

dht *dht.IpfsDHT
pubsub *pubsub.PubSub
topics map[string]*pubsub.Topic
topicsLock sync.RWMutex
dht *dht.IpfsDHT
pubsub *pubsub.PubSub

synchroniser *syncService
gossipTracer *gossipTracer
@@ -157,7 +153,6 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai
network: snNetwork,
dht: p2pdht,
feederNode: feederNode,
topics: make(map[string]*pubsub.Topic),
handler: starknet.NewHandler(bc, log),
database: database,
}
@@ -204,34 +199,6 @@ func privateKey(privKeyStr string) (crypto.PrivKey, error) {
return prvKey, nil
}

func (s *Service) SubscribePeerConnectednessChanged(ctx context.Context) (<-chan event.EvtPeerConnectednessChanged, error) {
ch := make(chan event.EvtPeerConnectednessChanged)
sub, err := s.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
if err != nil {
return nil, err
}

go func() {
for {
select {
case <-ctx.Done():
if err = sub.Close(); err != nil {
s.log.Warnw("Failed to close subscription", "err", err)
}
close(ch)
return
case evnt := <-sub.Out():
typedEvnt := evnt.(event.EvtPeerConnectednessChanged)
if typedEvnt.Connectedness == network.Connected {
ch <- typedEvnt
}
}
}
}()

return ch, nil
}

// Run starts the p2p service. Calling any other function before run is undefined behaviour
func (s *Service) Run(ctx context.Context) error {
defer func() {
@@ -336,70 +303,6 @@ func (s *Service) NewStream(ctx context.Context, pids ...protocol.ID) (network.S
}
}

func (s *Service) joinTopic(topic string) (*pubsub.Topic, error) {
existingTopic := func() *pubsub.Topic {
s.topicsLock.RLock()
defer s.topicsLock.RUnlock()
if t, found := s.topics[topic]; found {
return t
}
return nil
}()

if existingTopic != nil {
return existingTopic, nil
}

newTopic, err := s.pubsub.Join(topic)
if err != nil {
return nil, err
}

s.topicsLock.Lock()
defer s.topicsLock.Unlock()
s.topics[topic] = newTopic
return newTopic, nil
}

func (s *Service) SubscribeToTopic(topic string) (chan []byte, func(), error) {
t, joinErr := s.joinTopic(topic)
if joinErr != nil {
return nil, nil, joinErr
}

sub, subErr := t.Subscribe()
if subErr != nil {
return nil, nil, subErr
}

const bufferSize = 16
ch := make(chan []byte, bufferSize)
// go func() {
// for {
// msg, err := sub.Next(s.runCtx)
// if err != nil {
// close(ch)
// return
// }
// only forward messages delivered by others
// if msg.ReceivedFrom == s.host.ID() {
// continue
// }
//
// select {
// case ch <- msg.GetData():
// case <-s.runCtx.Done():
// }
// }
// }()
return ch, sub.Cancel, nil
}

func (s *Service) PublishOnTopic(topic string) error {
_, err := s.joinTopic(topic)
return err
}

func (s *Service) SetProtocolHandler(pid protocol.ID, handler func(network.Stream)) {
s.host.SetStreamHandler(pid, handler)
}
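Note: the removed joinTopic, SubscribeToTopic and PublishOnTopic wrappers only cached *pubsub.Topic handles in a mutex-guarded map and no longer had any callers. If gossip is needed again, the same behaviour is available from go-libp2p-pubsub directly. A minimal sketch under that assumption (the host, PubSub instance and function name below are illustrative, not part of Juno's API after this change):

```go
package example

import (
	"context"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/host"
)

// subscribeDirect joins a gossipsub topic and forwards messages published by
// other peers onto a buffered channel, roughly what the removed
// SubscribeToTopic wrapper did. The returned func cancels the subscription.
func subscribeDirect(ctx context.Context, h host.Host, ps *pubsub.PubSub, topicName string) (<-chan []byte, func(), error) {
	topic, err := ps.Join(topicName)
	if err != nil {
		return nil, nil, err
	}

	sub, err := topic.Subscribe()
	if err != nil {
		return nil, nil, err
	}

	const bufferSize = 16
	ch := make(chan []byte, bufferSize)
	go func() {
		defer close(ch)
		for {
			msg, err := sub.Next(ctx)
			if err != nil { // context cancelled or subscription closed
				return
			}
			if msg.ReceivedFrom == h.ID() { // only forward messages from other peers
				continue
			}
			select {
			case ch <- msg.GetData():
			case <-ctx.Done():
				return
			}
		}
	}()

	return ch, sub.Cancel, nil
}
```

Publishing is symmetric: keep the *pubsub.Topic returned by ps.Join and call topic.Publish(ctx, payload) on it.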
142 changes: 0 additions & 142 deletions p2p/p2p_test.go
@@ -1,141 +1,17 @@
package p2p_test

import (
"context"
"io"
"strings"
"sync"
"testing"
"time"

"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/db/pebble"
"github.com/NethermindEth/juno/p2p"
"github.com/NethermindEth/juno/utils"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

func TestService(t *testing.T) {
t.Skip("TestService")
net, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
peerHosts := net.Hosts()
require.Len(t, peerHosts, 2)

timeout := time.Second
testCtx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
peerA, err := p2p.NewWithHost(
peerHosts[0],
"",
false,
nil,
&utils.Integration,
utils.NewNopZapLogger(),
nil,
)
require.NoError(t, err)

events, err := peerA.SubscribePeerConnectednessChanged(testCtx)
require.NoError(t, err)

peerAddrs, err := peerA.ListenAddrs()
require.NoError(t, err)

peerAddrsString := make([]string, 0, len(peerAddrs))
for _, addr := range peerAddrs {
peerAddrsString = append(peerAddrsString, addr.String())
}

peerB, err := p2p.NewWithHost(
peerHosts[1],
strings.Join(peerAddrsString, ","),
true,
nil,
&utils.Integration,
utils.NewNopZapLogger(),
nil,
)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
require.NoError(t, peerA.Run(testCtx))
}()
go func() {
defer wg.Done()
require.NoError(t, peerB.Run(testCtx))
}()

select {
case evt := <-events:
require.Equal(t, network.Connected, evt.Connectedness)
case <-time.After(timeout):
require.True(t, false, "no events were emitted")
}

t.Run("gossip", func(t *testing.T) {
t.Skip() // todo: flaky test
topic := "coolTopic"
ch, closer, err := peerA.SubscribeToTopic(topic)
require.NoError(t, err)
t.Cleanup(closer)

maxRetries := 4
RetryLoop:
for i := 0; i < maxRetries; i++ {
gossipedMessage := []byte(`veryImportantMessage`)
require.NoError(t, peerB.PublishOnTopic(topic))

select {
case <-time.After(time.Second):
if i == maxRetries-1 {
require.Fail(t, "timeout: never received the message")
}
case msg := <-ch:
require.Equal(t, gossipedMessage, msg)
break RetryLoop
}
}
})

t.Run("protocol handler", func(t *testing.T) {
ch := make(chan []byte)

superSecretProtocol := protocol.ID("superSecretProtocol")
peerA.SetProtocolHandler(superSecretProtocol, func(stream network.Stream) {
read, err := io.ReadAll(stream)
require.NoError(t, err)
ch <- read
})

peerAStream, err := peerB.NewStream(testCtx, superSecretProtocol)
require.NoError(t, err)

superSecretMessage := []byte(`superSecretMessage`)
_, err = peerAStream.Write(superSecretMessage)
require.NoError(t, err)
require.NoError(t, peerAStream.Close())

select {
case <-time.After(timeout):
require.Equal(t, true, false)
case msg := <-ch:
require.Equal(t, superSecretMessage, msg)
}
})

cancel()
wg.Wait()
}

func TestInvalidKey(t *testing.T) {
_, err := p2p.New(
"/ip4/127.0.0.1/tcp/30301",
@@ -153,24 +29,6 @@ func TestInvalidKey(t *testing.T) {
require.Error(t, err)
}

func TestValidKey(t *testing.T) {
t.Skip("TestValidKey")
_, err := p2p.New(
"/ip4/127.0.0.1/tcp/30301",
"",
"peerA",
"",
"08011240333b4a433f16d7ca225c0e99d0d8c437b835cb74a98d9279c561977690c80f681b25ccf3fa45e2f2de260149c112fa516b69057dd3b0151a879416c0cb12d9b3",
false,
nil,
&utils.Integration,
utils.NewNopZapLogger(),
nil,
)

require.NoError(t, err)
}

func TestLoadAndPersistPeers(t *testing.T) {
testDB := pebble.NewMemTest(t)

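Note: the deleted TestService exercised the now-removed SubscribePeerConnectednessChanged helper. If equivalent coverage is wanted later, a test can subscribe to the libp2p event bus directly; the sketch below shows the idea against the standard go-libp2p API (host construction omitted, function name is illustrative):

```go
package example

import (
	"context"

	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
)

// waitForConnection blocks until the host reports a connected peer or the
// context is cancelled, mirroring what the deleted test waited for.
func waitForConnection(ctx context.Context, h host.Host) error {
	sub, err := h.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
	if err != nil {
		return err
	}
	defer sub.Close() // best-effort cleanup in a sketch

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case e, ok := <-sub.Out():
			if !ok {
				return nil // subscription closed
			}
			if evt, isConn := e.(event.EvtPeerConnectednessChanged); isConn && evt.Connectedness == network.Connected {
				return nil
			}
		}
	}
}
```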
10 changes: 5 additions & 5 deletions p2p/starknet/handlers.go
@@ -106,23 +106,23 @@ func streamHandler[ReqT proto.Message](ctx context.Context, wg *sync.WaitGroup,
}

func (h *Handler) HeadersHandler(stream network.Stream) {
- streamHandler[*spec.BlockHeadersRequest](h.ctx, &h.wg, stream, h.onHeadersRequest, h.log)
+ streamHandler(h.ctx, &h.wg, stream, h.onHeadersRequest, h.log)
}

func (h *Handler) EventsHandler(stream network.Stream) {
- streamHandler[*spec.EventsRequest](h.ctx, &h.wg, stream, h.onEventsRequest, h.log)
+ streamHandler(h.ctx, &h.wg, stream, h.onEventsRequest, h.log)
}

func (h *Handler) TransactionsHandler(stream network.Stream) {
- streamHandler[*spec.TransactionsRequest](h.ctx, &h.wg, stream, h.onTransactionsRequest, h.log)
+ streamHandler(h.ctx, &h.wg, stream, h.onTransactionsRequest, h.log)
}

func (h *Handler) ClassesHandler(stream network.Stream) {
- streamHandler[*spec.ClassesRequest](h.ctx, &h.wg, stream, h.onClassesRequest, h.log)
+ streamHandler(h.ctx, &h.wg, stream, h.onClassesRequest, h.log)
}

func (h *Handler) StateDiffHandler(stream network.Stream) {
- streamHandler[*spec.StateDiffsRequest](h.ctx, &h.wg, stream, h.onStateDiffRequest, h.log)
+ streamHandler(h.ctx, &h.wg, stream, h.onStateDiffRequest, h.log)
}

func (h *Handler) onHeadersRequest(req *spec.BlockHeadersRequest) (iter.Seq[proto.Message], error) {
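Note on the handlers.go change: only the explicit instantiation on each streamHandler call is dropped. Since Go 1.18 the compiler infers ReqT from the signature of the request callback, so streamHandler[*spec.BlockHeadersRequest](h.ctx, ...) and streamHandler(h.ctx, ...) compile to the same call. A standalone sketch of the same inference pattern (types and names are illustrative, not Juno's):

```go
package example

import "fmt"

type headersRequest struct{ StartBlock uint64 }

// handle mirrors the shape of streamHandler: generic over the request type,
// with the request consumer passed in as a callback.
func handle[ReqT any](onReq func(ReqT) string, req ReqT) {
	fmt.Println(onReq(req))
}

func demo() {
	onHeaders := func(r headersRequest) string {
		return fmt.Sprintf("headers starting at block %d", r.StartBlock)
	}

	// Explicit instantiation, as the old calls did with streamHandler[*spec.BlockHeadersRequest](...).
	handle[headersRequest](onHeaders, headersRequest{StartBlock: 1})

	// ReqT is inferred from onHeaders' signature, as in the updated handlers.go.
	handle(onHeaders, headersRequest{StartBlock: 1})
}
```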