Skip to content

Commit

Permalink
Fetch random node's height to determine how many blocks to fetch
Browse files Browse the repository at this point in the history
Co-authored-by: Kirill <[email protected]>
  • Loading branch information
IronGauntlets and kirugan committed Jan 25, 2024
1 parent ee3bf94 commit 38b3912
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 72 deletions.
70 changes: 25 additions & 45 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ type Service struct {
synchroniser *syncService

feederNode bool

runCtx context.Context
runLock sync.RWMutex
}

func New(addr, userAgent, peers, privKeyStr string, feederNode bool, bc *blockchain.Blockchain, snNetwork utils.Network,
Expand Down Expand Up @@ -104,11 +101,8 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai
}

// todo: reconsider initialising synchroniser here because if node is a feedernode we shouldn't not create an instance of it.
var peerId peer.ID
if len(peersAddrInfoS) > 0 {
peerId = peersAddrInfoS[0].ID
}
synchroniser := newSyncService(bc, p2phost, peerId, snNetwork, log)

synchroniser := newSyncService(bc, p2phost, snNetwork, log)
s := &Service{
synchroniser: synchroniser,
log: log,
Expand All @@ -119,7 +113,6 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai
topics: make(map[string]*pubsub.Topic),
handler: starknet.NewHandler(bc, log),
}
s.runLock.Lock()
return s, nil
}

