Skip to content

Commit

Permalink
Beacon sync block import via forked chain (#2747)
Browse files Browse the repository at this point in the history
* Accept finalised hash from RPC with the canon header as well

* Reorg internal sync descriptor(s)

details:
  Update target from RPC to provide the `consensus header` as well as
  the `finalised` block number

why:
  Prepare for using `importBlock()` instead of `persistBlocks()`

* Cosmetic updates

details:
+ Collect all pretty printers in `helpers.nim`
+ Remove unused return codes from function prototype

* Use `importBlock()` + `forkChoice()` rather than `persistBlocks()`

* Update logging and metrics

* Update docu
  • Loading branch information
mjfh authored Oct 17, 2024
1 parent 7d41a99 commit 0b93236
Show file tree
Hide file tree
Showing 17 changed files with 417 additions and 269 deletions.
2 changes: 1 addition & 1 deletion nimbus/beacon/api_handler/api_forkchoice.nim
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,

# Update sync header (if any)
com.syncReqNewHead(header)
com.reqBeaconSyncTargetCB(header)
com.reqBeaconSyncTargetCB(header, update.finalizedBlockHash)

return simpleFCU(PayloadExecutionStatus.syncing)

Expand Down
6 changes: 3 additions & 3 deletions nimbus/common/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type
SyncReqNewHeadCB* = proc(header: Header) {.gcsafe, raises: [].}
## Update head for syncing

ReqBeaconSyncTargetCB* = proc(header: Header) {.gcsafe, raises: [].}
ReqBeaconSyncTargetCB* = proc(header: Header; finHash: Hash32) {.gcsafe, raises: [].}
## Ditto (for beacon sync)

NotifyBadBlockCB* = proc(invalid, origin: Header) {.gcsafe, raises: [].}
Expand Down Expand Up @@ -337,10 +337,10 @@ proc syncReqNewHead*(com: CommonRef; header: Header)
if not com.syncReqNewHead.isNil:
com.syncReqNewHead(header)

proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header) =
proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header; finHash: Hash32) =
## Used by RPC updater
if not com.reqBeaconSyncTargetCB.isNil:
com.reqBeaconSyncTargetCB(header)
com.reqBeaconSyncTargetCB(header, finHash)

