Skip to content

Commit

Permalink
Blob syncing (#24)
Browse files Browse the repository at this point in the history
* add a rt is_blob_batch

* use the mixed type everywhere

* glue

* more glue

* minor fixes

* fix range tests

* filling in the gaps

* more filling in the gaps
  • Loading branch information
divagant-martian authored Nov 24, 2022
1 parent ce097ac commit bf50052
Show file tree
Hide file tree
Showing 10 changed files with 696 additions and 270 deletions.
40 changes: 40 additions & 0 deletions beacon_node/network/src/beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024;
/// be stored before we start dropping them.
const MAX_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;

/// The maximum number of queued `Vec<SignedBeaconBlockAndBlobsSidecar>` objects received during
/// syncing that will be stored before we start dropping them.
const MAX_BLOB_CHAIN_SEGMENT_QUEUE_LEN: usize = 64;

/// The maximum number of queued `StatusMessage` objects received from the network RPC that will be
/// stored before we start dropping them.
const MAX_STATUS_QUEUE_LEN: usize = 1_024;
Expand Down Expand Up @@ -206,6 +210,7 @@ pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request";
pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request";
pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const BLOB_CHAIN_SEGMENT: &str = "blob_chain_segment";

/// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> {
Expand Down Expand Up @@ -546,6 +551,19 @@ impl<T: BeaconChainTypes> WorkEvent<T> {
}
}

/// Create a new work event to process a segment of blocks paired with their blobs sidecars.
///
/// The event wraps a `Work::BlobChainSegment` and has its `drop_during_sync` flag set to `false`.
pub fn blob_chain_segment(
    process_id: ChainSegmentProcessId,
    blocks_and_blobs: Vec<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
) -> Self {
    // Assemble the payload first so the event literal below stays compact.
    let work = Work::BlobChainSegment {
        process_id,
        blocks_and_blobs,
    };
    Self {
        drop_during_sync: false,
        work,
    }
}

/// Create a new work event to process `StatusMessage`s from the RPC network.
pub fn status_message(peer_id: PeerId, message: StatusMessage) -> Self {
Self {
Expand Down Expand Up @@ -809,6 +827,10 @@ pub enum Work<T: BeaconChainTypes> {
request_id: PeerRequestId,
request: BlobsByRootRequest,
},
BlobChainSegment {
process_id: ChainSegmentProcessId,
blocks_and_blobs: Vec<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
},
}

impl<T: BeaconChainTypes> Work<T> {
Expand Down Expand Up @@ -836,6 +858,7 @@ impl<T: BeaconChainTypes> Work<T> {
Work::BlobsByRootsRequest { .. } => BLOBS_BY_ROOTS_REQUEST,
Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION,
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::BlobChainSegment { .. } => BLOB_CHAIN_SEGMENT,
}
}
}
Expand Down Expand Up @@ -971,6 +994,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN);
let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN);
let mut blob_chain_segment_queue = FifoQueue::new(MAX_BLOB_CHAIN_SEGMENT_QUEUE_LEN);
let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN);
let mut gossip_block_and_blobs_sidecar_queue =
FifoQueue::new(MAX_GOSSIP_BLOCK_AND_BLOB_QUEUE_LEN);
Expand Down Expand Up @@ -1072,6 +1096,11 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
self.spawn_worker(item, toolbox);
// Check sync blocks before gossip blocks, since we've already explicitly
// requested these blocks.
} else if let Some(item) = blob_chain_segment_queue.pop() {
self.spawn_worker(item, toolbox);
// Sync block and blob segments have the same priority as normal chain
// segments. This here might change depending on how batch processing
// evolves.
} else if let Some(item) = rpc_block_queue.pop() {
self.spawn_worker(item, toolbox);
// Check delayed blocks before gossip blocks, the gossip blocks might rely
Expand Down Expand Up @@ -1339,6 +1368,9 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
request_id,
request,
} => todo!(),
Work::BlobChainSegment { .. } => {
blob_chain_segment_queue.push(work, work_id, &self.log)
}
}
}
}
Expand Down Expand Up @@ -1775,6 +1807,14 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
seen_timestamp,
)
}),
Work::BlobChainSegment {
process_id,
blocks_and_blobs,
} => task_spawner.spawn_async(async move {
worker
.process_blob_chain_segment(process_id, blocks_and_blobs)
.await
}),
};
}
}
Expand Down
13 changes: 12 additions & 1 deletion beacon_node/network/src/beacon_processor/worker/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Epoch, Hash256, SignedBeaconBlock};
use types::{Epoch, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar};

/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -241,6 +241,17 @@ impl<T: BeaconChainTypes> Worker<T> {
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}