Expand Down Expand Up @@ -196,25 +189,16 @@ func (s *Service) SubscribePeerConnectednessChanged(ctx context.Context) (<-chan
func (s *Service) Run(ctx context.Context) error {
defer s.host.Close()

err := func() error {
defer s.runLock.Unlock()

err := s.dht.Bootstrap(ctx)
if err != nil {
return err
}

s.pubsub, err = pubsub.NewGossipSub(ctx, s.host)
if err != nil {
return err
}
err := s.dht.Bootstrap(ctx)
if err != nil {
return err
}

s.runCtx = ctx
return nil
}()
s.pubsub, err = pubsub.NewGossipSub(ctx, s.host)
if err != nil {
return err
}

defer s.callAndLogErr(s.dht.Close, "Failed stopping DHT")

listenAddrs, err := s.ListenAddrs()
Expand All @@ -231,7 +215,7 @@ func (s *Service) Run(ctx context.Context) error {
s.synchroniser.start(ctx)
}

<-s.runCtx.Done()
<-ctx.Done()
if err := s.dht.Close(); err != nil {
s.log.Warnw("Failed stopping DHT", "err", err.Error())
}
Expand Down Expand Up @@ -309,12 +293,6 @@ func (s *Service) joinTopic(topic string) (*pubsub.Topic, error) {
return existingTopic, nil
}

s.runLock.RLock()
defer s.runLock.RUnlock()
if s.runCtx == nil {
return nil, errors.New("uninitialized p2p service")
}

newTopic, err := s.pubsub.Join(topic)
if err != nil {
return nil, err
Expand All @@ -341,20 +319,20 @@ func (s *Service) SubscribeToTopic(topic string) (chan []byte, func(), error) {
ch := make(chan []byte, bufferSize)
go func() {
for {
msg, err := sub.Next(s.runCtx)
if err != nil {
close(ch)
return
}
//msg, err := sub.Next(s.runCtx)
//if err != nil {
// close(ch)
// return
//}
// only forward messages delivered by others
if msg.ReceivedFrom == s.host.ID() {
continue
}

select {
case ch <- msg.GetData():
case <-s.runCtx.Done():
}
//if msg.ReceivedFrom == s.host.ID() {
// continue
//}

//select {
//case ch <- msg.GetData():
//case <-s.runCtx.Done():
//}
}
}()
return ch, sub.Cancel, nil
Expand All @@ -365,8 +343,10 @@ func (s *Service) PublishOnTopic(topic string, data []byte) error {
if joinErr != nil {
return joinErr
}
_ = t

return t.Publish(s.runCtx, data)
return nil
// return t.Publish(s.runCtx, data)
}

func (s *Service) SetProtocolHandler(pid protocol.ID, handler func(network.Stream)) {
Expand Down
48 changes: 21 additions & 27 deletions p2p/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,25 @@ import (
const maxBlocks = 100

type syncService struct {
host host.Host
network utils.Network
feederNode peer.ID
client *starknet.Client // todo: merge all the functionality of Client with p2p SyncService
host host.Host
network utils.Network
client *starknet.Client // todo: merge all the functionality of Client with p2p SyncService

blockchain *blockchain.Blockchain
log utils.SimpleLogger
}

func newSyncService(bc *blockchain.Blockchain, h host.Host, feederNode peer.ID, network utils.Network,
log utils.SimpleLogger,
) *syncService {
func newSyncService(bc *blockchain.Blockchain, h host.Host, network utils.Network, log utils.SimpleLogger) *syncService {
return &syncService{
host: h,
network: network,
blockchain: bc,
log: log,
feederNode: feederNode,
}
}

func (s *syncService) feederNodeHeight(ctx context.Context) (uint64, error) {
c := starknet.NewClient(func(ctx context.Context, pids ...protocol.ID) (network.Stream, error) {
return s.host.NewStream(ctx, s.feederNode, pids...)
}, s.network, s.log)

headersIt, err := c.RequestCurrentBlockHeader(ctx, &spec.CurrentBlockHeaderRequest{})
func (s *syncService) randomNodeHeight(ctx context.Context) (int, error) {
headersIt, err := s.client.RequestCurrentBlockHeader(ctx, &spec.CurrentBlockHeaderRequest{})
if err != nil {
return 0, err
}
Expand All @@ -66,7 +58,7 @@ func (s *syncService) feederNodeHeight(ctx context.Context) (uint64, error) {
}
}

return header.Number, nil
return int(header.Number), nil
}

func (s *syncService) start(ctx context.Context) {
Expand All @@ -75,7 +67,7 @@ func (s *syncService) start(ctx context.Context) {

s.client = starknet.NewClient(s.randomPeerStream, s.network, s.log)

var feederNodeHeight uint64
var randHeight int
for i := 0; ; i++ {
if err := ctx.Err(); err != nil {
break
Expand All @@ -86,32 +78,33 @@ func (s *syncService) start(ctx context.Context) {
ctx, cancelIteration := context.WithCancel(ctx)

var err error
feederNodeHeight, err = s.feederNodeHeight(ctx)
randHeight, err = s.randomNodeHeight(ctx)
if err != nil {
s.logError("Failed to get boot node height", err)
s.logError("Failed to get random node height", err)
cancelIteration()
continue
}

var nextHeight uint64
var nextHeight int
if curHeight, err := s.blockchain.Height(); err == nil { //nolint:govet
nextHeight = curHeight + 1
nextHeight = int(curHeight) + 1
} else if !errors.Is(db.ErrKeyNotFound, err) {
s.log.Errorw("Failed to get current height", "err", err)
}

blockBehind := feederNodeHeight - (nextHeight - 1)
blockBehind := randHeight - (nextHeight - 1)
if blockBehind <= 0 {
s.log.Infow("Bootnode height is the same as local height, retrying in 30s")
time.Sleep(30 * time.Second)
s.log.Infow("Random node height is the same or less as local height, retrying in 100ms", "Random node height", randHeight,
"Current height", nextHeight-1)
time.Sleep(100 * time.Millisecond)
cancelIteration()
continue
}

s.log.Infow("Start Pipeline", "Bootnode height", feederNodeHeight, "Current height", nextHeight-1)
s.log.Infow("Fetching blocks", "Start", nextHeight, "End", nextHeight+min(blockBehind, maxBlocks))
s.log.Infow("Start Pipeline", "Random node height", randHeight, "Current height", nextHeight-1, "Start", nextHeight, "End",
nextHeight+min(blockBehind, maxBlocks))

commonIt := s.createIterator(nextHeight, min(blockBehind, maxBlocks))
commonIt := s.createIterator(uint64(nextHeight), uint64(min(blockBehind, maxBlocks)))
headersAndSigsCh, err := s.genHeadersAndSigs(ctx, commonIt)
if err != nil {
s.logError("Failed to get block headers parts", err)
Expand Down Expand Up @@ -147,7 +140,7 @@ func (s *syncService) start(ctx context.Context) {
continue
}

blocksCh := pipeline.Bridge(ctx, s.processSpecBlockParts(ctx, nextHeight, pipeline.FanIn(ctx,
blocksCh := pipeline.Bridge(ctx, s.processSpecBlockParts(ctx, uint64(nextHeight), pipeline.FanIn(ctx,
pipeline.Stage(ctx, headersAndSigsCh, specBlockPartsFunc[specBlockHeaderAndSigs]),
pipeline.Stage(ctx, blockBodiesCh, specBlockPartsFunc[specBlockBody]),
pipeline.Stage(ctx, txsCh, specBlockPartsFunc[specTransactions]),
Expand Down Expand Up @@ -602,6 +595,7 @@ func (s *syncService) randomPeer() peer.ID {
peer := peers[rand.Intn(len(peers))]

fmt.Println("Number of peers", len(peers))
// Random chosen peer's Info {12D3KooWBejoxD2ivkPjRYhD887XXdCL9o6uAYr196BWJac36uzo: []}
fmt.Println("Random chosen peer's Info", s.host.Peerstore().PeerInfo(peer))

return peer
Expand Down

0 comments on commit 38b3912

Please sign in to comment.