Skip to content

Commit

Permalink
e3: one more reconst deadlock fix (#7207)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Mar 29, 2023
1 parent 417a437 commit f541f61
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
8 changes: 6 additions & 2 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1094,8 +1094,12 @@ func reconstituteStep(last bool,

select {
case workCh <- txTask:
case <-ctx.Done():
return ctx.Err()
case <-reconstWorkersCtx.Done():
// if ctx canceled, then maybe it's because of error in errgroup
//
// errgroup doesn't play with pattern where some 1 goroutine-producer is outside of errgroup
// but RwTx doesn't allow move between goroutines
return g.Wait()
}
}
inputTxNum++
Expand Down
17 changes: 8 additions & 9 deletions turbo/snapshotsync/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,6 @@ func NewBlockRetire(workers int, tmpDir string, snapshots *RoSnapshots, db kv.Ro
return &BlockRetire{workers: workers, tmpDir: tmpDir, snapshots: snapshots, db: db, downloader: downloader, notifier: notifier}
}
func (br *BlockRetire) Snapshots() *RoSnapshots { return br.snapshots }
func (br *BlockRetire) Working() bool { return br.working.Load() }
func (br *BlockRetire) NeedSaveFilesListInDB() bool {
return br.needSaveFilesListInDB.CompareAndSwap(true, false)
}
Expand Down Expand Up @@ -1955,15 +1954,15 @@ func ForEachHeader(ctx context.Context, s *RoSnapshots, walker func(header *type
}

type Merger struct {
lvl log.Lvl
workers int
tmpDir string
chainID uint256.Int
notifier DBEventNotifier
lvl log.Lvl
compressWorkers int
tmpDir string
chainID uint256.Int
notifier DBEventNotifier
}

func NewMerger(tmpDir string, workers int, lvl log.Lvl, chainID uint256.Int, notifier DBEventNotifier) *Merger {
return &Merger{tmpDir: tmpDir, workers: workers, lvl: lvl, chainID: chainID, notifier: notifier}
func NewMerger(tmpDir string, compressWorkers int, lvl log.Lvl, chainID uint256.Int, notifier DBEventNotifier) *Merger {
return &Merger{tmpDir: tmpDir, compressWorkers: compressWorkers, lvl: lvl, chainID: chainID, notifier: notifier}
}

type Range struct {
Expand Down Expand Up @@ -2078,7 +2077,7 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string,
expectedTotal += d.Count()
}

f, err := compress.NewCompressor(ctx, "Snapshots merge", targetFile, m.tmpDir, compress.MinPatternScore, m.workers, log.LvlTrace)
f, err := compress.NewCompressor(ctx, "Snapshots merge", targetFile, m.tmpDir, compress.MinPatternScore, m.compressWorkers, log.LvlTrace)
if err != nil {
return err
}
Expand Down

0 comments on commit f541f61

Please sign in to comment.