Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More efficient header verification of headers for Parlia when snapshots are used #3998

Merged
merged 15 commits into from
Apr 29, 2022
50 changes: 25 additions & 25 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,24 +1149,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
}
vmConfig := &vm.Config{}

genesis, chainConfig := byChain(chain)
var engine consensus.Engine
config := &ethconfig.Defaults
if chainConfig.Clique != nil {
c := params.CliqueSnapshot
c.DBPath = filepath.Join(datadir, "clique", "db")
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
} else if chainConfig.Aura != nil {
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, &params.AuRaConfig{DBPath: filepath.Join(datadir, "aura")}, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
} else if chainConfig.Parlia != nil {
consensusConfig := &params.ParliaConfig{DBPath: filepath.Join(datadir, "parlia")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
} else if chainConfig.Bor != nil {
consensusConfig := &config.Bor
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, HeimdallURL, false, datadir)
} else { //ethash
engine = ethash.NewFaker()
}
genesis, _ := byChain(chain)

events := privateapi.NewEvents()

Expand All @@ -1184,13 +1167,6 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
var batchSize datasize.ByteSize
must(batchSize.UnmarshalText([]byte(batchSizeStr)))

br := getBlockReader(chainConfig)
blockDownloaderWindow := 65536
sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br)
if err != nil {
panic(err)
}

cfg := ethconfig.Defaults
cfg.Prune = pm
cfg.BatchSize = batchSize
Expand All @@ -1206,6 +1182,30 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
snDir := &dir.Rw{Path: filepath.Join(datadir, "snapshots")}
cfg.SnapshotDir = snDir
}
var engine consensus.Engine
config := &ethconfig.Defaults
if chainConfig.Clique != nil {
c := params.CliqueSnapshot
c.DBPath = filepath.Join(datadir, "clique", "db")
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, allSn)
} else if chainConfig.Aura != nil {
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, &params.AuRaConfig{DBPath: filepath.Join(datadir, "aura")}, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, allSn)
} else if chainConfig.Parlia != nil {
consensusConfig := &params.ParliaConfig{DBPath: filepath.Join(datadir, "parlia")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, allSn)
} else if chainConfig.Bor != nil {
consensusConfig := &config.Bor
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, HeimdallURL, false, datadir, allSn)
} else { //ethash
engine = ethash.NewFaker()
}

br := getBlockReader(chainConfig)
blockDownloaderWindow := 65536
sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br)
if err != nil {
panic(err)
}

sync, err := stages2.NewStagedSync(context.Background(), logger, db, p2p.Config{}, cfg,
chainConfig.TerminalTotalDifficulty, sentryControlServer, tmpdir,
Expand Down
15 changes: 8 additions & 7 deletions cmd/state/commands/erigon2.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,17 @@ func Erigon2(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.
}
}()

engine := initConsensusEngine(chainConfig, logger)
var blockReader interfaces.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
syncMode := ethconfig.SyncModeByChainName(chainConfig.ChainName, syncmodeCli)
if syncMode == ethconfig.SnapSync {
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, false), path.Join(datadir, "snapshots"))
allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapshotCfg(true, false), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
} else {
blockReader = snapshotsync.NewBlockReader()
}
engine := initConsensusEngine(chainConfig, logger, allSnapshots)

for !interrupt {
blockNum++
Expand Down Expand Up @@ -598,23 +599,23 @@ func (ww *WriterWrapper) CreateContract(address common.Address) error {
return nil
}

func initConsensusEngine(chainConfig *params.ChainConfig, logger log.Logger) (engine consensus.Engine) {
func initConsensusEngine(chainConfig *params.ChainConfig, logger log.Logger, snapshots *snapshotsync.RoSnapshots) (engine consensus.Engine) {
config := ethconfig.Defaults

switch {
case chainConfig.Clique != nil:
c := params.CliqueSnapshot
c.DBPath = filepath.Join(datadir, "clique", "db")
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, c, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots)
case chainConfig.Aura != nil:
consensusConfig := &params.AuRaConfig{DBPath: filepath.Join(datadir, "aura")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots)
case chainConfig.Parlia != nil:
consensusConfig := &params.ParliaConfig{DBPath: filepath.Join(datadir, "parlia")}
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "", true, datadir, snapshots)
case chainConfig.Bor != nil:
consensusConfig := &config.Bor
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "http://localhost:1317", false, datadir)
engine = ethconsensusconfig.CreateConsensusEngine(chainConfig, logger, consensusConfig, config.Miner.Notify, config.Miner.Noverify, "http://localhost:1317", false, datadir, snapshots)
default: //ethash
engine = ethash.NewFaker()
}
Expand Down
8 changes: 4 additions & 4 deletions consensus/parlia/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (api *API) GetSnapshot(number *rpc.BlockNumber) (*Snapshot, error) {
if header == nil {
return nil, errUnknownBlock
}
return api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil)
return api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil, false /* verify */)
}

