diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 7c937dbafd..147cb4449c 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -13,6 +13,7 @@ import ../spec/[signatures, signatures_batch], ../sszdump +from std/deques import Deque, addLast, contains, initDeque, items, len, shrink from ../consensus_object_pools/consensus_manager import ConsensusManager, checkNextProposer, optimisticExecutionPayloadHash, runProposalForkchoiceUpdated, shouldSyncOptimistically, updateHead, @@ -47,6 +48,10 @@ const ## syncing the finalized part of the chain PAYLOAD_PRE_WALL_SLOTS = SLOTS_PER_EPOCH * 2 ## Number of slots from wall time that we start processing every payload + MAX_DEDUP_QUEUE_LEN = 16 + ## Number of blocks, with FIFO discipline, against which to check queued + ## blocks before being processed to avoid spamming ELs. This should stay + ## small enough that even O(n) algorithms are reasonable. type BlobSidecars* = seq[ref BlobSidecar] @@ -103,6 +108,9 @@ type ## The slot at which we sent a payload to the execution client the last ## time + dupBlckBuf: Deque[(Eth2Digest, ValidatorSig)] + # Small buffer to allow for filtering of duplicate blocks in block queue + NewPayloadStatus {.pure.} = enum valid notValid @@ -140,7 +148,9 @@ proc new*(T: type BlockProcessor, validatorMonitor: validatorMonitor, blobQuarantine: blobQuarantine, getBeaconTime: getBeaconTime, - verifier: BatchVerifier(rng: rng, taskpool: taskpool) + verifier: BatchVerifier(rng: rng, taskpool: taskpool), + dupBlckBuf: initDeque[(Eth2Digest, ValidatorSig)]( + initialSize = MAX_DEDUP_QUEUE_LEN) ) # Sync callbacks @@ -688,6 +698,19 @@ proc addBlock*( except AsyncQueueFullError: raiseAssert "unbounded queue" +# Dedup +# ------------------------------------------------------------------------------ + +func checkDuplicateBlocks(self: ref BlockProcessor, entry: BlockEntry): bool = + let key = (entry.blck.root, entry.blck.signature) + if self.dupBlckBuf.contains key: + return true + doAssert self.dupBlckBuf.len <= MAX_DEDUP_QUEUE_LEN + if self.dupBlckBuf.len >= MAX_DEDUP_QUEUE_LEN: + self.dupBlckBuf.shrink(fromFirst = 1) + self.dupBlckBuf.addLast key + false + # Event Loop # ------------------------------------------------------------------------------ @@ -704,6 +727,12 @@ proc processBlock( error "Processing block before genesis, clock turned back?" quit 1 + if self.checkDuplicateBlocks(entry): + if entry.resfut != nil: + entry.resfut.complete(Result[void, VerifierError].err( + VerifierError.Duplicate)) + return + let res = withBlck(entry.blck): await self.storeBlock( entry.src, wallTime, blck, entry.blobs, entry.maybeFinalized,