Skip to content

Commit

Permalink
Blob syncing (sigp#24)
Browse files Browse the repository at this point in the history
* add a rt is_blob_batch

* use the mixed type everywhere

* glue

* more glue

* minor fixes

* fix range tests

* filling in the gaps

* moore filling in the gaps
  • Loading branch information
divagant-martian authored and Woodpile37 committed Jan 6, 2024
1 parent f21d48c commit 48617a8
Show file tree
Hide file tree
Showing 9 changed files with 656 additions and 270 deletions.
13 changes: 12 additions & 1 deletion beacon_node/network/src/beacon_processor/worker/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use lighthouse_network::PeerAction;
use slog::{debug, error, info, warn};
use std::sync::Arc;
use tokio::sync::mpsc;
use types::{Epoch, Hash256, SignedBeaconBlock};
use types::{Epoch, Hash256, SignedBeaconBlock, SignedBeaconBlockAndBlobsSidecar};

/// Id associated to a batch processing request, either a sync batch or a parent lookup.
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -241,6 +241,17 @@ impl<T: BeaconChainTypes> Worker<T> {
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}

pub async fn process_blob_chain_segment(
&self,
sync_type: ChainSegmentProcessId,
downloaded_blocks: Vec<SignedBeaconBlockAndBlobsSidecar<T::EthSpec>>,
) {
warn!(self.log, "FAKE PROCESSING A BLOBS SEGMENT!!!");
let result = BatchProcessResult::Success {
was_non_empty: !downloaded_blocks.is_empty(),
};
self.send_sync_message(SyncMessage::BatchProcessed { sync_type, result });
}
/// Helper function to process blocks batches which only consumes the chain and blocks to process.
async fn process_blocks<'a>(
&self,
Expand Down
22 changes: 13 additions & 9 deletions beacon_node/network/src/router/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,10 @@ impl<T: BeaconChainTypes> Processor<T> {
SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. } => {
unreachable!("Block lookups do not request BBRange requests")
}
id @ (SyncId::BackFillSync { .. } | SyncId::RangeSync { .. }) => id,
SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
id @ (SyncId::BackFillSync { .. }
| SyncId::RangeSync { .. }
| SyncId::BackFillSidecarPair { .. }
| SyncId::RangeSidecarPair { .. }) => id,
},
RequestId::Router => unreachable!("All BBRange requests belong to sync"),
};
Expand Down Expand Up @@ -243,11 +245,12 @@ impl<T: BeaconChainTypes> Processor<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
SyncId::BackFillSync { .. }
| SyncId::RangeSync { .. }
| SyncId::RangeSidecarPair { .. }
| SyncId::BackFillSidecarPair { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
}

SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
},
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
};
Expand Down Expand Up @@ -275,11 +278,12 @@ impl<T: BeaconChainTypes> Processor<T> {
let request_id = match request_id {
RequestId::Sync(sync_id) => match sync_id {
id @ (SyncId::SingleBlock { .. } | SyncId::ParentLookup { .. }) => id,
SyncId::BackFillSync { .. } | SyncId::RangeSync { .. } => {
unreachable!("Batch syncing do not request BBRoot requests")
SyncId::BackFillSync { .. }
| SyncId::RangeSync { .. }
| SyncId::RangeSidecarPair { .. }
| SyncId::BackFillSidecarPair { .. } => {
unreachable!("Batch syncing does not request BBRoot requests")
}

SyncId::RangeBlockBlob { id } => unimplemented!("do it"),
},
RequestId::Router => unreachable!("All BBRoot requests belong to sync"),
};
Expand Down
37 changes: 24 additions & 13 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use std::collections::{
HashMap, HashSet,
};
use std::sync::Arc;
use types::{Epoch, EthSpec, SignedBeaconBlock};
use types::{Epoch, EthSpec};

use super::manager::BlockTy;
use super::range_sync::BatchTy;

/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request less blocks to account for
Expand Down Expand Up @@ -54,7 +57,7 @@ impl BatchConfig for BackFillBatchConfig {
fn max_batch_processing_attempts() -> u8 {
MAX_BATCH_PROCESSING_ATTEMPTS
}
fn batch_attempt_hash<T: EthSpec>(blocks: &[Arc<SignedBeaconBlock<T>>]) -> u64 {
fn batch_attempt_hash<T: EthSpec>(blocks: &[BlockTy<T>]) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
Expand Down Expand Up @@ -390,7 +393,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
batch_id: BatchId,
peer_id: &PeerId,
request_id: Id,
beacon_block: Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
beacon_block: Option<BlockTy<T::EthSpec>>,
) -> Result<ProcessResult, BackFillError> {
// check if we have this batch
let batch = match self.batches.get_mut(&batch_id) {
Expand Down Expand Up @@ -535,10 +538,13 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
let process_id = ChainSegmentProcessId::BackSyncBatchId(batch_id);
self.current_processing_batch = Some(batch_id);

if let Err(e) = network
.processor_channel()
.try_send(BeaconWorkEvent::chain_segment(process_id, blocks))
{
let work_event = match blocks {
BatchTy::Blocks(blocks) => BeaconWorkEvent::chain_segment(process_id, blocks),
BatchTy::BlocksAndBlobs(blocks_and_blobs) => {
BeaconWorkEvent::blob_chain_segment(process_id, blocks_and_blobs)
}
};
if let Err(e) = network.processor_channel().try_send(work_event) {
crit!(self.log, "Failed to send backfill segment to processor."; "msg" => "process_batch",
"error" => %e, "batch" => self.processing_target);
// This is unlikely to happen but it would stall syncing since the batch now has no
Expand Down Expand Up @@ -953,8 +959,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
peer: PeerId,
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
let request = batch.to_blocks_by_range_request();
match network.backfill_blocks_by_range_request(peer, request, batch_id) {
let (request, is_blob_batch) = batch.to_blocks_by_range_request();
match network.backfill_blocks_by_range_request(peer, is_blob_batch, request, batch_id) {
Ok(request_id) => {
// inform the batch about the new request
if let Err(e) = batch.start_downloading_from_peer(peer, request_id) {
Expand Down Expand Up @@ -1054,7 +1060,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
idle_peers.shuffle(&mut rng);

while let Some(peer) = idle_peers.pop() {
if let Some(batch_id) = self.include_next_batch() {
if let Some(batch_id) = self.include_next_batch(network) {
// send the batch
self.send_batch(network, batch_id, peer)?;
} else {
Expand All @@ -1067,7 +1073,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {

/// Creates the next required batch from the chain. If there are no more batches required,
/// `false` is returned.
fn include_next_batch(&mut self) -> Option<BatchId> {
fn include_next_batch(&mut self, network: &mut SyncNetworkContext<T>) -> Option<BatchId> {
// don't request batches beyond genesis;
if self.last_batch_downloaded {
return None;
Expand Down Expand Up @@ -1104,10 +1110,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
self.to_be_downloaded = self
.to_be_downloaded
.saturating_sub(BACKFILL_EPOCHS_PER_BATCH);
self.include_next_batch()
self.include_next_batch(network)
}
Entry::Vacant(entry) => {
entry.insert(BatchInfo::new(&batch_id, BACKFILL_EPOCHS_PER_BATCH));
let batch_type = network.batch_type(batch_id);
entry.insert(BatchInfo::new(
&batch_id,
BACKFILL_EPOCHS_PER_BATCH,
batch_type,
));
if batch_id == 0 {
self.last_batch_downloaded = true;
}
Expand Down
Loading

0 comments on commit 48617a8

Please sign in to comment.