Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

include small dedup in block processor to handle blockByRoot blocks #4850

Merged
merged 1 commit into from
Apr 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion beacon_chain/gossip_processing/block_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import
../spec/[signatures, signatures_batch],
../sszdump

from std/deques import Deque, addLast, contains, initDeque, items, len, shrink
from ../consensus_object_pools/consensus_manager import
ConsensusManager, checkNextProposer, optimisticExecutionPayloadHash,
runProposalForkchoiceUpdated, shouldSyncOptimistically, updateHead,
Expand Down Expand Up @@ -46,6 +47,10 @@ const
## syncing the finalized part of the chain
PAYLOAD_PRE_WALL_SLOTS = SLOTS_PER_EPOCH * 2
## Number of slots from wall time that we start processing every payload
MAX_DEDUP_QUEUE_LEN = 16
## Number of blocks, with FIFO discipline, against which to check queued
## blocks before being processed to avoid spamming ELs. This should stay
## small enough that even O(n) algorithms are reasonable.

type
BlobSidecars* = seq[ref BlobSidecar]
Expand Down Expand Up @@ -102,6 +107,9 @@ type
## The slot at which we sent a payload to the execution client the last
## time

dupBlckBuf: Deque[(Eth2Digest, ValidatorSig)]
# Small buffer to allow for filtering of duplicate blocks in block queue

NewPayloadStatus {.pure.} = enum
valid
notValid
Expand Down Expand Up @@ -139,7 +147,9 @@ proc new*(T: type BlockProcessor,
validatorMonitor: validatorMonitor,
blobQuarantine: blobQuarantine,
getBeaconTime: getBeaconTime,
verifier: BatchVerifier(rng: rng, taskpool: taskpool)
verifier: BatchVerifier(rng: rng, taskpool: taskpool),
dupBlckBuf: initDeque[(Eth2Digest, ValidatorSig)](
initialSize = MAX_DEDUP_QUEUE_LEN)
)

# Sync callbacks
Expand Down Expand Up @@ -685,6 +695,19 @@ proc addBlock*(
except AsyncQueueFullError:
raiseAssert "unbounded queue"

# Dedup
# ------------------------------------------------------------------------------

func checkDuplicateBlocks(self: ref BlockProcessor, entry: BlockEntry): bool =
let key = (entry.blck.root, entry.blck.signature)
if self.dupBlckBuf.contains key:
return true
doAssert self.dupBlckBuf.len <= MAX_DEDUP_QUEUE_LEN
if self.dupBlckBuf.len >= MAX_DEDUP_QUEUE_LEN:
self.dupBlckBuf.shrink(fromFirst = 1)
self.dupBlckBuf.addLast key
false

# Event Loop
# ------------------------------------------------------------------------------

Expand All @@ -701,6 +724,12 @@ proc processBlock(
error "Processing block before genesis, clock turned back?"
quit 1

if self.checkDuplicateBlocks(entry):
if entry.resfut != nil:
entry.resfut.complete(Result[void, VerifierError].err(
VerifierError.Duplicate))
return

let res = withBlck(entry.blck):
await self.storeBlock(
entry.src, wallTime, blck, entry.blobs, entry.maybeFinalized,
Expand Down