// GetSnapshotAtHash retrieves the state snapshot at a given block.
Expand All @@ -51,7 +51,7 @@ func (api *API) GetSnapshotAtHash(hash common.Hash) (*Snapshot, error) {
if header == nil {
return nil, errUnknownBlock
}
return api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil)
return api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil, false /* verify */)
}

// GetValidators retrieves the list of validators at the specified block.
Expand All @@ -67,7 +67,7 @@ func (api *API) GetValidators(number *rpc.BlockNumber) ([]common.Address, error)
if header == nil {
return nil, errUnknownBlock
}
snap, err := api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil)
snap, err := api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil, false /* verify */)
if err != nil {
return nil, err
}
Expand All @@ -80,7 +80,7 @@ func (api *API) GetValidatorsAtHash(hash common.Hash) ([]common.Address, error)
if header == nil {
return nil, errUnknownBlock
}
snap, err := api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil)
snap, err := api.parlia.snapshot(api.chain, header.Number.Uint64(), header.Hash(), nil, false /* verify */)
if err != nil {
return nil, err
}
Expand Down
66 changes: 32 additions & 34 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
)

const (
Expand Down Expand Up @@ -232,14 +233,16 @@ type Parlia struct {
slashABI abi.ABI

// The fields below are for testing only
fakeDiff bool // Skip difficulty verifications
forks []uint64 // Forks extracted from the chainConfig
fakeDiff bool // Skip difficulty verifications
forks []uint64 // Forks extracted from the chainConfig
snapshots *snapshotsync.RoSnapshots
}

// New creates a Parlia consensus engine.
func New(
chainConfig *params.ChainConfig,
db kv.RwDB,
snapshots *snapshotsync.RoSnapshots,
) *Parlia {
// get parlia config
parliaConfig := chainConfig.Parlia
Expand Down Expand Up @@ -276,6 +279,7 @@ func New(
slashABI: sABI,
signer: types.LatestSigner(chainConfig),
forks: forkid.GatherForks(chainConfig),
snapshots: snapshots,
}

return c
Expand Down Expand Up @@ -392,7 +396,7 @@ func (p *Parlia) verifyCascadingFields(chain consensus.ChainHeaderReader, header
return consensus.ErrUnknownAncestor
}

snap, err := p.snapshot(chain, number-1, header.ParentHash, parents)
snap, err := p.snapshot(chain, number-1, header.ParentHash, parents, true /* verify */)
if err != nil {
return err
}
Expand Down Expand Up @@ -438,7 +442,7 @@ func (p *Parlia) verifySeal(chain consensus.ChainHeaderReader, header *types.Hea
return errUnknownBlock
}
// Retrieve the snapshot needed to verify this header and cache it
snap, err := p.snapshot(chain, number-1, header.ParentHash, parents)
snap, err := p.snapshot(chain, number-1, header.ParentHash, parents, true /* verify */)
if err != nil {
return err
}
Expand Down Expand Up @@ -481,7 +485,7 @@ func (p *Parlia) verifySeal(chain consensus.ChainHeaderReader, header *types.Hea
}

// snapshot retrieves the authorization snapshot at a given point in time.
func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash common.Hash, parents []*types.Header) (*Snapshot, error) {
func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash common.Hash, parents []*types.Header, verify bool) (*Snapshot, error) {
// Search for a snapshot in memory or on disk for checkpoints
var (
headers []*types.Header
Expand All @@ -500,31 +504,26 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
if s, err := loadSnapshot(p.config, p.signatures, p.db, number, hash); err == nil {
log.Trace("Loaded snapshot from disk", "number", number, "hash", hash)
snap = s
break
if !verify || snap != nil {
break
}
}
}

// If we're at the genesis, snapshot the initial state.
if number == 0 {
checkpoint := chain.GetHeaderByNumber(number)
if checkpoint != nil {
// get checkpoint data
hash := checkpoint.Hash()

validatorBytes := checkpoint.Extra[extraVanity : len(checkpoint.Extra)-extraSeal]
// get validators from headers
validators, err := ParseValidators(validatorBytes)
if err != nil {
return nil, err
if (verify && number%p.config.Epoch == 0) || number == 0 {
if (p.snapshots != nil && number <= p.snapshots.BlocksAvailable()) || number == 0 {
// Headers included into the snapshots have to be trusted as checkpoints
checkpoint := chain.GetHeader(hash, number)
if checkpoint != nil {
validatorBytes := checkpoint.Extra[extraVanity : len(checkpoint.Extra)-extraSeal]
// get validators from headers
validators, err := ParseValidators(validatorBytes)
if err != nil {
return nil, err
}
// new snapshot
snap = newSnapshot(p.config, p.signatures, number, hash, validators)
break
}

// new snapshot
snap = newSnapshot(p.config, p.signatures, number, hash, validators)
if err := snap.store(p.db); err != nil {
return nil, err
}
log.Info("Stored checkpoint snapshot to disk", "number", number, "hash", hash)
break
}
}

Expand Down Expand Up @@ -557,7 +556,6 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
for i := 0; i < len(headers)/2; i++ {
headers[i], headers[len(headers)-1-i] = headers[len(headers)-1-i], headers[i]
}

snap, err := snap.apply(headers, chain, parents, p.chainConfig.ChainID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -590,7 +588,7 @@ func (p *Parlia) Prepare(chain consensus.ChainHeaderReader, header *types.Header
header.Nonce = types.BlockNonce{}

number := header.Number.Uint64()
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil, false /* verify */)
if err != nil {
return err
}
Expand Down Expand Up @@ -685,7 +683,7 @@ func (p *Parlia) finalize(header *types.Header, state *state.IntraBlockState, tx
txs = userTxs
// warn if not in majority fork
number := header.Number.Uint64()
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil, false /* verify */)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -805,7 +803,7 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res
val, signFn := p.val, p.signFn
p.lock.RUnlock()

snap, err := p.snapshot(chain, number-1, header.ParentHash, nil)
snap, err := p.snapshot(chain, number-1, header.ParentHash, nil, false /* verify */)
if err != nil {
return err
}
Expand Down Expand Up @@ -875,7 +873,7 @@ func (p *Parlia) SealHash(header *types.Header) common.Hash {
// CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty
// that a new block should have.
func (p *Parlia) CalcDifficulty(chain consensus.ChainHeaderReader, time, parentTime uint64, parentDifficulty *big.Int, parentNumber uint64, parentHash, parentUncleHash common.Hash, parentSeal []rlp.RawValue) *big.Int {
snap, err := p.snapshot(chain, parentNumber, parentHash, nil)
snap, err := p.snapshot(chain, parentNumber, parentHash, nil, false /* verify */)
if err != nil {
return nil
}
Expand Down Expand Up @@ -950,7 +948,7 @@ func (p *Parlia) shouldWaitForCurrentBlockProcess(chain consensus.ChainHeaderRea
}

func (p *Parlia) EnoughDistance(chain consensus.ChainReader, header *types.Header) bool {
snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil)
snap, err := p.snapshot(chain, header.Number.Uint64()-1, header.ParentHash, nil, false /* verify */)
if err != nil {
return true
}
Expand All @@ -962,7 +960,7 @@ func (p *Parlia) IsLocalBlock(header *types.Header) bool {
}

func (p *Parlia) AllowLightProcess(chain consensus.ChainReader, currentHeader *types.Header) bool {
snap, err := p.snapshot(chain, currentHeader.Number.Uint64()-1, currentHeader.ParentHash, nil)
snap, err := p.snapshot(chain, currentHeader.Number.Uint64()-1, currentHeader.ParentHash, nil, false /* verify */)
if err != nil {
return true
}
Expand Down
Loading