From 3b6b771169a3eac3266503003dc2204adf604913 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20IRMAK?= Date: Tue, 19 Sep 2023 12:54:29 +0300 Subject: [PATCH] Move latest block fetch out of store callback store callback is on the hot path of sync process, a network request is not suitable to be here since it blocks the entire sync process for arbitrarily long --- sync/sync.go | 73 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 20 deletions(-) diff --git a/sync/sync.go b/sync/sync.go index 8e15f32145..9d7b78005e 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -222,17 +222,17 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat return } highestBlockHeader := s.highestBlockHeader.Load() - if highestBlockHeader == nil || highestBlockHeader.Number <= block.Number { - highestBlock, err := s.starknetData.BlockLatest(ctx) - if err != nil { - s.log.Warnw("Failed fetching latest block", "err", err) - } else { - s.highestBlockHeader.Store(highestBlock.Header) - isBehind := highestBlock.Number > block.Number+uint64(maxWorkers()) - if s.catchUpMode != isBehind { - resetStreams() - } - s.catchUpMode = isBehind + if highestBlockHeader != nil { + isBehind := highestBlockHeader.Number > block.Number+uint64(maxWorkers()) + if s.catchUpMode != isBehind { + resetStreams() + } + s.catchUpMode = isBehind + } + + if highestBlockHeader == nil || highestBlockHeader.Number < block.Number { + if s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header) { + s.bestBlockGauge.Set(float64(block.Header.Number)) } } @@ -266,6 +266,8 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) { pendingSem := make(chan struct{}, 1) go s.pollPending(syncCtx, pendingSem) + latestSem := make(chan struct{}, 1) + go s.pollLatest(syncCtx, latestSem) for { select { @@ -277,6 +279,7 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) { select { case <-syncCtx.Done(): pendingSem <- struct{}{} + latestSem <- struct{}{} return default: streamCtx, streamCancel = context.WithCancel(syncCtx) @@ -346,11 +349,13 @@ func (s *Synchronizer) pollPending(ctx context.Context, sem chan struct{}) { select { case sem <- struct{}{}: go func() { + defer func() { + <-sem + }() err := s.fetchAndStorePending(ctx) if err != nil { s.log.Debugw("Error while trying to poll pending block", "err", err) } - <-sem }() default: } @@ -358,6 +363,40 @@ func (s *Synchronizer) pollPending(ctx context.Context, sem chan struct{}) { } } +func (s *Synchronizer) pollLatest(ctx context.Context, sem chan struct{}) { + poll := func() { + select { + case sem <- struct{}{}: + go func() { + defer func() { + <-sem + }() + highestBlock, err := s.starknetData.BlockLatest(ctx) + if err != nil { + s.log.Warnw("Failed fetching latest block", "err", err) + } else { + s.highestBlockHeader.Store(highestBlock.Header) + } + s.bestBlockGauge.Set(float64(highestBlock.Header.Number)) + }() + default: + } + } + + ticker := time.NewTicker(time.Minute) + poll() + + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + poll() + } + } +} + func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error { highestBlockHeader := s.highestBlockHeader.Load() if highestBlockHeader == nil { @@ -394,18 +433,12 @@ func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error { func (s *Synchronizer) updateStats(block *core.Block) { var ( - transactions = block.TransactionCount - currentHeight = block.Number - highestKnownHeight uint64 = 0 + transactions = block.TransactionCount + currentHeight = block.Number ) - highestBlockHeader := s.highestBlockHeader.Load() - if highestBlockHeader != nil { - highestKnownHeight = highestBlockHeader.Number - } s.blockCount.Inc() s.chainHeightGauge.Set(float64(currentHeight)) - s.bestBlockGauge.Set(float64(highestKnownHeight)) s.transactionCount.Add(float64(transactions)) }