proc notifyBadBlock*(com: CommonRef; invalid, origin: Header)
{.gcsafe, raises: [].} =
Expand Down
2 changes: 1 addition & 1 deletion nimbus/sync/beacon.nim
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ proc init*(
desc.initSync(ethNode, maxPeers)
desc.ctx.pool.nBodiesBatch = chunkSize
# Initialise for `persistBlocks()`
desc.ctx.pool.chain = chain.com.newChain()
desc.ctx.pool.chain = chain
desc

proc start*(ctx: BeaconSyncRef) =
Expand Down
58 changes: 31 additions & 27 deletions nimbus/sync/beacon/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,26 @@ A sequence *@[h(1),h(2),..]* of block headers is called a *linked chain* if

General header linked chains layout diagram

0 C D E (1)
0 C D H (1)
o----------------o---------------------o----------------o--->
| <-- linked --> | <-- unprocessed --> | <-- linked --> |

Here, the single upper letter symbols *0*, *C*, *D*, *E* denote block numbers.
Here, the single upper letter symbols *0*, *C*, *D*, *H* denote block numbers.
For convenience, these letters are also identified with their associated block
header or the full blocks. Saying *"the header 0"* is short for *"the header
with block number 0"*.

Meaning of *0*, *C*, *D*, *E*:
Meaning of *0*, *C*, *D*, *H*:

* *0* -- Genesis, block number *0*
* *C* -- coupler, maximal block number of linked chain starting at *0*
* *D* -- dangling, minimal block number of linked chain ending at *E*
* *D* -- dangling, minimal block number of linked chain ending at *H*
with *C <= D*
* *E* -- end, block number of some finalised block (not necessarily the latest
one)
* *H* -- head, end block number of **consensus head** (not necessarily the
latest one as this is moving while processing)

This definition implies *0 <= C <= D <= E* and the state of the header linked
chains can uniquely be described by the triple of block numbers *(C,D,E)*.
This definition implies *0 <= C <= D <= H* and the state of the header linked
chains can uniquely be described by the triple of block numbers *(C,D,H)*.


### Storage of header chains:
Expand All @@ -78,7 +78,7 @@ correspond to finalised blocks, e.g. the sub-interval *[0,**base**]* where
half open interval *(**base**,C]* are always stored on the *beaconHeader*
column of the *KVT* database.

The block numbers from the interval *[D,E]* also reside on the *beaconHeader*
The block numbers from the interval *[D,H]* also reside on the *beaconHeader*
column of the *KVT* database table.


Expand All @@ -89,7 +89,7 @@ Minimal layout on a pristine system
0 (2)
C
D
E
H
o--->

When first initialised, the header linked chains are set to *(0,0,0)*.
Expand All @@ -101,40 +101,40 @@ A header chain with an non empty open interval *(C,D)* can be updated only by
increasing *C* or decreasing *D* by adding/prepending headers so that the
linked chain condition is not violated.

Only when the gap open interval *(C,D)* vanishes, the right end *E* can be
Only when the gap open interval *(C,D)* vanishes, the right end *H* can be
increased to a larger target block number *T*, say. This block number will
typically be the **consensus head**. Then

* *C==D* because the open interval *(C,D)* is empty
* *C==E* because *C* is maximal (see definition of `C` above)
* *C==H* because *C* is maximal (see definition of `C` above)

and the header chains *(E,E,E)* (depicted in *(3)* below) can be set to
and the header chains *(H,H,H)* (depicted in *(3)* below) can be set to
*(C,T,T)* as depicted in *(4)* below.

Layout before updating of *E*
Layout before updating of *H*

C (3)
D
0 E T
0 H T
o----------------o---------------------o---->
| <-- linked --> |

New layout with moving *D* and *E* to *T*
New layout with moving *D* and *H* to *T*

D' (4)
0 C E'
0 C H'
o----------------o---------------------o---->
| <-- linked --> | <-- unprocessed --> |

with *D'=T* and *E'=T*.
with *D'=T* and *H'=T*.

Note that diagram *(3)* is a generalisation of *(2)*.


### Complete a header linked chain:

The header chain is *relatively complete* if it satisfies clause *(3)* above
for *0 < C*. It is *fully complete* if *E==T*. It should be obvious that the
for *0 < C*. It is *fully complete* if *H==T*. It should be obvious that the
latter condition is temporary only on a live system (as *T* is continuously
updated.)

Expand All @@ -146,20 +146,22 @@ database state will be updated incrementally.
Block chain import/execution
-----------------------------

The following diagram with a parially imported/executed block chain amends the
The following diagram with a partially imported/executed block chain amends the
layout *(1)*:

0 B C D E (5)
o------------------o-------o---------------------o----------------o-->
0 B L C D H (5)
o------------o-----o-------o---------------------o----------------o-->
| <-- imported --> | | | |
| <------- linked ------> | <-- unprocessed --> | <-- linked --> |

where

where *B* is the **base**, i.e. the **base state** block number of the last
imported/executed block. It also refers to the global state block number of
the ledger database.
* *B* is the **base state** stored on the persistent state database. *B* is
not addressed directly except upon start up or resuming sync when *B == L*.
* *L* is the last imported/executed block, typically up to the **canonical
consensus head**.

The headers corresponding to the half open interval `(B,C]` will be completed
The headers corresponding to the half open interval `(L,C]` will be completed
by fetching block bodies and then import/execute them together with the already
cached headers.

Expand Down Expand Up @@ -260,9 +262,11 @@ be available if *nimbus* is compiled with the additional make flags
|:-------------------|:------------:|:--------------------|
| | | |
| beacon_base | block height | **B**, *increasing* |
| beacon_latest | block height | **L**, *increasing* |
| beacon_coupler | block height | **C**, *increasing* |
| beacon_dangling | block height | **D** |
| beacon_end | block height | **E**, *increasing* |
| beacon_final | block height | **F**, *increasing* |
| beacon_head | block height | **H**, *increasing* |
| beacon_target | block height | **T**, *increasing* |
| | | |
| beacon_header_lists_staged | size | # of staged header list records |
Expand Down
24 changes: 12 additions & 12 deletions nimbus/sync/beacon/worker.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.


{.push raises:[].}

import
Expand Down Expand Up @@ -112,8 +111,8 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} =
debug info

