Skip to content

Commit

Permalink
Update stage_finish.go : notifications to rpc daemon (#2755)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreaLanfranchi authored Oct 4, 2021
1 parent ca3dda2 commit f70dd63
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 24 deletions.
57 changes: 39 additions & 18 deletions eth/stagedsync/stage_finish.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package stagedsync

import (
"bytes"
"context"
"fmt"
"encoding/binary"
"reflect"

"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/rlp"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/log/v3"
)
Expand Down Expand Up @@ -124,29 +128,46 @@ func PruneFinish(u *PruneState, tx kv.RwTx, cfg FinishCfg, ctx context.Context)
return nil
}

func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, unwindTo *uint64, notifier ChainEventNotifier, tx kv.Tx) error {
notifyTo, err := stages.GetStageProgress(tx, stages.Finish) // because later stages can be disabled
if err != nil {
return err
}
notifyFrom := finishStageBeforeSync
if unwindTo != nil && *unwindTo != 0 && (*unwindTo) < finishStageBeforeSync {
notifyFrom = *unwindTo + 1
}
func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishStageAfterSync uint64, unwindTo *uint64, notifier ChainEventNotifier, tx kv.Tx) error {

if notifier == nil {
log.Warn("rpc notifier is not set, rpc daemon won't be updated about headers")
log.Trace("RPC Daemon notification channel not set. No headers notifications will be sent")
return nil
}

for i := notifyFrom; i <= notifyTo; i++ {
header := rawdb.ReadHeaderByNumber(tx, i)
if header == nil {
return fmt.Errorf("could not find canonical header for number: %d", i)
// Notify all headers we have (either canonical or not) in a maximum range span of 1024
var notifyFrom uint64
if unwindTo != nil && *unwindTo != 0 && (*unwindTo) < finishStageBeforeSync {
notifyFrom = *unwindTo
} else {
heightSpan := finishStageAfterSync - finishStageBeforeSync
if heightSpan > 1024 {
heightSpan = 1024
}
notifier.OnNewHeader(header)
notifyFrom = finishStageAfterSync - heightSpan
}

log.Info("Updated current block for the RPC API", "from", notifyFrom, "to", notifyTo)
startKey := make([]byte, reflect.TypeOf(notifyFrom).Size()+32)
var notifyTo uint64
binary.BigEndian.PutUint64(startKey, notifyFrom)
if err := tx.ForEach(kv.Headers, startKey, func(k, headerRLP []byte) error {
if len(headerRLP) == 0 {
return nil
}
header := new(types.Header)
if err := rlp.Decode(bytes.NewReader(headerRLP), header); err != nil {
log.Error("Invalid block header RLP", "err", err)
return err
}
notifyTo = header.Number.Uint64()
notifier.OnNewHeader(header)
return libcommon.Stopped(ctx.Done())
}); err != nil {
log.Error("RPC Daemon notification failed", "error", err)
return err
}

log.Info("RPC Daemon notified of new headers", "from", notifyFrom, "to", notifyTo)
return nil

}
10 changes: 4 additions & 6 deletions turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,20 +172,18 @@ func StageLoopStep(
}
updateHead(ctx, head, headHash, headTd256)

if notifications.Accumulator != nil {
if notifications != nil && notifications.Accumulator != nil {
if err := db.View(ctx, func(tx kv.Tx) error {
header := rawdb.ReadCurrentHeader(tx)
if header == nil {
return nil
}

pendingBaseFee := misc.CalcBaseFee(notifications.Accumulator.ChainConfig(), header)
notifications.Accumulator.SendAndReset(ctx, notifications.StateChangesConsumer, pendingBaseFee.Uint64())

err = stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, sync.PrevUnwindPoint(), notifications.Events, tx)
if err != nil {
return err
}
return nil
return stagedsync.NotifyNewHeaders(ctx, finishProgressBefore, head, sync.PrevUnwindPoint(), notifications.Events, tx)

}); err != nil {
return err
}
Expand Down

0 comments on commit f70dd63

Please sign in to comment.