Skip to content

Commit

Permalink
E3: Added Post-Forkchoice for in-parrallel post-processing (#10453)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored May 24, 2024
1 parent 28aa3ef commit 89fc131
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 18 deletions.
31 changes: 31 additions & 0 deletions turbo/engineapi/engine_helpers/fork_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/ledgerwatch/erigon/cl/phase1/core/state/lru"

Expand All @@ -36,6 +37,8 @@ import (
"github.com/ledgerwatch/log/v3"
)

const timingsCacheSize = 16

// the maximum point from the current head, past which side forks are not validated anymore.
const maxForkDepth = 32 // 32 slots is the duration of an epoch thus there cannot be side forks in PoS deeper than 32 blocks from head.

Expand All @@ -62,16 +65,23 @@ type ForkValidator struct {

// we want fork validator to be thread safe so let
lock sync.Mutex

timingsCache *lru.Cache[libcommon.Hash, []interface{}]
}

func NewForkValidatorMock(currentHeight uint64) *ForkValidator {
validHashes, err := lru.New[libcommon.Hash, bool]("validHashes", maxForkDepth*8)
if err != nil {
panic(err)
}
timingsCache, err := lru.New[libcommon.Hash, []interface{}]("timingsCache", timingsCacheSize)
if err != nil {
panic(err)
}
return &ForkValidator{
currentHeight: currentHeight,
validHashes: validHashes,
timingsCache: timingsCache,
}
}

Expand All @@ -80,13 +90,19 @@ func NewForkValidator(ctx context.Context, currentHeight uint64, validatePayload
if err != nil {
panic(err)
}

timingsCache, err := lru.New[libcommon.Hash, []interface{}]("timingsCache", timingsCacheSize)
if err != nil {
panic(err)
}
return &ForkValidator{
validatePayload: validatePayload,
currentHeight: currentHeight,
tmpDir: tmpDir,
blockReader: blockReader,
ctx: ctx,
validHashes: validHashes,
timingsCache: timingsCache,
}
}

Expand Down Expand Up @@ -116,10 +132,13 @@ func (fv *ForkValidator) NotifyCurrentHeight(currentHeight uint64) {
func (fv *ForkValidator) FlushExtendingFork(tx kv.RwTx, accumulator *shards.Accumulator) error {
fv.lock.Lock()
defer fv.lock.Unlock()
start := time.Now()
// Flush changes to db.
if err := fv.memoryDiff.Flush(tx); err != nil {
return err
}
timings, _ := fv.timingsCache.Get(fv.extendingForkHeadHash)
fv.timingsCache.Add(fv.extendingForkHeadHash, append(timings, "FlushExtendingFork", time.Since(start)))
fv.extendingForkNotifications.Accumulator.CopyAndReset(accumulator)
// Clean extending fork data
fv.memoryDiff = nil
Expand Down Expand Up @@ -280,6 +299,7 @@ func (fv *ForkValidator) ClearWithUnwind(accumulator *shards.Accumulator, c shar
// validateAndStorePayload validate and store a payload fork chain if such chain results valid.
func (fv *ForkValidator) validateAndStorePayload(txc wrap.TxContainer, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody,
notifications *shards.Notifications) (status engine_types.EngineStatus, latestValidHash libcommon.Hash, validationError error, criticalError error) {
start := time.Now()
if err := fv.validatePayload(txc, header, body, unwindPoint, headersChain, bodiesChain, notifications); err != nil {
if errors.Is(err, consensus.ErrInvalidBlock) {
validationError = err
Expand All @@ -288,6 +308,7 @@ func (fv *ForkValidator) validateAndStorePayload(txc wrap.TxContainer, header *t
return
}
}
fv.timingsCache.Add(header.Hash(), []interface{}{"BlockValidation", time.Since(start)})

latestValidHash = header.Hash()
if validationError != nil {
Expand Down Expand Up @@ -319,3 +340,13 @@ func (fv *ForkValidator) validateAndStorePayload(txc wrap.TxContainer, header *t
status = engine_types.ValidStatus
return
}

// GetTimings returns the timings of the last block validation.
func (fv *ForkValidator) GetTimings(hash libcommon.Hash) []interface{} {
fv.lock.Lock()
defer fv.lock.Unlock()
if timings, ok := fv.timingsCache.Get(hash); ok {
return timings
}
return nil
}
4 changes: 3 additions & 1 deletion turbo/execution/eth1/ethereum_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"math/big"
"sync/atomic"

"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -63,6 +64,8 @@ type EthereumExecutionModule struct {
// consensus
engine consensus.Engine

doingPostForkchoice atomic.Bool

execution.UnimplementedExecutionServer
}

Expand Down Expand Up @@ -206,7 +209,6 @@ func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execut

extendingHash := e.forkValidator.ExtendingForkHeadHash()
extendCanonical := extendingHash == libcommon.Hash{} && header.ParentHash == currentHeadHash

status, lvh, validationError, criticalError := e.forkValidator.ValidatePayload(tx, header, body.RawBody(), extendCanonical, e.logger)
if criticalError != nil {
return nil, criticalError
Expand Down
55 changes: 38 additions & 17 deletions turbo/execution/eth1/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/ledgerwatch/log/v3"
)

const startPruneFrom = 1024

type forkchoiceOutcome struct {
receipt *execution.ForkChoiceReceipt
err error
Expand Down Expand Up @@ -423,11 +425,12 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}

commitStart := time.Now()
if err := tx.Commit(); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
commitTime := time.Since(commitStart)

if e.hook != nil {
if err := e.db.View(ctx, func(tx kv.Tx) error {
Expand All @@ -441,30 +444,48 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
e.logger.Info("head updated", "number", *headNumber, "hash", headHash)
}

var commitStart time.Time
if err := e.db.Update(ctx, func(tx kv.RwTx) error {
var m runtime.MemStats
dbg.ReadMemStats(&m)
timings = append(timings, e.forkValidator.GetTimings(headHash)...)
timings = append(timings, "commit", commitTime, "alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys))
e.logger.Info("Timings (slower than 50ms)", timings...)
}
if *headNumber >= startPruneFrom {
e.runPostForkchoiceInBackground(initialCycle)
}
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(headHash),
Status: status,
ValidationError: validationError,
})
}

func (e *EthereumExecutionModule) runPostForkchoiceInBackground(initialCycle bool) {
if e.doingPostForkchoice.CompareAndSwap(false, true) {
return
}
go func() {
defer e.doingPostForkchoice.Store(false)
timings := []interface{}{}
// Wait for semaphore to be available
if e.semaphore.Acquire(e.bacgroundCtx, 1) != nil {
return
}
defer e.semaphore.Release(1)
if err := e.db.Update(e.bacgroundCtx, func(tx kv.RwTx) error {
if err := e.executionPipeline.RunPrune(e.db, tx, initialCycle); err != nil {
return err
}
if pruneTimings := e.executionPipeline.PrintTimings(); len(pruneTimings) > 0 {
timings = append(timings, pruneTimings...)
}
commitStart = time.Now()
return nil
}); err != nil {
err = fmt.Errorf("updateForkChoice: %w", err)
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
e.logger.Error("runPostForkchoiceInBackground", "error", err)
return
}
var m runtime.MemStats
dbg.ReadMemStats(&m)
timings = append(timings, "commit", time.Since(commitStart), "alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys))
e.logger.Info("Timings (slower than 50ms)", timings...)
}

sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(headHash),
Status: status,
ValidationError: validationError,
})
if len(timings) > 0 {
e.logger.Info("Timings: Post-Forkchoice (slower than 50ms)", timings...)
}
}()
}

0 comments on commit 89fc131

Please sign in to comment.