Skip to content

Commit

Permalink
Move latest block fetch out of store callback
Browse files Browse the repository at this point in the history
store callback is on the hot path of sync process, a network
request is not suitable to be here since it blocks the entire sync
process for arbitrarily long
  • Loading branch information
omerfirmak committed Sep 19, 2023
1 parent e4491c3 commit f9b6ff9
Showing 1 changed file with 39 additions and 11 deletions.
50 changes: 39 additions & 11 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,18 +222,12 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat
return
}
highestBlockHeader := s.highestBlockHeader.Load()
if highestBlockHeader == nil || highestBlockHeader.Number <= block.Number {
highestBlock, err := s.starknetData.BlockLatest(ctx)
if err != nil {
s.log.Warnw("Failed fetching latest block", "err", err)
} else {
s.highestBlockHeader.Store(highestBlock.Header)
isBehind := highestBlock.Number > block.Number+uint64(maxWorkers())
if s.catchUpMode != isBehind {
resetStreams()
}
s.catchUpMode = isBehind
if highestBlockHeader != nil {
isBehind := highestBlockHeader.Number > block.Number+uint64(maxWorkers())
if s.catchUpMode != isBehind {
resetStreams()
}
s.catchUpMode = isBehind
}

s.log.Infow("Stored Block", "number", block.Number, "hash",
Expand Down Expand Up @@ -266,6 +260,8 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) {

pendingSem := make(chan struct{}, 1)
go s.pollPending(syncCtx, pendingSem)
latestSem := make(chan struct{}, 1)
go s.pollLatest(syncCtx, latestSem)

for {
select {
Expand All @@ -277,6 +273,7 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) {
select {
case <-syncCtx.Done():
pendingSem <- struct{}{}
latestSem <- struct{}{}
return
default:
streamCtx, streamCancel = context.WithCancel(syncCtx)
Expand Down Expand Up @@ -358,6 +355,37 @@ func (s *Synchronizer) pollPending(ctx context.Context, sem chan struct{}) {
}
}

func (s *Synchronizer) pollLatest(ctx context.Context, sem chan struct{}) {
poll := func() {
select {
case sem <- struct{}{}:
go func() {
highestBlock, err := s.starknetData.BlockLatest(ctx)
if err != nil {
s.log.Warnw("Failed fetching latest block", "err", err)
} else {
s.highestBlockHeader.Store(highestBlock.Header)
}
<-sem
}()
default:
}
}

ticker := time.NewTicker(time.Minute)
poll()

for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
poll()
}
}
}

func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error {
highestBlockHeader := s.highestBlockHeader.Load()
if highestBlockHeader == nil {
Expand Down

0 comments on commit f9b6ff9

Please sign in to comment.