# Check for a possible header layout and body request changes
discard ctx.updateLinkedHChainsLayout info
discard ctx.updateBlockRequests info
ctx.updateSyncStateLayout info
ctx.updateBlockRequests info

# Execute staged block records.
if ctx.blocksStagedCanImportOk():
Expand All @@ -127,13 +126,10 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} =
defer: ctx.pool.importRunningOk = false

# Import from staged queue.
while ctx.blocksStagedImport info:
while await ctx.blocksStagedImport(info):
ctx.updateMetrics()

# Allow pseudo/async thread switch
await sleepAsync asyncThreadSwitchTimeSlot

# At the end of the cycle, leave time to refill headers/blocks
# At the end of the cycle, leave time to trigger refill headers/blocks
await sleepAsync daemonWaitInterval

ctx.updateMetrics()
Expand Down Expand Up @@ -174,12 +170,16 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} =
trace info, peer, nInvocations=buddy.only.nMultiLoop,
lastIdleGap=buddy.only.multiRunIdle.toStr

# Update consensus header target when needed. It comes with a finalised
# header hash where we need to complete the block number.
await buddy.headerStagedUpdateTarget info

if not await buddy.napUnlessSomethingToFetch info:
#
# Layout of a triple of linked header chains (see `README.md`)
# ::
# 0 C D E
# | <--- [0,C] --> | <----- (C,D) -----> | <-- [D,E] ---> |
# 0 C D H
# | <--- [0,C] --> | <----- (C,D) -----> | <-- [D,H] ---> |
# o----------------o---------------------o----------------o--->
# | <-- linked --> | <-- unprocessed --> | <-- linked --> |
#
Expand All @@ -194,7 +194,7 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} =
#
# The block numbers range concurrently taken from `(C,D)` are chosen
# from the upper range. So exactly one of the actors has a range
# `[whatever,D-1]` adjacent to `[D,E]`. Call this actor the lead actor.
# `[whatever,D-1]` adjacent to `[D,H]`. Call this actor the lead actor.
#
# For the lead actor, headers can be downloaded all by the hashes as
# the parent hash for the header with block number `D` is known. All
Expand All @@ -217,7 +217,7 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} =
if await buddy.headersStagedCollect info:

# * Save updated state and headers
# * Decrease the dangling left boundary `D` of the trusted range `[D,E]`
# * Decrease the dangling left boundary `D` of the trusted range `[D,H]`
discard buddy.ctx.headersStagedProcess info

# Fetch bodies and combine them with headers to blocks to be staged. These
Expand Down
80 changes: 61 additions & 19 deletions nimbus/sync/beacon/worker/blocks_staged.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ logScope:
# Private functions
# ------------------------------------------------------------------------------

func getNthHash(blk: BlocksForImport; n: int): Hash32 =
  ## Block hash of the `n`-th entry of `blk.blocks`.
  ##
  ## When a successor header exists, its `parentHash` already holds the
  ## wanted value for free; otherwise the hash is computed from the RLP
  ## encoded header.
  let succ = n + 1
  if succ < blk.blocks.len:
    return blk.blocks[succ].header.parentHash
  rlp.encode(blk.blocks[n].header).keccak256


