diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index 3cda2c1a9a6..48bf0f2feb9 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -148,6 +148,10 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; /// be stored before we start dropping them. const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64; +/// The maximum number of queued `Vec<[`SignedBeaconBlockAndBlobsSidecar`]>` objects received during syncing that will +/// be stored before we start dropping them. +const MAX_BLOB_CHAIN_SEGMENT_QUEUE_LEN: usize = 64; + /// The maximum number of queued `StatusMessage` objects received from the network RPC that will be /// stored before we start dropping them. const MAX_STATUS_QUEUE_LEN: usize = 1_024; @@ -206,6 +210,7 @@ pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request"; pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; +pub const BLOB_CHAIN_SEGMENT: &str = "blob_chain_segment"; /// A simple first-in-first-out queue with a maximum length. struct FifoQueue { @@ -546,6 +551,19 @@ impl WorkEvent { } } + pub fn blob_chain_segment( + process_id: ChainSegmentProcessId, + blocks_and_blobs: Vec>, + ) -> Self { + Self { + drop_during_sync: false, + work: Work::BlobChainSegment { + process_id, + blocks_and_blobs, + }, + } + } + /// Create a new work event to process `StatusMessage`s from the RPC network. pub fn status_message(peer_id: PeerId, message: StatusMessage) -> Self { Self { @@ -809,6 +827,10 @@ pub enum Work { request_id: PeerRequestId, request: BlobsByRootRequest, }, + BlobChainSegment { + process_id: ChainSegmentProcessId, + blocks_and_blobs: Vec>, + }, } impl Work { @@ -836,6 +858,7 @@ impl Work { Work::BlobsByRootsRequest { .. } => BLOBS_BY_ROOTS_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE, + Work::BlobChainSegment { .. } => BLOB_CHAIN_SEGMENT, } } } @@ -971,6 +994,7 @@ impl BeaconProcessor { let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); + let mut blob_chain_segment_queue = FifoQueue::new(MAX_BLOB_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); let mut gossip_block_and_blobs_sidecar_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN); @@ -1072,6 +1096,11 @@ impl BeaconProcessor { self.spawn_worker(item, toolbox); // Check sync blocks before gossip blocks, since we've already explicitly // requested these blocks. + } else if let Some(item) = blob_chain_segment_queue.pop() { + self.spawn_worker(item, toolbox); + // Sync block and blob segments have the same priority as normal chain + // segments. This here might change depending on how batch processing + // evolves. } else if let Some(item) = rpc_block_queue.pop() { self.spawn_worker(item, toolbox); // Check delayed blocks before gossip blocks, the gossip blocks might rely @@ -1339,6 +1368,9 @@ impl BeaconProcessor { request_id, request, } => todo!(), + Work::BlobChainSegment { .. } => { + blob_chain_segment_queue.push(work, work_id, &self.log) + } } } } @@ -1775,6 +1807,14 @@ impl BeaconProcessor { seen_timestamp, ) }), + Work::BlobChainSegment { + process_id, + blocks_and_blobs, + } => task_spawner.spawn_async(async move { + worker + .process_blob_chain_segment(process_id, blocks_and_blobs) + .await + }), }; } } diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 5d97894fe40..7e22d4d8f45 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -15,7 +15,7 @@ use lighthouse_network::PeerAction; use slog::{debug, error, info, warn}; use std::sync::Arc; use tokio::sync::mpsc; -use types::{Epoch, Hash256, SignedBeaconBlock}; +use types::{Epoch, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -241,6 +241,17 @@ impl Worker { self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); } + pub async fn process_blob_chain_segment( + &self, + sync_type: ChainSegmentProcessId, + downloaded_blocks: Vec>, + ) { + warn!(self.log, "FAKE PROCESSING A BLOBS SEGMENT!!!"); + let result = BatchProcessResult::Success { + was_non_empty: !downloaded_blocks.is_empty(), + }; + self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result }); + } /// Helper function to process blocks batches which only consumes the chain and blocks to process. async fn process_blocks<'a>( &self, diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 176fbde96ca..daf7e72ef8f 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -209,8 +209,10 @@ impl Processor { SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => { unreachable!("Block lookups do not request BBRange requests") } - id @ (SyncId::BackFillSync { .. } | SyncId::RangeSync { .. }) => id, - SyncId::RangeBlockBlob { id } => unimplemented!("do it"), + id @ (SyncId::BackFillSync { .. } + | SyncId::RangeSync { .. } + | SyncId::BackFillSidecarPair { .. } + | SyncId::RangeSidecarPair { .. }) => id, }, RequestId::Router => unreachable!("All BBRange requests belong to sync"), }; @@ -266,11 +268,12 @@ impl Processor { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, - SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => { + SyncId::BackFillSync { .. } + | SyncId::RangeSync { .. } + | SyncId::RangeSidecarPair { .. } + | SyncId::BackFillSidecarPair { .. } => { unreachable!("Batch syncing do not request BBRoot requests") } - - SyncId::RangeBlockBlob { id } => unimplemented!("do it"), }, RequestId::Router => unreachable!("All BBRoot requests belong to sync"), }; @@ -298,11 +301,12 @@ impl Processor { let request_id = match request_id { RequestId::Sync(sync_id) => match sync_id { id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id, - SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => { - unreachable!("Batch syncing do not request BBRoot requests") + SyncId::BackFillSync { .. } + | SyncId::RangeSync { .. } + | SyncId::RangeSidecarPair { .. } + | SyncId::BackFillSidecarPair { .. } => { + unreachable!("Batch syncing does not request BBRoot requests") } - - SyncId::RangeBlockBlob { id } => unimplemented!("do it"), }, RequestId::Router => unreachable!("All BBRoot requests belong to sync"), }; diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index d36bbbc79b1..e495daf3c5b 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -24,7 +24,10 @@ use std::collections::{ HashMap, HashSet, }; use std::sync::Arc; -use types::{Epoch, EthSpec, SignedBeaconBlock}; +use types::{Epoch, EthSpec}; + +use super::manager::BlockTy; +use super::range_sync::BatchTy; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for @@ -54,7 +57,7 @@ impl BatchConfig for BackFillBatchConfig { fn max_batch_processing_attempts() -> u8 { MAX_BATCH_PROCESSING_ATTEMPTS } - fn batch_attempt_hash(blocks: &[Arc>]) -> u64 { + fn batch_attempt_hash(blocks: &[BlockTy]) -> u64 { use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; let mut hasher = DefaultHasher::new(); @@ -390,7 +393,7 @@ impl BackFillSync { batch_id: BatchId, peer_id: &PeerId, request_id: Id, - beacon_block: Option>>, + beacon_block: Option>, ) -> Result { // check if we have this batch let batch = match self.batches.get_mut(&batch_id) { @@ -535,10 +538,13 @@ impl BackFillSync { let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id); self.current_processing_batch = Some(batch_id); - if let Err(e) = network - .processor_channel() - .try_send(BeaconWorkEvent::chain_segment(process_id, blocks)) - { + let work_event = match blocks { + BatchTy::Blocks(blocks) => BeaconWorkEvent::chain_segment(process_id, blocks), + BatchTy::BlocksAndBlobs(blocks_and_blobs) => { + BeaconWorkEvent::blob_chain_segment(process_id, blocks_and_blobs) + } + }; + if let Err(e) = network.processor_channel().try_send(work_event) { crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch", "error" => %e, "batch" => self.processing_target); // This is unlikely to happen but it would stall syncing since the batch now has no @@ -953,8 +959,8 @@ impl BackFillSync { peer: PeerId, ) -> Result<(), BackFillError> { if let Some(batch) = self.batches.get_mut(&batch_id) { - let request = batch.to_blocks_by_range_request(); - match network.backfill_blocks_by_range_request(peer, request, batch_id) { + let (request, is_blob_batch) = batch.to_blocks_by_range_request(); + match network.backfill_blocks_by_range_request(peer, is_blob_batch, request, batch_id) { Ok(request_id) => { // inform the batch about the new request if let Err(e) = batch.start_downloading_from_peer(peer, request_id) { @@ -1054,7 +1060,7 @@ impl BackFillSync { idle_peers.shuffle(&mut rng); while let Some(peer) = idle_peers.pop() { - if let Some(batch_id) = self.include_next_batch() { + if let Some(batch_id) = self.include_next_batch(network) { // send the batch self.send_batch(network, batch_id, peer)?; } else { @@ -1067,7 +1073,7 @@ impl BackFillSync { /// Creates the next required batch from the chain. If there are no more batches required, /// `false` is returned. - fn include_next_batch(&mut self) -> Option { + fn include_next_batch(&mut self, network: &mut SyncNetworkContext) -> Option { // don't request batches beyond genesis; if self.last_batch_downloaded { return None; @@ -1104,10 +1110,15 @@ impl BackFillSync { self.to_be_downloaded = self .to_be_downloaded .saturating_sub(BACKFILL_EPOCHS_PER_BATCH); - self.include_next_batch() + self.include_next_batch(network) } Entry::Vacant(entry) => { - entry.insert(BatchInfo::new(&batch_id, BACKFILL_EPOCHS_PER_BATCH)); + let batch_type = network.batch_type(batch_id); + entry.insert(BatchInfo::new( + &batch_id, + BACKFILL_EPOCHS_PER_BATCH, + batch_type, + )); if batch_id == 0 { self.last_batch_downloaded = true; } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 5f03d54ab5d..e5b5e72f5be 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -41,6 +41,7 @@ use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; +use crate::sync::range_sync::ExpectedBatchTy; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError, EngineState}; use futures::StreamExt; use lighthouse_network::rpc::methods::MAX_REQUEST_BLOCKS; @@ -69,15 +70,35 @@ pub const SLOT_IMPORT_TOLERANCE: usize = 32; pub type Id = u32; #[derive(Debug)] -pub struct SeansBlob {} +pub enum BlockTy { + Block { + block: Arc>, + }, + BlockAndBlob { + block_sidecar_pair: SignedBeaconBlockAndBlobsSidecar, + }, +} -#[derive(Debug)] -pub struct SeansBlock {} +// TODO: probably needes to be changed. This is needed because SignedBeaconBlockAndBlobsSidecar +// does not implement Hash +impl std::hash::Hash for BlockTy { + fn hash(&self, state: &mut H) { + match self { + BlockTy::Block { block } => block.hash(state), + BlockTy::BlockAndBlob { + block_sidecar_pair: block_and_blob, + } => block_and_blob.beacon_block.hash(state), + } + } +} -#[derive(Debug)] -pub struct SeansBlockBlob { - blob: SeansBlob, - block: SeansBlock, +impl BlockTy { + pub fn slot(&self) -> Slot { + match self { + BlockTy::Block { block } => block.slot(), + BlockTy::BlockAndBlob { block_sidecar_pair } => block_sidecar_pair.beacon_block.slot(), + } + } } /// Id of rpc requests sent by sync to the network. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -88,10 +109,12 @@ pub enum RequestId { ParentLookup { id: Id }, /// Request was from the backfill sync algorithm. BackFillSync { id: Id }, + /// Backfill request for blocks and sidecars. + BackFillSidecarPair { id: Id }, /// The request was from a chain in the range sync algorithm. RangeSync { id: Id }, - /// The request was from a chain in range, asking for ranges of blocks and blobs. - RangeBlockBlob { id: Id }, + /// The request was from a chain in range, asking for ranges of blocks and sidecars. + RangeSidecarPair { id: Id }, } #[derive(Debug)] @@ -300,7 +323,25 @@ impl SyncManager { .parent_lookup_failed(id, peer_id, &mut self.network); } RequestId::BackFillSync { id } => { - if let Some(batch_id) = self.network.backfill_sync_response(id, true) { + if let Some(batch_id) = self + .network + .backfill_request_failed(id, ExpectedBatchTy::OnlyBlock) + { + match self + .backfill_sync + .inject_error(&mut self.network, batch_id, &peer_id, id) + { + Ok(_) => {} + Err(_) => self.update_sync_state(), + } + } + } + + RequestId::BackFillSidecarPair { id } => { + if let Some(batch_id) = self + .network + .backfill_request_failed(id, ExpectedBatchTy::OnlyBlockBlobs) + { match self .backfill_sync .inject_error(&mut self.network, batch_id, &peer_id, id) @@ -311,7 +352,10 @@ impl SyncManager { } } RequestId::RangeSync { id } => { - if let Some((chain_id, batch_id)) = self.network.range_sync_response(id, true) { + if let Some((chain_id, batch_id)) = self + .network + .range_sync_request_failed(id, ExpectedBatchTy::OnlyBlock) + { self.range_sync.inject_error( &mut self.network, peer_id, @@ -322,8 +366,11 @@ impl SyncManager { self.update_sync_state() } } - RequestId::RangeBlockBlob { id } => { - if let Some((chain_id, batch_id)) = self.network.fail_block_bob_request(id) { + RequestId::RangeSidecarPair { id } => { + if let Some((chain_id, batch_id)) = self + .network + .range_sync_request_failed(id, ExpectedBatchTy::OnlyBlockBlobs) + { self.range_sync.inject_error( &mut self.network, peer_id, @@ -632,7 +679,7 @@ impl SyncManager { peer_id, blob_sidecar, seen_timestamp, - } => todo!(), + } => self.rpc_sidecar_received(request_id, peer_id, blob_sidecar, seen_timestamp), SyncMessage::RpcBlockAndBlob { request_id, peer_id, @@ -720,16 +767,17 @@ impl SyncManager { &mut self.network, ), RequestId::BackFillSync { id } => { - if let Some(batch_id) = self - .network - .backfill_sync_response(id, beacon_block.is_none()) - { + if let Some((batch_id, block)) = self.network.backfill_sync_block_response( + id, + beacon_block, + ExpectedBatchTy::OnlyBlock, + ) { match self.backfill_sync.on_block_response( &mut self.network, batch_id, &peer_id, id, - beacon_block, + block, ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), Ok(ProcessResult::Successful) => {} @@ -742,38 +790,120 @@ impl SyncManager { } } RequestId::RangeSync { id } => { - if let Some((chain_id, batch_id)) = - self.network.range_sync_response(id, beacon_block.is_none()) - { + if let Some((chain_id, batch_id, block)) = self.network.range_sync_block_response( + id, + beacon_block, + ExpectedBatchTy::OnlyBlock, + ) { self.range_sync.blocks_by_range_response( &mut self.network, peer_id, chain_id, batch_id, id, - beacon_block, + block, ); self.update_sync_state(); } } - RequestId::RangeBlockBlob { id } => { - // do stuff - // self.network.block_blob_block_response(id, block); + + RequestId::BackFillSidecarPair { id } => { + if let Some((batch_id, block)) = self.network.backfill_sync_block_response( + id, + beacon_block, + ExpectedBatchTy::OnlyBlockBlobs, + ) { + match self.backfill_sync.on_block_response( + &mut self.network, + batch_id, + &peer_id, + id, + block, + ) { + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Ok(ProcessResult::Successful) => {} + Err(_error) => { + // The backfill sync has failed, errors are reported + // within. + self.update_sync_state(); + } + } + } + } + RequestId::RangeSidecarPair { id } => { + if let Some((chain_id, batch_id, block)) = self.network.range_sync_block_response( + id, + beacon_block, + ExpectedBatchTy::OnlyBlockBlobs, + ) { + self.range_sync.blocks_by_range_response( + &mut self.network, + peer_id, + chain_id, + batch_id, + id, + block, + ); + self.update_sync_state(); + } } } } - fn rpc_blob_received( + fn rpc_sidecar_received( &mut self, request_id: RequestId, peer_id: PeerId, - beacon_block: Option>>, + maybe_sidecar: Option::EthSpec>>>, seen_timestamp: Duration, ) { - let RequestId::RangeBlockBlob { id } = request_id else { - panic!("Wrong things going on "); - }; - // get the paired block blob from the network context and send it to range + match request_id { + RequestId::SingleBlock { id } => todo!("do we request individual sidecars?"), + RequestId::ParentLookup { id } => todo!(), + RequestId::BackFillSync { .. } => { + unreachable!("An only blocks request does not receive sidecars") + } + RequestId::BackFillSidecarPair { id } => { + if let Some((batch_id, block)) = self + .network + .backfill_sync_sidecar_response(id, maybe_sidecar) + { + match self.backfill_sync.on_block_response( + &mut self.network, + batch_id, + &peer_id, + id, + block, + ) { + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Ok(ProcessResult::Successful) => {} + Err(_error) => { + // The backfill sync has failed, errors are reported + // within. + self.update_sync_state(); + } + } + } + } + RequestId::RangeSync { .. } => { + unreachable!("And only blocks range request does not receive sidecars") + } + RequestId::RangeSidecarPair { id } => { + if let Some((chain_id, batch_id, block)) = + self.network.range_sync_sidecar_response(id, maybe_sidecar) + { + self.range_sync.blocks_by_range_response( + &mut self.network, + peer_id, + chain_id, + batch_id, + id, + block, + ); + self.update_sync_state(); + } + } + } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 15003caa1a4..9595403e733 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,8 +1,8 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. -use super::manager::{Id, RequestId as SyncRequestId, SeansBlob, SeansBlock, SeansBlockBlob}; -use super::range_sync::{BatchId, ChainId}; +use super::manager::{BlockTy, Id, RequestId as SyncRequestId}; +use super::range_sync::{BatchId, ChainId, ExpectedBatchTy}; use crate::beacon_processor::WorkEvent; use crate::service::{NetworkMessage, RequestId}; use crate::status::ToStatusMessage; @@ -12,22 +12,54 @@ use lighthouse_network::rpc::methods::BlobsByRangeRequest; use lighthouse_network::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason}; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource, Request}; use slog::{debug, trace, warn}; +use std::collections::hash_map::Entry; use std::collections::VecDeque; use std::sync::Arc; use tokio::sync::mpsc; +use types::{BlobsSidecar, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar}; #[derive(Debug, Default)] -struct BlockBlobRequestInfo { - /// Blocks we have received awaiting for their corresponding blob - accumulated_blocks: VecDeque, - /// Blobs we have received awaiting for their corresponding block - accumulated_blobs: VecDeque, +struct BlockBlobRequestInfo { + /// Blocks we have received awaiting for their corresponding sidecar. + accumulated_blocks: VecDeque>>, + /// Sidecars we have received awaiting for their corresponding block. + accumulated_sidecars: VecDeque>>, /// Whether the individual RPC request for blocks is finished or not. - // Not sure if this is needed is_blocks_rpc_finished: bool, - /// Whether the individual RPC request for blobs is finished or not - // Not sure if this is needed - is_blobs_rpc_finished: bool, + /// Whether the individual RPC request for sidecars is finished or not. + is_sidecar_rpc_finished: bool, +} + +impl BlockBlobRequestInfo { + pub fn add_block_response(&mut self, maybe_block: Option>>) { + match maybe_block { + Some(block) => self.accumulated_blocks.push_back(block), + None => self.is_blocks_rpc_finished = true, + } + } + + pub fn add_sidecar_response(&mut self, maybe_sidecar: Option>>) { + match maybe_sidecar { + Some(sidecar) => self.accumulated_sidecars.push_back(sidecar), + None => self.is_sidecar_rpc_finished = true, + } + } + + pub fn pop_response(&mut self) -> Option> { + if !self.accumulated_blocks.is_empty() && !self.accumulated_blocks.is_empty() { + let beacon_block = self.accumulated_blocks.pop_front().expect("non empty"); + let blobs_sidecar = self.accumulated_sidecars.pop_front().expect("non empty"); + return Some(SignedBeaconBlockAndBlobsSidecar { + beacon_block, + blobs_sidecar, + }); + } + None + } + + pub fn is_finished(&self) -> bool { + self.is_blocks_rpc_finished && self.is_sidecar_rpc_finished + } } /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. @@ -47,7 +79,12 @@ pub struct SyncNetworkContext { /// BlocksByRange requests made by backfill syncing. backfill_requests: FnvHashMap, - block_blob_requests: FnvHashMap, + /// BlocksByRange requests paired with BlobsByRange requests made by the range. + range_sidecar_pair_requests: + FnvHashMap)>, + + /// BlocksByRange requests paired with BlobsByRange requests made by the backfill sync. + backfill_sidecar_pair_requests: FnvHashMap)>, /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. @@ -67,15 +104,16 @@ impl SyncNetworkContext { beacon_processor_send: mpsc::Sender>, log: slog::Logger, ) -> Self { - Self { + SyncNetworkContext { network_send, - execution_engine_state: EngineState::Online, // always assume `Online` at the start network_globals, request_id: 1, - range_requests: FnvHashMap::default(), - backfill_requests: FnvHashMap::default(), + range_requests: Default::default(), + backfill_requests: Default::default(), + range_sidecar_pair_requests: Default::default(), + backfill_sidecar_pair_requests: Default::default(), + execution_engine_state: EngineState::Online, // always assume `Online` at the start beacon_processor_send, - block_blob_requests: Default::default(), log, } } @@ -122,190 +160,295 @@ impl SyncNetworkContext { pub fn blocks_by_range_request( &mut self, peer_id: PeerId, + batch_type: ExpectedBatchTy, request: BlocksByRangeRequest, chain_id: ChainId, batch_id: BatchId, ) -> Result { - trace!( - self.log, - "Sending BlocksByRange Request"; - "method" => "BlocksByRange", - "count" => request.count, - "peer" => %peer_id, - ); - let request = Request::BlocksByRange(request); - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::RangeSync { id }); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request, - request_id, - })?; - self.range_requests.insert(id, (chain_id, batch_id)); - Ok(id) - } - - /// A blocks-blob by range request for the range sync algorithm. - pub fn blocks_blobs_by_range_request( - &mut self, - peer_id: PeerId, - request: BlocksByRangeRequest, // for now this is enough to get both requests. - chain_id: ChainId, - batch_id: BatchId, - ) -> Result { - debug!( - self.log, - "Sending BlockBlock by range request"; - "method" => "BlocksByRangeAndBlobsOrSomething", - "count" => request.count, - "peer" => %peer_id, - ); - - // create the shared request id. This is fine since the rpc handles substream ids. - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::RangeBlockBlob { id }); - - // Create the blob request based on the blob request. - let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { - start_slot: request.start_slot, - count: request.count, - }); - let blocks_request = Request::BlocksByRange(request); - - // Send both requests. Make sure both can be sent. - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: blocks_request, - request_id, - }) - .and_then(|_| { - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: blobs_request, - request_id, - }) - })?; - let block_blob_info = BlockBlobRequestInfo::default(); - self.block_blob_requests - .insert(id, (chain_id, batch_id, block_blob_info)); - Ok(id) + match batch_type { + ExpectedBatchTy::OnlyBlock => { + trace!( + self.log, + "Sending BlocksByRange Request"; + "method" => "BlocksByRange", + "count" => request.count, + "peer" => %peer_id, + ); + let request = Request::BlocksByRange(request); + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::RangeSync { id }); + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request, + request_id, + })?; + self.range_requests.insert(id, (chain_id, batch_id)); + Ok(id) + } + ExpectedBatchTy::OnlyBlockBlobs => { + debug!( + self.log, + "Sending BlockBlock by range request"; + "method" => "Mixed by range request", + "count" => request.count, + "peer" => %peer_id, + ); + + // create the shared request id. This is fine since the rpc handles substream ids. + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::RangeSidecarPair { id }); + + // Create the blob request based on the blob request. + let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { + start_slot: request.start_slot, + count: request.count, + }); + let blocks_request = Request::BlocksByRange(request); + + // Send both requests. Make sure both can be sent. + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: blocks_request, + request_id, + })?; + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: blobs_request, + request_id, + })?; + let block_blob_info = BlockBlobRequestInfo::default(); + self.range_sidecar_pair_requests + .insert(id, (chain_id, batch_id, block_blob_info)); + Ok(id) + } + } } /// A blocks by range request sent by the backfill sync algorithm pub fn backfill_blocks_by_range_request( &mut self, peer_id: PeerId, + batch_type: ExpectedBatchTy, request: BlocksByRangeRequest, batch_id: BatchId, ) -> Result { - trace!( - self.log, - "Sending backfill BlocksByRange Request"; - "method" => "BlocksByRange", - "count" => request.count, - "peer" => %peer_id, - ); - let request = Request::BlocksByRange(request); - let id = self.next_id(); - let request_id = RequestId::Sync(SyncRequestId::BackFillSync { id }); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request, - request_id, - })?; - self.backfill_requests.insert(id, batch_id); - Ok(id) + match batch_type { + ExpectedBatchTy::OnlyBlock => { + trace!( + self.log, + "Sending backfill BlocksByRange Request"; + "method" => "BlocksByRange", + "count" => request.count, + "peer" => %peer_id, + ); + let request = Request::BlocksByRange(request); + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::BackFillSync { id }); + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request, + request_id, + })?; + self.backfill_requests.insert(id, batch_id); + Ok(id) + } + ExpectedBatchTy::OnlyBlockBlobs => { + debug!( + self.log, + "Sending BlockBlock by range request"; + "method" => "Mixed by range request", + "count" => request.count, + "peer" => %peer_id, + ); + + // create the shared request id. This is fine since the rpc handles substream ids. + let id = self.next_id(); + let request_id = RequestId::Sync(SyncRequestId::RangeSidecarPair { id }); + + // Create the blob request based on the blob request. + let blobs_request = Request::BlobsByRange(BlobsByRangeRequest { + start_slot: request.start_slot, + count: request.count, + }); + let blocks_request = Request::BlocksByRange(request); + + // Send both requests. Make sure both can be sent. + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: blocks_request, + request_id, + })?; + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: blobs_request, + request_id, + })?; + let block_blob_info = BlockBlobRequestInfo::default(); + self.backfill_sidecar_pair_requests + .insert(id, (batch_id, block_blob_info)); + Ok(id) + } + } } /// Received a blocks by range response. - pub fn range_sync_response( + pub fn range_sync_block_response( &mut self, request_id: Id, - remove: bool, - ) -> Option<(ChainId, BatchId)> { - if remove { - self.range_requests.remove(&request_id) - } else { - self.range_requests.get(&request_id).cloned() + maybe_block: Option>>, + batch_type: ExpectedBatchTy, + ) -> Option<(ChainId, BatchId, Option>)> { + match batch_type { + ExpectedBatchTy::OnlyBlockBlobs => { + match self.range_sidecar_pair_requests.entry(request_id) { + Entry::Occupied(mut entry) => { + let (chain_id, batch_id, info) = entry.get_mut(); + let chain_id = chain_id.clone(); + let batch_id = batch_id.clone(); + info.add_block_response(maybe_block); + let maybe_block = info + .pop_response() + .map(|block_sidecar_pair| BlockTy::BlockAndBlob { block_sidecar_pair }); + if info.is_finished() { + entry.remove(); + } + Some((chain_id, batch_id, maybe_block)) + } + Entry::Vacant(_) => None, + } + } + ExpectedBatchTy::OnlyBlock => { + // if the request is just for blocks then it can be removed on a stream termination + match maybe_block { + Some(block) => { + self.range_requests + .get(&request_id) + .cloned() + .map(|(chain_id, batch_id)| { + (chain_id, batch_id, Some(BlockTy::Block { block })) + }) + } + None => self + .range_requests + .remove(&request_id) + .map(|(chain_id, batch_id)| (chain_id, batch_id, None)), + } + } } } - /// Fails a blob bob request. - // We need to recover the chain and batch id to be able to tell range abound the failure. - pub fn fail_block_bob_request(&mut self, request_id: Id) -> Option<(ChainId, BatchId)> { - self.block_blob_requests - .remove(&request_id) - .map(|(chain_id, batch_id, _info)| (chain_id, batch_id)) + pub fn range_sync_sidecar_response( + &mut self, + request_id: Id, + maybe_sidecar: Option>>, + ) -> Option<(ChainId, BatchId, Option>)> { + match self.range_sidecar_pair_requests.entry(request_id) { + Entry::Occupied(mut entry) => { + let (chain_id, batch_id, info) = entry.get_mut(); + let chain_id = chain_id.clone(); + let batch_id = batch_id.clone(); + info.add_sidecar_response(maybe_sidecar); + let maybe_block = info + .pop_response() + .map(|block_sidecar_pair| BlockTy::BlockAndBlob { block_sidecar_pair }); + if info.is_finished() { + entry.remove(); + } + Some((chain_id, batch_id, maybe_block)) + } + Entry::Vacant(_) => None, + } } - /// We received a block for a block blob request. This returns: - /// None: if there is no pairing for this block yet - /// Some(chain_id, Some(paired block blob)) if the block was Some and there was a blob waiting - /// None if the block was none - pub fn block_blob_block_response( + pub fn range_sync_request_failed( &mut self, request_id: Id, - block: Option, - ) -> Option<(ChainId, BatchId, Option)> { - unimplemented!() - // let (chain_id, batch_id, info) = self.block_blob_requests.get_mut(&request_id)?; - // match block { - // Some(block) => match info.accumulated_blobs.pop_front() { - // Some(blob) => Some(SeansBlockBlob { block, blob }), - // None => { - // // accumulate the block - // info.accumulated_blocks.push_back(block); - // None - // } - // }, - // None => { - // info.is_blocks_rpc_finished = true; - // - // if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished { - // // this is the coupled stream termination - // Some((chain_id, batch_id, None)) - // } else { - // None - // } - // } - // } + batch_type: ExpectedBatchTy, + ) -> Option<(ChainId, BatchId)> { + match batch_type { + ExpectedBatchTy::OnlyBlockBlobs => self + .range_sidecar_pair_requests + .remove(&request_id) + .map(|(chain_id, batch_id, _info)| (chain_id, batch_id)), + ExpectedBatchTy::OnlyBlock => self.range_requests.remove(&request_id), + } } - pub fn block_blob_blob_response( + pub fn backfill_request_failed( &mut self, request_id: Id, - blob: Option, - ) -> Option<(ChainId, BatchId, Option)> { - // let (batch_id, chain_id, info) = self.block_blob_requests.get_mut(&request_id)?; - // match blob { - // Some(blob) => match info.accumulated_blocks.pop_front() { - // Some(block) => Some(SeansBlockBlob { block, blob }), - // None => { - // // accumulate the blob - // info.accumulated_blobs.push_back(blob); - // None - // } - // }, - // None => { - // info.is_blobs_rpc_finished = true; - // - // if info.is_blobs_rpc_finished && info.is_blocks_rpc_finished { - // // this is the coupled stream termination - // Some((chain_id, batch_id, None)) - // } else { - // None - // } - // } - // } - unimplemented!("do it") + batch_type: ExpectedBatchTy, + ) -> Option { + match batch_type { + ExpectedBatchTy::OnlyBlockBlobs => self + .backfill_sidecar_pair_requests + .remove(&request_id) + .map(|(batch_id, _info)| batch_id), + ExpectedBatchTy::OnlyBlock => self.backfill_requests.remove(&request_id), + } } /// Received a blocks by range response. - pub fn backfill_sync_response(&mut self, request_id: Id, remove: bool) -> Option { - if remove { - self.backfill_requests.remove(&request_id) - } else { - self.backfill_requests.get(&request_id).cloned() + pub fn backfill_sync_block_response( + &mut self, + request_id: Id, + maybe_block: Option>>, + batch_type: ExpectedBatchTy, + ) -> Option<(BatchId, Option>)> { + match batch_type { + ExpectedBatchTy::OnlyBlockBlobs => { + match self.backfill_sidecar_pair_requests.entry(request_id) { + Entry::Occupied(mut entry) => { + let (batch_id, info) = entry.get_mut(); + let batch_id = batch_id.clone(); + info.add_block_response(maybe_block); + let maybe_block = info + .pop_response() + .map(|block_sidecar_pair| BlockTy::BlockAndBlob { block_sidecar_pair }); + if info.is_finished() { + entry.remove(); + } + Some((batch_id, maybe_block)) + } + Entry::Vacant(_) => None, + } + } + ExpectedBatchTy::OnlyBlock => { + // if the request is just for blocks then it can be removed on a stream termination + match maybe_block { + Some(block) => self + .backfill_requests + .get(&request_id) + .cloned() + .map(|batch_id| (batch_id, Some(BlockTy::Block { block }))), + None => self + .backfill_requests + .remove(&request_id) + .map(|batch_id| (batch_id, None)), + } + } + } + } + + pub fn backfill_sync_sidecar_response( + &mut self, + request_id: Id, + maybe_sidecar: Option>>, + ) -> Option<(BatchId, Option>)> { + match self.backfill_sidecar_pair_requests.entry(request_id) { + Entry::Occupied(mut entry) => { + let (batch_id, info) = entry.get_mut(); + let batch_id = batch_id.clone(); + info.add_sidecar_response(maybe_sidecar); + let maybe_block = info + .pop_response() + .map(|block_sidecar_pair| BlockTy::BlockAndBlob { block_sidecar_pair }); + if info.is_finished() { + entry.remove(); + } + Some((batch_id, maybe_block)) + } + Entry::Vacant(_) => None, } } @@ -316,6 +459,7 @@ impl SyncNetworkContext { request: BlocksByRootRequest, ) -> Result { //FIXME(sean) add prune depth logic here? + // D: YES trace!( self.log, @@ -428,4 +572,29 @@ impl SyncNetworkContext { self.request_id += 1; id } + + pub fn batch_type(&self, epoch: types::Epoch) -> ExpectedBatchTy { + // Keep tests only for blocks. + #[cfg(test)] + { + return ExpectedBatchTy::OnlyBlock; + } + #[cfg(not(test))] + { + use super::range_sync::EPOCHS_PER_BATCH; + assert_eq!( + EPOCHS_PER_BATCH, 1, + "If this is not one, everything will fail horribly" + ); + warn!( + self.log, + "Missing fork boundary and prunning boundary comparison to decide request type. EVERYTHING IS A BLOB, BOB." + ); + // Here we need access to the beacon chain, check the fork boundary, the current epoch, the + // blob period to serve and check with that if the batch is a blob batch or not. + // NOTE: This would carelessly assume batch sizes are always 1 epoch, to avoid needing to + // align with the batch boundary. + ExpectedBatchTy::OnlyBlockBlobs + } + } } diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 3eee7223db6..80819d57e6b 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,11 +1,11 @@ -use crate::sync::manager::Id; +use crate::sync::manager::{BlockTy, Id}; use lighthouse_network::rpc::methods::BlocksByRangeRequest; use lighthouse_network::PeerId; use std::collections::HashSet; use std::hash::{Hash, Hasher}; use std::ops::Sub; use std::sync::Arc; -use types::{Epoch, EthSpec, SignedBeaconBlock, Slot}; +use types::{Epoch, EthSpec, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar, Slot}; /// The number of times to retry a batch before it is considered failed. const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; @@ -14,6 +14,22 @@ const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 5; /// after `MAX_BATCH_PROCESSING_ATTEMPTS` times, it is considered faulty. const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; +pub enum BatchTy { + Blocks(Vec>>), + BlocksAndBlobs(Vec>), +} + +/// Error representing a batch with mixed block types. +#[derive(Debug)] +pub struct MixedBlockTyErr; + +/// Type of expected batch. +#[derive(Debug, Clone)] +pub enum ExpectedBatchTy { + OnlyBlockBlobs, + OnlyBlock, +} + /// Allows customisation of the above constants used in other sync methods such as BackFillSync. pub trait BatchConfig { /// The maximum batch download attempts. @@ -47,7 +63,7 @@ pub trait BatchConfig { /// Note that simpler hashing functions considered in the past (hash of first block, hash of last /// block, number of received blocks) are not good enough to differentiate attempts. For this /// reason, we hash the complete set of blocks both in RangeSync and BackFillSync. - fn batch_attempt_hash(blocks: &[Arc>]) -> u64; + fn batch_attempt_hash(blocks: &[BlockTy]) -> u64; } pub struct RangeSyncBatchConfig {} @@ -59,7 +75,7 @@ impl BatchConfig for RangeSyncBatchConfig { fn max_batch_processing_attempts() -> u8 { MAX_BATCH_PROCESSING_ATTEMPTS } - fn batch_attempt_hash(blocks: &[Arc>]) -> u64 { + fn batch_attempt_hash(blocks: &[BlockTy]) -> u64 { let mut hasher = std::collections::hash_map::DefaultHasher::new(); blocks.hash(&mut hasher); hasher.finish() @@ -96,6 +112,8 @@ pub struct BatchInfo { failed_download_attempts: Vec, /// State of the batch. state: BatchState, + /// Whether this batch contains all blocks or all blocks and blobs. + batch_type: ExpectedBatchTy, /// Pin the generic marker: std::marker::PhantomData, } @@ -105,9 +123,9 @@ pub enum BatchState { /// The batch has failed either downloading or processing, but can be requested again. AwaitingDownload, /// The batch is being downloaded. - Downloading(PeerId, Vec>>, Id), + Downloading(PeerId, Vec>, Id), /// The batch has been completely downloaded and is ready for processing. - AwaitingProcessing(PeerId, Vec>>), + AwaitingProcessing(PeerId, Vec>), /// The batch is being processed. Processing(Attempt), /// The batch was successfully processed and is waiting to be validated. @@ -139,8 +157,13 @@ impl BatchInfo { /// Epoch boundary | | /// ... | 30 | 31 | 32 | 33 | 34 | ... | 61 | 62 | 63 | 64 | 65 | /// Batch 1 | Batch 2 | Batch 3 - pub fn new(start_epoch: &Epoch, num_of_epochs: u64) -> Self { - let start_slot = start_epoch.start_slot(T::slots_per_epoch()) + 1; + /// + /// NOTE: Removed the shift by one for eip4844 because otherwise the last batch before the blob + /// fork boundary will be of mixed type (all blocks and one last blockblob), and I don't want to + /// deal with this for now. + /// This means finalization might be slower in eip4844 + pub fn new(start_epoch: &Epoch, num_of_epochs: u64, batch_type: ExpectedBatchTy) -> Self { + let start_slot = start_epoch.start_slot(T::slots_per_epoch()); let end_slot = start_slot + num_of_epochs * T::slots_per_epoch(); BatchInfo { start_slot, @@ -149,6 +172,7 @@ impl BatchInfo { failed_download_attempts: Vec::new(), non_faulty_processing_attempts: 0, state: BatchState::AwaitingDownload, + batch_type, marker: std::marker::PhantomData, } } @@ -201,11 +225,14 @@ impl BatchInfo { } /// Returns a BlocksByRange request associated with the batch. - pub fn to_blocks_by_range_request(&self) -> BlocksByRangeRequest { - BlocksByRangeRequest { - start_slot: self.start_slot.into(), - count: self.end_slot.sub(self.start_slot).into(), - } + pub fn to_blocks_by_range_request(&self) -> (BlocksByRangeRequest, ExpectedBatchTy) { + ( + BlocksByRangeRequest { + start_slot: self.start_slot.into(), + count: self.end_slot.sub(self.start_slot).into(), + }, + self.batch_type.clone(), + ) } /// After different operations over a batch, this could be in a state that allows it to @@ -231,7 +258,7 @@ impl BatchInfo { } /// Adds a block to a downloading batch. - pub fn add_block(&mut self, block: Arc>) -> Result<(), WrongState> { + pub fn add_block(&mut self, block: BlockTy) -> Result<(), WrongState> { match self.state.poison() { BatchState::Downloading(peer, mut blocks, req_id) => { blocks.push(block); @@ -363,11 +390,30 @@ impl BatchInfo { } } - pub fn start_processing(&mut self) -> Result>>, WrongState> { + pub fn start_processing(&mut self) -> Result, WrongState> { match self.state.poison() { BatchState::AwaitingProcessing(peer, blocks) => { self.state = BatchState::Processing(Attempt::new::(peer, &blocks)); - Ok(blocks) + match self.batch_type { + ExpectedBatchTy::OnlyBlockBlobs => { + let blocks = blocks.into_iter().map(|block| { + let BlockTy::BlockAndBlob { block_sidecar_pair: block_and_blob } = block else { + panic!("Batches should never have a mixed type. This is a bug. Contact D") + }; + block_and_blob + }).collect(); + Ok(BatchTy::BlocksAndBlobs(blocks)) + } + ExpectedBatchTy::OnlyBlock => { + let blocks = blocks.into_iter().map(|block| { + let BlockTy::Block { block } = block else { + panic!("Batches should never have a mixed type. This is a bug. Contact D") + }; + block + }).collect(); + Ok(BatchTy::Blocks(blocks)) + } + } } BatchState::Poisoned => unreachable!("Poisoned batch"), other => { @@ -461,10 +507,7 @@ pub struct Attempt { } impl Attempt { - fn new( - peer_id: PeerId, - blocks: &[Arc>], - ) -> Self { + fn new(peer_id: PeerId, blocks: &[BlockTy]) -> Self { let hash = B::batch_attempt_hash(blocks); Attempt { peer_id, hash } } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 4226b600f5b..09e5bf263a4 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,5 +1,7 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState}; +use super::BatchTy; use crate::beacon_processor::{ChainSegmentProcessId, WorkEvent as BeaconWorkEvent}; +use crate::sync::manager::BlockTy; use crate::sync::{ manager::Id, network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult, }; @@ -10,8 +12,7 @@ use rand::seq::SliceRandom; use slog::{crit, debug, o, warn}; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use std::hash::{Hash, Hasher}; -use std::sync::Arc; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{Epoch, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for @@ -19,7 +20,7 @@ use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /// we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which /// case the responder will fill the response up to the max request size, assuming they have the /// bandwidth to do so. -pub const EPOCHS_PER_BATCH: u64 = 2; +pub const EPOCHS_PER_BATCH: u64 = 1; /// The maximum number of batches to queue before requesting more. const BATCH_BUFFER_SIZE: u8 = 5; @@ -225,7 +226,7 @@ impl SyncingChain { batch_id: BatchId, peer_id: &PeerId, request_id: Id, - beacon_block: Option>>, + beacon_block: Option>, ) -> ProcessingResult { // check if we have this batch let batch = match self.batches.get_mut(&batch_id) { @@ -326,9 +327,14 @@ impl SyncingChain { let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id, count_unrealized); self.current_processing_batch = Some(batch_id); - if let Err(e) = - beacon_processor_send.try_send(BeaconWorkEvent::chain_segment(process_id, blocks)) - { + let work_event = match blocks { + BatchTy::Blocks(blocks) => BeaconWorkEvent::chain_segment(process_id, blocks), + BatchTy::BlocksAndBlobs(blocks_and_blobs) => { + BeaconWorkEvent::blob_chain_segment(process_id, blocks_and_blobs) + } + }; + + if let Err(e) = beacon_processor_send.try_send(work_event) { crit!(self.log, "Failed to send chain segment to processor."; "msg" => "process_batch", "error" => %e, "batch" => self.processing_target); // This is unlikely to happen but it would stall syncing since the batch now has no @@ -897,8 +903,8 @@ impl SyncingChain { peer: PeerId, ) -> ProcessingResult { if let Some(batch) = self.batches.get_mut(&batch_id) { - let request = batch.to_blocks_by_range_request(); - match network.blocks_by_range_request(peer, request, self.id, batch_id) { + let (request, batch_type) = batch.to_blocks_by_range_request(); + match network.blocks_by_range_request(peer, batch_type, request, self.id, batch_id) { Ok(request_id) => { // inform the batch about the new request batch.start_downloading_from_peer(peer, request_id)?; @@ -1002,7 +1008,8 @@ impl SyncingChain { if let Some(epoch) = self.optimistic_start { if let Entry::Vacant(entry) = self.batches.entry(epoch) { if let Some(peer) = idle_peers.pop() { - let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH); + let batch_type = network.batch_type(epoch); + let optimistic_batch = BatchInfo::new(&epoch, EPOCHS_PER_BATCH, batch_type); entry.insert(optimistic_batch); self.send_batch(network, epoch, peer)?; } @@ -1011,7 +1018,7 @@ impl SyncingChain { } while let Some(peer) = idle_peers.pop() { - if let Some(batch_id) = self.include_next_batch() { + if let Some(batch_id) = self.include_next_batch(network) { // send the batch self.send_batch(network, batch_id, peer)?; } else { @@ -1025,7 +1032,7 @@ impl SyncingChain { /// Creates the next required batch from the chain. If there are no more batches required, /// `false` is returned. - fn include_next_batch(&mut self) -> Option { + fn include_next_batch(&mut self, network: &mut SyncNetworkContext) -> Option { // don't request batches beyond the target head slot if self .to_be_downloaded @@ -1059,10 +1066,11 @@ impl SyncingChain { Entry::Occupied(_) => { // this batch doesn't need downloading, let this same function decide the next batch self.to_be_downloaded += EPOCHS_PER_BATCH; - self.include_next_batch() + self.include_next_batch(network) } Entry::Vacant(entry) => { - entry.insert(BatchInfo::new(&batch_id, EPOCHS_PER_BATCH)); + let batch_type = network.batch_type(batch_id); + entry.insert(BatchInfo::new(&batch_id, EPOCHS_PER_BATCH, batch_type)); self.to_be_downloaded += EPOCHS_PER_BATCH; Some(batch_id) } diff --git a/beacon_node/network/src/sync/range_sync/mod.rs b/beacon_node/network/src/sync/range_sync/mod.rs index f4db32bc96b..28426032191 100644 --- a/beacon_node/network/src/sync/range_sync/mod.rs +++ b/beacon_node/network/src/sync/range_sync/mod.rs @@ -8,7 +8,10 @@ mod chain_collection; mod range; mod sync_type; -pub use batch::{BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState}; +pub use batch::{ + BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, BatchTy, + ExpectedBatchTy, +}; pub use chain::{BatchId, ChainId, EPOCHS_PER_BATCH}; pub use range::RangeSync; pub use sync_type::RangeSyncType; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 25314543877..b28757bc09d 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -44,7 +44,7 @@ use super::chain::{BatchId, ChainId, RemoveChain, SyncingChain}; use super::chain_collection::ChainCollection; use super::sync_type::RangeSyncType; use crate::status::ToStatusMessage; -use crate::sync::manager::Id; +use crate::sync::manager::{BlockTy, Id}; use crate::sync::network_context::SyncNetworkContext; use crate::sync::BatchProcessResult; use beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -55,7 +55,7 @@ use lru_cache::LRUTimeCache; use slog::{crit, debug, trace, warn}; use std::collections::HashMap; use std::sync::Arc; -use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{Epoch, EthSpec, Hash256, Slot}; /// For how long we store failed finalized chains to prevent retries. const FAILED_CHAINS_EXPIRY_SECONDS: u64 = 30; @@ -202,7 +202,7 @@ where chain_id: ChainId, batch_id: BatchId, request_id: Id, - beacon_block: Option>>, + beacon_block: Option>, ) { // check if this chunk removes the chain match self.chains.call_by_id(chain_id, |chain| { @@ -372,6 +372,7 @@ where #[cfg(test)] mod tests { use crate::service::RequestId; + use crate::sync::range_sync::ExpectedBatchTy; use crate::NetworkMessage; use super::*; @@ -682,10 +683,13 @@ mod tests { // add some peers let (peer1, local_info, head_info) = rig.head_peer(); range.add_peer(&mut rig.cx, local_info, peer1, head_info); - let ((chain1, batch1), id1) = match rig.grab_request(&peer1).0 { - RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => { - (rig.cx.range_sync_response(id, true).unwrap(), id) - } + let ((chain1, batch1, _), id1) = match rig.grab_request(&peer1).0 { + RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => ( + rig.cx + .range_sync_block_response(id, None, ExpectedBatchTy::OnlyBlock) + .unwrap(), + id, + ), other => panic!("unexpected request {:?}", other), }; @@ -701,10 +705,13 @@ mod tests { // while the ee is offline, more peers might arrive. Add a new finalized peer. let (peer2, local_info, finalized_info) = rig.finalized_peer(); range.add_peer(&mut rig.cx, local_info, peer2, finalized_info); - let ((chain2, batch2), id2) = match rig.grab_request(&peer2).0 { - RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => { - (rig.cx.range_sync_response(id, true).unwrap(), id) - } + let ((chain2, batch2, _), id2) = match rig.grab_request(&peer2).0 { + RequestId::Sync(crate::sync::manager::RequestId::RangeSync { id }) => ( + rig.cx + .range_sync_block_response(id, None, ExpectedBatchTy::OnlyBlock) + .unwrap(), + id, + ), other => panic!("unexpected request {:?}", other), };