Skip to content

Commit

Permalink
Refactor syncService.start method in p2p (#2174)
Browse files Browse the repository at this point in the history
  • Loading branch information
AnkushinDaniil authored Sep 25, 2024
1 parent 38638ca commit b240aba
Showing 1 changed file with 62 additions and 60 deletions.
122 changes: 62 additions & 60 deletions p2p/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func newSyncService(bc *blockchain.Blockchain, h host.Host, n *utils.Network, lo
}
}

//nolint:funlen
func (s *syncService) start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -60,83 +59,86 @@ func (s *syncService) start(ctx context.Context) {
s.log.Debugw("Continuous iteration", "i", i)

iterCtx, cancelIteration := context.WithCancel(ctx)

var nextHeight int
if curHeight, err := s.blockchain.Height(); err == nil {
nextHeight = int(curHeight) + 1
} else if !errors.Is(err, db.ErrKeyNotFound) {
s.log.Errorw("Failed to get current height", "err", err)
nextHeight, err := s.getNextHeight()
if err != nil {
s.logError("Failed to get current height", err)
cancelIteration()
continue
}

s.log.Infow("Start Pipeline", "Current height", nextHeight-1, "Start", nextHeight)

// todo change iteration to fetch several objects uint64(min(blockBehind, maxBlocks))
blockNumber := uint64(nextHeight)
headersAndSigsCh, err := s.genHeadersAndSigs(iterCtx, blockNumber)
if err != nil {
s.logError("Failed to get block headers parts", err)
if err := s.processBlock(iterCtx, blockNumber); err != nil {
s.logError("Failed to process block", fmt.Errorf("blockNumber: %d, err: %w", blockNumber, err))
cancelIteration()
continue
}

txsCh, err := s.genTransactions(iterCtx, blockNumber)
if err != nil {
s.logError("Failed to get transactions", err)
cancelIteration()
continue
}
cancelIteration()
}
}

eventsCh, err := s.genEvents(iterCtx, blockNumber)
if err != nil {
s.logError("Failed to get classes", err)
cancelIteration()
continue
}
func (s *syncService) getNextHeight() (int, error) {
curHeight, err := s.blockchain.Height()
if err == nil {
return int(curHeight) + 1, nil
} else if errors.Is(err, db.ErrKeyNotFound) {
return 0, nil
}
return 0, err
}

classesCh, err := s.genClasses(iterCtx, blockNumber)
if err != nil {
s.logError("Failed to get classes", err)
cancelIteration()
continue
}
func (s *syncService) processBlock(ctx context.Context, blockNumber uint64) error {
headersAndSigsCh, err := s.genHeadersAndSigs(ctx, blockNumber)
if err != nil {
return fmt.Errorf("failed to get block headers parts: %w", err)
}

stateDiffsCh, err := s.genStateDiffs(iterCtx, blockNumber)
if err != nil {
s.logError("Failed to get state diffs", err)
cancelIteration()
continue
}
txsCh, err := s.genTransactions(ctx, blockNumber)
if err != nil {
return fmt.Errorf("failed to get transactions: %w", err)
}

blocksCh := pipeline.Bridge(iterCtx, s.processSpecBlockParts(iterCtx, uint64(nextHeight), pipeline.FanIn(iterCtx,
pipeline.Stage(iterCtx, headersAndSigsCh, specBlockPartsFunc[specBlockHeaderAndSigs]),
pipeline.Stage(iterCtx, classesCh, specBlockPartsFunc[specClasses]),
pipeline.Stage(iterCtx, stateDiffsCh, specBlockPartsFunc[specContractDiffs]),
pipeline.Stage(iterCtx, txsCh, specBlockPartsFunc[specTxWithReceipts]),
pipeline.Stage(iterCtx, eventsCh, specBlockPartsFunc[specEvents]),
)))

for b := range blocksCh {
if b.err != nil {
// cannot process any more blocks
s.log.Errorw("Failed to process block", "err", b.err)
cancelIteration()
break
}
eventsCh, err := s.genEvents(ctx, blockNumber)
if err != nil {
return fmt.Errorf("failed to get events: %w", err)
}

storeTimer := time.Now()
err = s.blockchain.Store(b.block, b.commitments, b.stateUpdate, b.newClasses)
if err != nil {
s.log.Errorw("Failed to Store Block", "number", b.block.Number, "err", err)
cancelIteration()
break
}
classesCh, err := s.genClasses(ctx, blockNumber)
if err != nil {
return fmt.Errorf("failed to get classes: %w", err)
}

s.log.Infow("Stored Block", "number", b.block.Number, "hash", b.block.Hash.ShortString(),
"root", b.block.GlobalStateRoot.ShortString())
s.listener.OnSyncStepDone(junoSync.OpStore, b.block.Number, time.Since(storeTimer))
stateDiffsCh, err := s.genStateDiffs(ctx, blockNumber)
if err != nil {
return fmt.Errorf("failed to get state diffs: %w", err)
}

blocksCh := pipeline.Bridge(ctx, s.processSpecBlockParts(ctx, blockNumber, pipeline.FanIn(ctx,
pipeline.Stage(ctx, headersAndSigsCh, specBlockPartsFunc[specBlockHeaderAndSigs]),
pipeline.Stage(ctx, classesCh, specBlockPartsFunc[specClasses]),
pipeline.Stage(ctx, stateDiffsCh, specBlockPartsFunc[specContractDiffs]),
pipeline.Stage(ctx, txsCh, specBlockPartsFunc[specTxWithReceipts]),
pipeline.Stage(ctx, eventsCh, specBlockPartsFunc[specEvents]),
)))

for b := range blocksCh {
if b.err != nil {
return fmt.Errorf("failed to process block: %w", b.err)
}
cancelIteration()

storeTimer := time.Now()
if err := s.blockchain.Store(b.block, b.commitments, b.stateUpdate, b.newClasses); err != nil {
return fmt.Errorf("failed to store block: %w", err)
}

s.log.Infow("Stored Block", "number", b.block.Number, "hash", b.block.Hash.ShortString(),
"root", b.block.GlobalStateRoot.ShortString())
s.listener.OnSyncStepDone(junoSync.OpStore, b.block.Number, time.Since(storeTimer))
}
return nil
}

func specBlockPartsFunc[T specBlockHeaderAndSigs | specTxWithReceipts | specEvents | specClasses | specContractDiffs](i T) specBlockParts {
Expand Down

0 comments on commit b240aba

Please sign in to comment.