/// Placeholder for processing a segment of blocks with their blobs sidecars.
///
/// No block or blob processing takes place here: a warning is logged and a
/// `BatchProcessResult::Success` is sent back through `send_sync_message`, with
/// `was_non_empty` set whenever `downloaded_blocks` contains at least one element.
pub async fn process_blob_chain_segment(
    &self,
    sync_type: ChainSegmentProcessId,
    downloaded_blocks: Vec<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
) {
    warn!(self.log, "FAKE PROCESSING A BLOBS SEGMENT!!!");
    // Only the emptiness of the batch is inspected; its contents are not examined.
    let was_non_empty = !downloaded_blocks.is_empty();
    self.send_sync_message(SyncMessage::BatchProcessed {
        sync_type,
        result: BatchProcessResult::Success { was_non_empty },
    });
}
/// Helper function to process blocks batches which only consumes the chain and blocks to process.
async fn process_blocks<'a>(
&self,
Expand Down
22 changes: 13 additions & 9 deletions beacon_node/network/src/router/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,10 @@ impl<T: BeaconChainTypes> Processor<T> {
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
unreachable!("Block lookups do not request BBRange requests")
}
id @ (SyncId::BackFillSync { .. } | SyncId::RangeSync { .. }) => id,
SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
id @ (SyncId::BackFillSync { .. }
| SyncId::RangeSync { .. }
| SyncId::BackFillSidecarPair { .. }
| SyncId::RangeSidecarPair { .. }) => id,
},
RequestId::Router => unreachable!("All BBRange requests belong to sync"),
};
Expand Down Expand Up @@ -266,11 +268,12 @@ impl<T: BeaconChainTypes> Processor<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
SyncId::BackFillSync { .. }
| SyncId::RangeSync { .. }
| SyncId::RangeSidecarPair { .. }
| SyncId::BackFillSidecarPair { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
}

SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
},
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
};
Expand Down Expand Up @@ -298,11 +301,12 @@ impl<T: BeaconChainTypes> Processor<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
SyncId::BackFillSync { .. }
| SyncId::RangeSync { .. }
| SyncId::RangeSidecarPair { .. }
| SyncId::BackFillSidecarPair { .. } => {
unreachable!("Batch syncing does not request BBRoot requests")
}

SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
},
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
};
Expand Down
37 changes: 24 additions & 13 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use std::collections::{
HashMap, HashSet,
};
use std::sync::Arc;
use types::{Epoch, EthSpec, SignedBeaconBlock};
use types::{Epoch, EthSpec};

use super::manager::BlockTy;
use super::range_sync::BatchTy;

/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request less blocks to account for
Expand Down Expand Up @@ -54,7 +57,7 @@ impl BatchConfig for BackFillBatchConfig {
fn max_batch_processing_attempts() -> u8 {
MAX_BATCH_PROCESSING_ATTEMPTS
}
fn batch_attempt_hash<T: EthSpec>(blocks: &[Arc<SignedBeaconBlock<T>>]) -> u64 {
fn batch_attempt_hash<T: EthSpec>(blocks: &[BlockTy<T>]) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
Expand Down Expand Up @@ -390,7 +393,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
batch_id: BatchId,
peer_id: &PeerId,
request_id: Id,
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
beacon_block: Option<BlockTy<T::EthSpec>>,
) -> Result<ProcessResult, BackFillError> {
// check if we have this batch
let batch = match self.batches.get_mut(&batch_id) {
Expand Down Expand Up @@ -535,10 +538,13 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id);
self.current_processing_batch = Some(batch_id);

if let Err(e) = network
.processor_channel()
.try_send(BeaconWorkEvent::chain_segment(process_id, blocks))
{
let work_event = match blocks {
BatchTy::Blocks(blocks) => BeaconWorkEvent::chain_segment(process_id, blocks),
BatchTy::BlocksAndBlobs(blocks_and_blobs) => {
BeaconWorkEvent::blob_chain_segment(process_id, blocks_and_blobs)
}
};
if let Err(e) = network.processor_channel().try_send(work_event) {
crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch",
"error" => %e, "batch" => self.processing_target);
// This is unlikely to happen but it would stall syncing since the batch now has no
Expand Down Expand Up @@ -953,8 +959,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
peer: PeerId,
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let request = batch.to_blocks_by_range_request();
match network.backfill_blocks_by_range_request(peer, request, batch_id) {
let (request, is_blob_batch) = batch.to_blocks_by_range_request();
match network.backfill_blocks_by_range_request(peer, is_blob_batch, request, batch_id) {
Ok(request_id) => {
// inform the batch about the new request
if let Err(e) = batch.start_downloading_from_peer(peer, request_id) {
Expand Down Expand Up @@ -1054,7 +1060,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
idle_peers.shuffle(&mut rng);

while let Some(peer) = idle_peers.pop() {
if let Some(batch_id) = self.include_next_batch() {
if let Some(batch_id) = self.include_next_batch(network) {
// send the batch
self.send_batch(network, batch_id, peer)?;
} else {
Expand All @@ -1067,7 +1073,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {

/// Creates the next required batch from the chain. If there are no more batches required,
/// `false` is returned.
fn include_next_batch(&mut self) -> Option<BatchId> {
fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
// don't request batches beyond genesis;
if self.last_batch_downloaded {
return None;
Expand Down Expand Up @@ -1104,10 +1110,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
self.to_be_downloaded = self
.to_be_downloaded
.saturating_sub(BACKFILL_EPOCHS_PER_BATCH);
self.include_next_batch()
self.include_next_batch(network)
}
Entry::Vacant(entry) => {
entry.insert(BatchInfo::new(&batch_id, BACKFILL_EPOCHS_PER_BATCH));
let batch_type = network.batch_type(batch_id);
entry.insert(BatchInfo::new(
&batch_id,
BACKFILL_EPOCHS_PER_BATCH,
batch_type,
));
if batch_id == 0 {
self.last_batch_downloaded = true;
}
Expand Down
Loading

0 comments on commit bf50052

Please sign in to comment.