Skip to content

Commit

Permalink
include small dedup in block processor to handle blockByRoot blocks (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tersec authored Apr 26, 2023
1 parent 55ae7d3 commit 1ccb36b
Showing 1 changed file with 30 additions and 1 deletion.
31 changes: 30 additions & 1 deletion beacon_chain/gossip_processing/block_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import
../spec/[signatures, signatures_batch],
../sszdump

from std/deques import Deque, addLast, contains, initDeque, items, len, shrink
from ../consensus_object_pools/consensus_manager import
ConsensusManager, checkNextProposer, optimisticExecutionPayloadHash,
runProposalForkchoiceUpdated, shouldSyncOptimistically, updateHead,
Expand Down Expand Up @@ -47,6 +48,10 @@ const
## syncing the finalized part of the chain
PAYLOAD_PRE_WALL_SLOTS = SLOTS_PER_EPOCH * 2
## Number of slots from wall time that we start processing every payload
MAX_DEDUP_QUEUE_LEN = 16
## Number of blocks, with FIFO discipline, against which to check queued
## blocks before being processed to avoid spamming ELs. This should stay
## small enough that even O(n) algorithms are reasonable.

type
BlobSidecars* = seq[ref BlobSidecar]
Expand Down Expand Up @@ -103,6 +108,9 @@ type
## The slot at which we sent a payload to the execution client the last
## time

dupBlckBuf: Deque[(Eth2Digest, ValidatorSig)]
# Small buffer to allow for filtering of duplicate blocks in block queue

NewPayloadStatus {.pure.} = enum
valid
notValid
Expand Down Expand Up @@ -140,7 +148,9 @@ proc new*(T: type BlockProcessor,
validatorMonitor: validatorMonitor,
blobQuarantine: blobQuarantine,
getBeaconTime: getBeaconTime,
verifier: BatchVerifier(rng: rng, taskpool: taskpool)
verifier: BatchVerifier(rng: rng, taskpool: taskpool),
dupBlckBuf: initDeque[(Eth2Digest, ValidatorSig)](
initialSize = MAX_DEDUP_QUEUE_LEN)
)

# Sync callbacks
Expand Down Expand Up @@ -688,6 +698,19 @@ proc addBlock*(
except AsyncQueueFullError:
raiseAssert "unbounded queue"

# Dedup
# ------------------------------------------------------------------------------

func checkDuplicateBlocks(self: ref BlockProcessor, entry: BlockEntry): bool =
let key = (entry.blck.root, entry.blck.signature)
if self.dupBlckBuf.contains key:
return true
doAssert self.dupBlckBuf.len <= MAX_DEDUP_QUEUE_LEN
if self.dupBlckBuf.len >= MAX_DEDUP_QUEUE_LEN:
self.dupBlckBuf.shrink(fromFirst = 1)
self.dupBlckBuf.addLast key
false

# Event Loop
# ------------------------------------------------------------------------------

Expand All @@ -704,6 +727,12 @@ proc processBlock(
error "Processing block before genesis, clock turned back?"
quit 1

if self.checkDuplicateBlocks(entry):
if entry.resfut != nil:
entry.resfut.complete(Result[void, VerifierError].err(
VerifierError.Duplicate))
return

let res = withBlck(entry.blck):
await self.storeBlock(
entry.src, wallTime, blck, entry.blobs, entry.maybeFinalized,
Expand Down

0 comments on commit 1ccb36b

Please sign in to comment.