proc fetchAndCheck(
buddy: BeaconBuddyRef;
ivReq: BnRange;
Expand Down Expand Up @@ -218,38 +225,73 @@ proc blocksStagedCollect*(
return true


proc blocksStagedImport*(ctx: BeaconCtxRef; info: static[string]): bool =
proc blocksStagedImport*(
ctx: BeaconCtxRef;
info: static[string];
): Future[bool]
{.async.} =
## Import/execute blocks record from staged queue
##
let qItem = ctx.blk.staged.ge(0).valueOr:
return false

# Fetch least record, accept only if it matches the global ledger state
let base = ctx.dbStateBlockNumber()
if qItem.key != base + 1:
trace info & ": there is a gap", B=base.bnStr, stagedBottom=qItem.key.bnStr
return false
block:
let imported = ctx.chain.latestNumber()
if qItem.key != imported + 1:
trace info & ": there is a gap L vs. staged",
B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr
return false

# Remove from queue
discard ctx.blk.staged.delete qItem.key

# Execute blocks
let stats = ctx.pool.chain.persistBlocks(qItem.data.blocks).valueOr:
# FIXME: should that be rather an `raiseAssert` here?
warn info & ": block exec error", B=base.bnStr,
iv=BnRange.new(qItem.key,qItem.key+qItem.data.blocks.len.uint64-1),
error=error
doAssert base == ctx.dbStateBlockNumber()
return false

trace info & ": imported staged blocks", B=ctx.dbStateBlockNumber.bnStr,
first=qItem.key.bnStr, stats
let
nBlocks = qItem.data.blocks.len
iv = BnRange.new(qItem.key, qItem.key + nBlocks.uint64 - 1)

var maxImport = iv.maxPt
for n in 0 ..< nBlocks:
let nBn = qItem.data.blocks[n].header.number
ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr:
warn info & ": import block error", iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
nBn=nBn.bnStr, txLevel=ctx.chain.db.level, `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break

# Remove stashed headers
for bn in qItem.key ..< qItem.key + qItem.data.blocks.len.uint64:
# Occasionally mark the chain finalized
if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks:
let
nHash = qItem.data.getNthHash(n)
finHash = if nBn < ctx.layout.final: nHash else: ctx.layout.finalHash

doAssert nBn == ctx.chain.latestNumber()
ctx.pool.chain.forkChoice(headHash=nHash, finalizedHash=finHash).isOkOr:
warn info & ": fork choice error", iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
F=ctx.layout.final.bnStr, txLevel=ctx.chain.db.level, nHash,
finHash=(if finHash == nHash: "nHash" else: "F"), `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break

# Allow pseudo/async thread switch.
await sleepAsync asyncThreadSwitchTimeSlot

# Import probably incomplete, so a partial roll back may be needed
if maxImport < iv.maxPt:
ctx.blocksUnprocCommit(0, maxImport+1, qItem.data.blocks[^1].header.number)

# Remove stashed headers for imported blocks
for bn in iv.minPt .. maxImport:
ctx.dbUnstashHeader bn

true
trace info & ": import done", iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
F=ctx.layout.final.bnStr, txLevel=ctx.chain.db.level
return true


func blocksStagedBottomKey*(ctx: BeaconCtxRef): BlockNumber =
Expand Down
12 changes: 12 additions & 0 deletions nimbus/sync/beacon/worker/blocks_unproc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ proc blocksUnprocInit*(ctx: BeaconCtxRef) =
## Constructor
ctx.blk.unprocessed = BnRangeSet.init()

proc blocksUnprocSet*(ctx: BeaconCtxRef) =
  ## Reset the unprocessed block book-keeping: drop the borrowed counter
  ## and empty the range set.
  ctx.blk.borrowed = 0u
  ctx.blk.unprocessed.clear()

proc blocksUnprocSet*(ctx: BeaconCtxRef; minPt, maxPt: BlockNumber) =
  ## Re-initialise the unprocessed range to the interval `[minPt,maxPt]`.
  ##
  ## An inverted argument pair (i.e. `maxPt < minPt`) leaves the range
  ## set empty rather than clamping `maxPt` up to `minPt`.
  ctx.blocksUnprocSet()
  if maxPt >= minPt:
    discard ctx.blk.unprocessed.merge(minPt, maxPt)

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
Loading

0 comments on commit 0b93236

Please sign in to comment.