From 955e99d0f0161bfcce4de8ecc43e90f0bd04fae0 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 12 Mar 2021 04:54:28 +0100 Subject: [PATCH] core/state/snapshot: refactor (#4) * core/state/snapshot: refactor * core/state/snapshot: tiny fix and polish Co-authored-by: rjl493456442 --- core/state/snapshot/generate.go | 217 ++++++++++++++------------------ 1 file changed, 97 insertions(+), 120 deletions(-) diff --git a/core/state/snapshot/generate.go b/core/state/snapshot/generate.go index 04df0523f6b5..5a6790bd0e5d 100644 --- a/core/state/snapshot/generate.go +++ b/core/state/snapshot/generate.go @@ -315,11 +315,9 @@ func (dl *diskLayer) generate(stats *generatorStats) { accountRange = accountCheckRange ) if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that - accMarker = dl.genMarker[:common.HashLength] - - // Always reset the initial account range as 1 whenever recover - // from the interruption. - accountRange = 1 + // Always reset the initial account range as 1 + // whenever recover from the interruption. + accMarker, accountRange = dl.genMarker[:common.HashLength], 1 } var ( batch = dl.diskdb.NewBatch() @@ -329,132 +327,111 @@ func (dl *diskLayer) generate(stats *generatorStats) { ) stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker) - for { - exhausted, last, err := dl.genRange(dl.root, rawdb.SnapshotAccountPrefix, "account", accOrigin, accountRange, stats, func(key []byte, val []byte, regen bool) error { - // Retrieve the current account and flatten it into the internal format - accountHash := common.BytesToHash(key) - - var acc struct { - Nonce uint64 - Balance *big.Int - Root common.Hash - CodeHash []byte + checkAndFlush := func(currentLocation []byte) error { + select { + case abort = <-dl.genAbort: + default: + } + if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { + // Flush out the batch anyway no matter it's empty or not. + // It's possible that all the states are recovered and the + // generation indeed makes progress. + marker := currentLocation + journalProgress(batch, marker, stats) + + if err := batch.Write(); err != nil { + return err } - if err := rlp.DecodeBytes(val, &acc); err != nil { - log.Crit("Invalid account encountered during snapshot creation", "err", err) + batch.Reset() + + dl.lock.Lock() + dl.genMarker = marker + dl.lock.Unlock() + + if abort != nil { + stats.Log("Aborting state snapshot generation", dl.root, marker) + return errors.New("aborted") } - data := SlimAccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash) + } + if time.Since(logged) > 8*time.Second { + stats.Log("Generating state snapshot", dl.root, marker) + logged = time.Now() + } + return nil + } - // If the account is not yet in-progress, write it out - if accMarker == nil || !bytes.Equal(accountHash[:], accMarker) { + onAccount := func(key []byte, val []byte, regen bool) error { + // Retrieve the current account and flatten it into the internal format + accountHash := common.BytesToHash(key) + var acc struct { + Nonce uint64 + Balance *big.Int + Root common.Hash + CodeHash []byte + } + if err := rlp.DecodeBytes(val, &acc); err != nil { + log.Crit("Invalid account encountered during snapshot creation", "err", err) + } + data := SlimAccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash) + + // If the account is not yet in-progress, write it out + if accMarker == nil || !bytes.Equal(accountHash[:], accMarker) { + if regen { + rawdb.WriteAccountSnapshot(batch, accountHash, data) + snapGeneratedAccountMeter.Mark(1) + } else { + snapRecoveredAccountMeter.Mark(1) + } + stats.storage += common.StorageSize(1 + common.HashLength + len(data)) + stats.accounts++ + } + // If we've exceeded our batch allowance or termination was requested, flush to disk + if err := checkAndFlush(accountHash[:]); err != nil { + return err + } + // If the iterated account is the contract, create a further loop to + // verify or regenerate the contract storage. + if acc.Root != emptyRoot { + var storeMarker []byte + if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength { + storeMarker = dl.genMarker[common.HashLength:] + } + onStorage := func(key []byte, val []byte, regen bool) error { if regen { - rawdb.WriteAccountSnapshot(batch, accountHash, data) - snapGeneratedAccountMeter.Mark(1) + rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(key), val) + snapGeneratedStorageMeter.Mark(1) } else { - snapRecoveredAccountMeter.Mark(1) + snapRecoveredStorageMeter.Mark(1) } - stats.storage += common.StorageSize(1 + common.HashLength + len(data)) - stats.accounts++ - } - // If we've exceeded our batch allowance or termination was requested, flush to disk - select { - case abort = <-dl.genAbort: - default: - } - if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { - // Flush out the batch anyway no matter it's empty or not. - // It's possible that all the states are recovered and the - // generation indeed makes progress. - marker := accountHash[:] - journalProgress(batch, marker, stats) - - if err := batch.Write(); err != nil { + stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val)) + stats.slots++ + // If we've exceeded our batch allowance or termination was requested, flush to disk + if err := checkAndFlush(append(accountHash[:], key...)); err != nil { return err } - batch.Reset() - - dl.lock.Lock() - dl.genMarker = marker - dl.lock.Unlock() - - if abort != nil { - stats.Log("Aborting state snapshot generation", dl.root, accountHash[:]) - return errors.New("aborted") - } + return nil } - // If the iterated account is the contract, create a further loop to - // verify or regenerate the contract storage. - if acc.Root != emptyRoot { - var storeMarker []byte - if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength { - storeMarker = dl.genMarker[common.HashLength:] + var storeOrigin = common.CopyBytes(storeMarker) + for { + exhausted, last, err := dl.genRange(acc.Root, append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...), "storage", storeOrigin, storageCheckRange, stats, onStorage, nil) + if err != nil { + return err } - var storeOrigin = common.CopyBytes(storeMarker) - for { - exhausted, last, err := dl.genRange(acc.Root, append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...), "storage", storeOrigin, storageCheckRange, stats, func(key []byte, val []byte, regen bool) error { - if regen { - rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(key), val) - snapGeneratedStorageMeter.Mark(1) - } else { - snapRecoveredStorageMeter.Mark(1) - } - stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val)) - stats.slots++ - - // If we've exceeded our batch allowance or termination was requested, flush to disk - select { - case abort = <-dl.genAbort: - default: - } - if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { - // Flush out the batch anyway no matter it's empty or not. - // It's possible that all the states are recovered and the - // generation indeed makes progress. - marker := append(accountHash[:], key...) - journalProgress(batch, marker, stats) - - if err := batch.Write(); err != nil { - return err - } - batch.Reset() - - dl.lock.Lock() - dl.genMarker = marker - dl.lock.Unlock() - - if abort != nil { - stats.Log("Aborting state snapshot generation", dl.root, append(accountHash[:], key...)) - return errors.New("aborted") - } - if time.Since(logged) > 8*time.Second { - stats.Log("Generating state snapshot", dl.root, append(accountHash[:], key...)) - logged = time.Now() - } - } - return nil - }, nil) - - if err != nil { - return err - } - if exhausted { - return nil - } - storeOrigin = increseKey(last) - if storeOrigin == nil { - return nil // special case, the last is 0xffffffff...fff - } + if exhausted { + return nil + } + storeOrigin = increseKey(last) + if storeOrigin == nil { + return nil // special case, the last is 0xffffffff...fff } } - if time.Since(logged) > 8*time.Second { - stats.Log("Generating state snapshot", dl.root, key) - logged = time.Now() - } - // Some account processed, unmark the marker - accMarker = nil - return nil - }, FullAccountRLP) - + } + // Some account processed, unmark the marker + accMarker = nil + return nil + } + for { + exhausted, last, err := dl.genRange(dl.root, rawdb.SnapshotAccountPrefix, "account", accOrigin, accountRange, stats, onAccount, FullAccountRLP) // The procedure it aborted, either by external signal or internal error if err != nil { if abort == nil { // aborted by internal error, wait the signal @@ -480,7 +457,7 @@ func (dl *diskLayer) generate(stats *generatorStats) { if err := batch.Write(); err != nil { log.Error("Failed to flush batch", "error", err) - abort := <-dl.genAbort + abort = <-dl.genAbort abort <- stats return }