core/state/snapshot: refactor (#4)
* core/state/snapshot: refactor

* core/state/snapshot: tiny fix and polish

Co-authored-by: rjl493456442 <[email protected]>
holiman and rjl493456442 authored Mar 12, 2021
1 parent 475ad07 commit 955e99d
Showing 1 changed file with 97 additions and 120 deletions.
217 changes: 97 additions & 120 deletions core/state/snapshot/generate.go
@@ -315,11 +315,9 @@ func (dl *diskLayer) generate(stats *generatorStats) {
 		accountRange = accountCheckRange
 	)
 	if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
-		accMarker = dl.genMarker[:common.HashLength]
-
-		// Always reset the initial account range as 1 whenever recover
-		// from the interruption.
-		accountRange = 1
+		// Always reset the initial account range as 1
+		// whenever recover from the interruption.
+		accMarker, accountRange = dl.genMarker[:common.HashLength], 1
 	}
 	var (
 		batch = dl.diskdb.NewBatch()
@@ -329,132 +327,111 @@ func (dl *diskLayer) generate(stats *generatorStats) {
 	)
 	stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)
 
-	for {
-		exhausted, last, err := dl.genRange(dl.root, rawdb.SnapshotAccountPrefix, "account", accOrigin, accountRange, stats, func(key []byte, val []byte, regen bool) error {
-			// Retrieve the current account and flatten it into the internal format
-			accountHash := common.BytesToHash(key)
-
-			var acc struct {
-				Nonce    uint64
-				Balance  *big.Int
-				Root     common.Hash
-				CodeHash []byte
-			}
-			if err := rlp.DecodeBytes(val, &acc); err != nil {
-				log.Crit("Invalid account encountered during snapshot creation", "err", err)
-			}
-			data := SlimAccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash)
-
-			// If the account is not yet in-progress, write it out
-			if accMarker == nil || !bytes.Equal(accountHash[:], accMarker) {
-				if regen {
-					rawdb.WriteAccountSnapshot(batch, accountHash, data)
-					snapGeneratedAccountMeter.Mark(1)
-				} else {
-					snapRecoveredAccountMeter.Mark(1)
-				}
-				stats.storage += common.StorageSize(1 + common.HashLength + len(data))
-				stats.accounts++
-			}
-			// If we've exceeded our batch allowance or termination was requested, flush to disk
-			select {
-			case abort = <-dl.genAbort:
-			default:
-			}
-			if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
-				// Flush out the batch anyway no matter it's empty or not.
-				// It's possible that all the states are recovered and the
-				// generation indeed makes progress.
-				marker := accountHash[:]
-				journalProgress(batch, marker, stats)
-
-				if err := batch.Write(); err != nil {
-					return err
-				}
-				batch.Reset()
-
-				dl.lock.Lock()
-				dl.genMarker = marker
-				dl.lock.Unlock()
-
-				if abort != nil {
-					stats.Log("Aborting state snapshot generation", dl.root, accountHash[:])
-					return errors.New("aborted")
-				}
-			}
-			// If the iterated account is the contract, create a further loop to
-			// verify or regenerate the contract storage.
-			if acc.Root != emptyRoot {
-				var storeMarker []byte
-				if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength {
-					storeMarker = dl.genMarker[common.HashLength:]
-				}
-				var storeOrigin = common.CopyBytes(storeMarker)
-				for {
-					exhausted, last, err := dl.genRange(acc.Root, append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...), "storage", storeOrigin, storageCheckRange, stats, func(key []byte, val []byte, regen bool) error {
-						if regen {
-							rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(key), val)
-							snapGeneratedStorageMeter.Mark(1)
-						} else {
-							snapRecoveredStorageMeter.Mark(1)
-						}
-						stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
-						stats.slots++
-
-						// If we've exceeded our batch allowance or termination was requested, flush to disk
-						select {
-						case abort = <-dl.genAbort:
-						default:
-						}
-						if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
-							// Flush out the batch anyway no matter it's empty or not.
-							// It's possible that all the states are recovered and the
-							// generation indeed makes progress.
-							marker := append(accountHash[:], key...)
-							journalProgress(batch, marker, stats)
-
-							if err := batch.Write(); err != nil {
-								return err
-							}
-							batch.Reset()
-
-							dl.lock.Lock()
-							dl.genMarker = marker
-							dl.lock.Unlock()
-
-							if abort != nil {
-								stats.Log("Aborting state snapshot generation", dl.root, append(accountHash[:], key...))
-								return errors.New("aborted")
-							}
-						}
-						if time.Since(logged) > 8*time.Second {
-							stats.Log("Generating state snapshot", dl.root, append(accountHash[:], key...))
-							logged = time.Now()
-						}
-						return nil
-					}, nil)
-
-					if err != nil {
-						return err
-					}
-					if exhausted {
-						return nil
-					}
-					storeOrigin = increseKey(last)
-					if storeOrigin == nil {
-						return nil // special case, the last is 0xffffffff...fff
-					}
-				}
-			}
-			if time.Since(logged) > 8*time.Second {
-				stats.Log("Generating state snapshot", dl.root, key)
-				logged = time.Now()
-			}
-			// Some account processed, unmark the marker
-			accMarker = nil
-			return nil
-		}, FullAccountRLP)
-
+	checkAndFlush := func(currentLocation []byte) error {
+		select {
+		case abort = <-dl.genAbort:
+		default:
+		}
+		if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
+			// Flush out the batch anyway no matter it's empty or not.
+			// It's possible that all the states are recovered and the
+			// generation indeed makes progress.
+			marker := currentLocation
+			journalProgress(batch, marker, stats)
+
+			if err := batch.Write(); err != nil {
+				return err
+			}
+			batch.Reset()
+
+			dl.lock.Lock()
+			dl.genMarker = marker
+			dl.lock.Unlock()
+
+			if abort != nil {
+				stats.Log("Aborting state snapshot generation", dl.root, marker)
+				return errors.New("aborted")
+			}
+			if time.Since(logged) > 8*time.Second {
+				stats.Log("Generating state snapshot", dl.root, marker)
+				logged = time.Now()
+			}
+		}
+		return nil
+	}
+
+	onAccount := func(key []byte, val []byte, regen bool) error {
+		// Retrieve the current account and flatten it into the internal format
+		accountHash := common.BytesToHash(key)
+		var acc struct {
+			Nonce    uint64
+			Balance  *big.Int
+			Root     common.Hash
+			CodeHash []byte
+		}
+		if err := rlp.DecodeBytes(val, &acc); err != nil {
+			log.Crit("Invalid account encountered during snapshot creation", "err", err)
+		}
+		data := SlimAccountRLP(acc.Nonce, acc.Balance, acc.Root, acc.CodeHash)
+
+		// If the account is not yet in-progress, write it out
+		if accMarker == nil || !bytes.Equal(accountHash[:], accMarker) {
+			if regen {
+				rawdb.WriteAccountSnapshot(batch, accountHash, data)
+				snapGeneratedAccountMeter.Mark(1)
+			} else {
+				snapRecoveredAccountMeter.Mark(1)
+			}
+			stats.storage += common.StorageSize(1 + common.HashLength + len(data))
+			stats.accounts++
+		}
+		// If we've exceeded our batch allowance or termination was requested, flush to disk
+		if err := checkAndFlush(accountHash[:]); err != nil {
+			return err
+		}
+		// If the iterated account is the contract, create a further loop to
+		// verify or regenerate the contract storage.
+		if acc.Root != emptyRoot {
+			var storeMarker []byte
+			if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength {
+				storeMarker = dl.genMarker[common.HashLength:]
+			}
+			onStorage := func(key []byte, val []byte, regen bool) error {
+				if regen {
+					rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(key), val)
+					snapGeneratedStorageMeter.Mark(1)
+				} else {
+					snapRecoveredStorageMeter.Mark(1)
+				}
+				stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
+				stats.slots++
+				// If we've exceeded our batch allowance or termination was requested, flush to disk
+				if err := checkAndFlush(append(accountHash[:], key...)); err != nil {
+					return err
+				}
+				return nil
+			}
+			var storeOrigin = common.CopyBytes(storeMarker)
+			for {
+				exhausted, last, err := dl.genRange(acc.Root, append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...), "storage", storeOrigin, storageCheckRange, stats, onStorage, nil)
+				if err != nil {
+					return err
+				}
+				if exhausted {
+					return nil
+				}
+				storeOrigin = increseKey(last)
+				if storeOrigin == nil {
+					return nil // special case, the last is 0xffffffff...fff
+				}
+			}
+		}
+		// Some account processed, unmark the marker
+		accMarker = nil
+		return nil
+	}
+	for {
+		exhausted, last, err := dl.genRange(dl.root, rawdb.SnapshotAccountPrefix, "account", accOrigin, accountRange, stats, onAccount, FullAccountRLP)
 		// The procedure it aborted, either by external signal or internal error
 		if err != nil {
 			if abort == nil { // aborted by internal error, wait the signal
@@ -480,7 +457,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
 	if err := batch.Write(); err != nil {
 		log.Error("Failed to flush batch", "error", err)
 
-		abort := <-dl.genAbort
+		abort = <-dl.genAbort
 		abort <- stats
 		return
 	}
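
Note: the heart of this refactor is hoisting the flush-and-abort handling that was previously duplicated inside the two inline genRange callbacks into one shared checkAndFlush closure, which the now-named onAccount and onStorage callbacks both call. Below is a minimal, self-contained sketch of that pattern; the names abortCh and idealBatchSize and the plain byte-slice batch are illustrative stand-ins, not the real go-ethereum ethdb/generator API.

package main

import (
	"errors"
	"fmt"
)

// generate mimics the shape of diskLayer.generate after this refactor: the
// shared flush/abort logic lives in a single checkAndFlush closure, and the
// account and storage callbacks delegate to it instead of repeating it inline.
func generate(abortCh chan struct{}, idealBatchSize int) error {
	var batch []byte // stand-in for an ethdb.Batch

	// checkAndFlush flushes the pending batch once it grows too large or an
	// abort was requested, treating currentLocation as the resume marker.
	checkAndFlush := func(currentLocation []byte) error {
		var aborted bool
		select {
		case <-abortCh:
			aborted = true
		default:
		}
		if len(batch) > idealBatchSize || aborted {
			fmt.Printf("flushing %d bytes, marker=%x\n", len(batch), currentLocation)
			batch = batch[:0]
			if aborted {
				return errors.New("aborted")
			}
		}
		return nil
	}
	onStorage := func(accountHash, key, val []byte) error {
		batch = append(batch, val...)
		// The storage marker is the account hash followed by the slot key.
		return checkAndFlush(append(accountHash, key...))
	}
	onAccount := func(key, val []byte) error {
		batch = append(batch, val...)
		if err := checkAndFlush(key); err != nil {
			return err
		}
		// Walk one hypothetical storage slot of the account with the same helper.
		return onStorage(key, []byte{0x01}, []byte("slot-value"))
	}
	return onAccount([]byte{0xaa}, []byte("account-value"))
}

func main() {
	if err := generate(make(chan struct{}), 8); err != nil {
		fmt.Println("generation stopped:", err)
	}
}

The journaling, locking and metering details stay exactly as in the diff above; the sketch only shows how a single helper replaces the two nearly identical flush blocks.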