diff --git a/fork_choice_control/src/mutator.rs b/fork_choice_control/src/mutator.rs
index bfb9be8..2ac87e0 100644
--- a/fork_choice_control/src/mutator.rs
+++ b/fork_choice_control/src/mutator.rs
@@ -520,11 +520,10 @@ where
         {
             let blob_count = post_deneb_block_body.blob_kzg_commitments().len();
 
-            // check if it is supernode, and maintaining columns more than half
+            // check if this node is a supernode and has obtained more than half of the columns
            if self.store.is_supernode() && available_columns.len() > NumberOfColumns::USIZE / 2 {
-                // TODO(feature/das): reconstruct the missing columns here
                if !self
                    .store
                    .has_reconstructed_data_column_sidecars(block_root)
diff --git a/p2p/src/block_sync_service.rs b/p2p/src/block_sync_service.rs
index deb373d..aa4480f 100644
--- a/p2p/src/block_sync_service.rs
+++ b/p2p/src/block_sync_service.rs
@@ -394,12 +394,16 @@ impl<P: Preset> BlockSyncService<P> {
     }
 
     pub fn retry_sync_batches(&mut self, batches: Vec<SyncBatch>) -> Result<()> {
-        let new_batches = batches.into_iter()
+        let new_batches = batches
+            .into_iter()
             .filter_map(|b| {
                 match b.target {
-                    SyncTarget::DataColumnSidecar(columns) =>
-                        Some(self.sync_manager
-                            .map_peer_custody_columns(&columns, None, Some(b.peer_id))
+                    // TODO(feature/das): we should reconstruct the batch by:
+                    // - [ ] filtering out the columns that have already been received or accepted,
+                    // - [x] filtering out peers whose head slot is below the batch start slot
+                    SyncTarget::DataColumnSidecar(columns) => Some(
+                        self.sync_manager
+                            .map_peer_custody_columns(&columns, b.start_slot, None, Some(b.peer_id))
                             .into_iter()
                             .map(|(peer_id, peer_columns)| SyncBatch {
                                 target: SyncTarget::DataColumnSidecar(peer_columns),
@@ -408,18 +412,20 @@ impl<P: Preset> BlockSyncService<P> {
                                 start_slot: b.start_slot,
                                 count: b.count,
                             })
-                            .collect_vec()
-                        ),
-                    SyncTarget::Block | SyncTarget::BlobSidecar =>
-                        self.sync_manager
-                            .random_peer()
-                            .map(|peer_id| vec![SyncBatch {
+                            .collect_vec(),
+                    ),
+                    SyncTarget::Block | SyncTarget::BlobSidecar => self
+                        .sync_manager
+                        .random_peer_with_head_slot_filtered(b.start_slot)
+                        .map(|peer_id| {
+                            vec![SyncBatch {
                                 target: b.target,
                                 direction: b.direction,
                                 peer_id,
                                 start_slot: b.start_slot,
                                 count: b.count,
-                            }])
+                            }]
+                        }),
                 }
             })
             .flatten()
@@ -558,8 +564,14 @@ impl<P: Preset> BlockSyncService<P> {
                     self.sync_manager
                         .add_data_columns_request_by_range(request_id, batch, &columns);
 
-                    SyncToP2p::RequestDataColumnsByRange(request_id, peer_id, start_slot, count, columns.to_vec())
-                        .send(&self.sync_to_p2p_tx);
+                    SyncToP2p::RequestDataColumnsByRange(
+                        request_id,
+                        peer_id,
+                        start_slot,
+                        count,
+                        columns.to_vec(),
+                    )
+                    .send(&self.sync_to_p2p_tx);
                 }
                 SyncTarget::BlobSidecar => {
                     self.sync_manager
@@ -599,24 +611,28 @@ impl<P: Preset> BlockSyncService<P> {
             );
             return Ok(());
         }
-
-        let first_id = identifiers.first().expect("must request at least 1 data column sidecar");
+
+        let first_id = identifiers
+            .first()
+            .expect("must request at least 1 data column sidecar");
 
         let columns_indices = identifiers.iter().map(|id| id.index).collect();
 
-        let peer_custody_columns_mapping = self.sync_manager.map_peer_custody_columns(&columns_indices, peer_id, None);
+        let peer_custody_columns_mapping =
+            self.sync_manager
+                .map_peer_custody_columns(&columns_indices, slot, peer_id, None);
         // let request_peers = peer_custody_columns_mapping.keys();
         // let num_of_requests = peer_custody_columns_mapping.len();
-        // TODO(feature/das): fetch corresponding peers to the custody columns needed
         for (peer_id, columns) in peer_custody_columns_mapping.into_iter() {
-            if !columns.is_empty() {
-
-                let request_id = self.request_id()?;
-
-                let peer_custody_columns = columns
-                    .into_iter()
-                    .map(|index| DataColumnIdentifier { index, block_root: first_id.block_root })
-                    .collect::<Vec<_>>();
+            let request_id = self.request_id()?;
+
+            let peer_custody_columns = columns
+                .into_iter()
+                .map(|index| DataColumnIdentifier {
+                    index,
+                    block_root: first_id.block_root,
+                })
+                .collect::<Vec<_>>();
             let data_column_identifiers = self
                 .sync_manager
                 .add_data_columns_request_by_root(peer_custody_columns, peer_id);
@@ -652,7 +668,9 @@ impl<P: Preset> BlockSyncService<P> {
         let request_id = self.request_id()?;
 
-        let Some(peer_id) = peer_id.or_else(|| self.sync_manager.random_peer()) else {
+        let Some(peer_id) =
+            peer_id.or_else(|| self.sync_manager.random_peer_with_head_slot_filtered(slot))
+        else {
             return Ok(());
         };
diff --git a/p2p/src/network.rs b/p2p/src/network.rs
index 8eb39c3..0885ba7 100644
--- a/p2p/src/network.rs
+++ b/p2p/src/network.rs
@@ -1650,7 +1650,7 @@ impl<P: Preset> Network<P> {
                     (request_id: {request_id}, peer_id: {peer_id}, slot: {data_column_sidecar_slot}, data_column: {data_column_identifier:?})",
                 ),
             );
-
+
         self.log(
             Level::Debug,
             format_args!(
@@ -2207,37 +2207,24 @@ impl<P: Preset> Network<P> {
         count: u64,
         peer_custody_columns: Vec<ColumnIndex>,
     ) {
-        let epoch = misc::compute_epoch_at_slot::<P>(start_slot);
-
-        // prevent node from sending excessive requests, since custody peers is not available.
-        if self.check_good_peers_on_column_subnets(epoch) {
-            // TODO: is count capped in eth2_libp2p?
-            let request = DataColumnsByRangeRequest {
-                start_slot,
-                count,
-                columns: Arc::new(
-                    ContiguousList::try_from(peer_custody_columns)
-                        .expect("fail to parse custody_columns"),
-                ),
-            };
+        // TODO: is count capped in eth2_libp2p?
+        let request = DataColumnsByRangeRequest {
+            start_slot,
+            count,
+            columns: Arc::new(
+                ContiguousList::try_from(peer_custody_columns)
+                    .expect("fail to parse custody_columns"),
+            ),
+        };
 
-            self.log(
-                Level::Info,
-                format_args!(
-                    "sending DataColumnsByRange request (request_id: {request_id} peer_id: {peer_id}, request: {request:?})",
-                ),
-            );
+        self.log(
+            Level::Debug,
+            format_args!(
+                "sending DataColumnsByRange request (request_id: {request_id} peer_id: {peer_id}, request: {request:?})",
+            ),
+        );
 
-            self.request(peer_id, request_id, Request::DataColumnsByRange(request));
-        } else {
-            self.log(
-                Level::Info,
-                format_args!(
-                    "Waiting for peers to be available on custody_columns: [{}]",
-                    peer_custody_columns.iter().join(", "),
-                ),
-            );
-        }
+        self.request(peer_id, request_id, Request::DataColumnsByRange(request));
     }
 
     fn request_data_columns_by_root(
diff --git a/p2p/src/sync_manager.rs b/p2p/src/sync_manager.rs
index af14c7b..c27744b 100644
--- a/p2p/src/sync_manager.rs
+++ b/p2p/src/sync_manager.rs
@@ -6,7 +6,7 @@ use arithmetic::NonZeroExt as _;
 use cached::{Cached as _, TimedSizedCache};
 use eth2_libp2p::{rpc::StatusMessage, NetworkGlobals, PeerId};
 use helper_functions::misc;
-use itertools::Itertools as _;
+use itertools::Itertools;
 use log::{log, Level};
 use prometheus_metrics::Metrics;
 use rand::{prelude::SliceRandom, seq::IteratorRandom as _, thread_rng};
@@ -95,7 +95,7 @@ impl SyncManager {
             ),
             not_enough_peers_message_shown_at: None,
             network_globals,
-        }
+        }
     }
 
     #[must_use]
@@ -127,7 +127,7 @@ impl SyncManager {
     }
     //pub fn retry_batch(&mut self, request_id: RequestId, batch: &SyncBatch) -> Option<SyncBatch> {
-    //    // TODO(feature/das): peer should be not randomized for data columns request
+    //    // TODO(feature/das): peer should be not randomized for data columns request
     //    let peer = self.random_peer();
     //
     //    self.log_with_feature(format_args!(
@@ -318,6 +318,14 @@ impl SyncManager {
             max_slot = start_slot + count - 1;
 
+            sync_batches.push(SyncBatch {
+                target: SyncTarget::Block,
+                direction: SyncDirection::Forward,
+                peer_id,
+                start_slot,
+                count,
+            });
+
             // TODO(feature/das): check if there any blobs in the slot range
             // or request blocks_by_range first, then check blobs availability once received each
             // block, queue them, and request data_column_sidecars_by_range/blob_sidecars_by_range
@@ -328,7 +336,12 @@ impl SyncManager {
                 misc::data_column_serve_range_slot::<P>(config, current_slot);
             if data_column_serve_range_slot < max_slot {
                 let custody_columns = self.network_globals.custody_columns();
-                let peer_custody_columns_mapping = self.map_peer_custody_columns(&custody_columns, Some(peer_id), None);
+                let peer_custody_columns_mapping = self.map_peer_custody_columns(
+                    &custody_columns,
+                    start_slot,
+                    Some(peer_id),
+                    None,
+                );
 
                 for (peer_id, columns) in peer_custody_columns_mapping {
                     sync_batches.push(SyncBatch {
@@ -352,14 +365,6 @@ impl SyncManager {
                     });
                 }
             }
-
-            sync_batches.push(SyncBatch {
-                target: SyncTarget::Block,
-                direction: SyncDirection::Forward,
-                peer_id,
-                start_slot,
-                count,
-            });
         }
 
         self.log_with_feature(format_args!(
@@ -385,7 +390,12 @@ impl SyncManager {
             .ready_to_request_by_root(&block_root, peer_id)
     }
 
-    pub fn add_data_columns_request_by_range(&mut self, request_id: RequestId, batch: SyncBatch, columns: &Vec<ColumnIndex>) {
+    pub fn add_data_columns_request_by_range(
+        &mut self,
+        request_id: RequestId,
+        batch: SyncBatch,
+        columns: &Vec<ColumnIndex>,
+    ) {
         self.log_with_feature(format_args!(
             "add data column request by range (request_id: {}, peer_id: {}, range: {:?}, columns: [{}])",
             request_id,
@@ -471,6 +481,18 @@ impl SyncManager {
             .choose(&mut thread_rng())
     }
 
+    pub fn random_peer_with_head_slot_filtered(&self, min_head_slot: Slot) -> Option<PeerId> {
+        let chain_id = self.chain_with_max_peer_count()?;
+
+        self.peers
+            .iter()
+            .filter(|(_, status)| {
+                ChainId::from(*status) == chain_id && status.head_slot >= min_head_slot
+            })
+            .map(|(&peer_id, _)| peer_id)
+            .choose(&mut thread_rng())
+    }
+
     pub fn blobs_by_range_request_finished(&mut self, request_id: RequestId) {
         self.log_with_feature(format_args!(
             "request blob sidecars by range finished (request_id: {request_id})",
@@ -595,12 +617,36 @@ impl SyncManager {
             .collect()
     }
 
+    fn chain_peers_with_head_slot_filtered(
+        &self,
+        chain_id: &ChainId,
+        min_head_slot: &Slot,
+    ) -> Vec<PeerId> {
+        self.peers
+            .iter()
+            .filter(|(_, status)| {
+                &ChainId::from(*status) == chain_id && &status.head_slot >= min_head_slot
+            })
+            .map(|(&peer_id, _)| peer_id)
+            .collect()
+    }
+
     fn chain_peers_shuffled(&self, chain_id: &ChainId) -> Vec<PeerId> {
         let mut peers = self.chain_peers(chain_id);
         peers.shuffle(&mut thread_rng());
         peers
     }
 
+    fn chain_peers_shuffled_with_head_slot_filtered(
+        &self,
+        chain_id: &ChainId,
+        min_head_slot: &Slot,
+    ) -> Vec<PeerId> {
+        let mut peers = self.chain_peers_with_head_slot_filtered(chain_id, min_head_slot);
+        peers.shuffle(&mut thread_rng());
+        peers
+    }
+
     fn chain_with_max_peer_count(&self) -> Option<ChainId> {
         self.chains_with_peer_counts()
             .into_iter()
@@ -646,34 +692,47 @@ impl SyncManager {
     }
 
     pub fn get_custodial_peers(&self, column_index: ColumnIndex) -> Vec<PeerId> {
-        self.network_globals
-            .custody_peers_for_column(column_index)
+        self.network_globals().custody_peers_for_column(column_index)
     }
 
-    pub fn get_random_custodial_peer(&self, column_index: ColumnIndex, prioritized_peer: Option<PeerId>) -> Option<PeerId> {
-        let custodial_peers = self.get_custodial_peers(column_index);
+    pub fn get_random_custodial_peer(
+        &self,
+        column_index: ColumnIndex,
+        prioritized_peer: Option<PeerId>,
+        min_head_slot: Slot,
+    ) -> Option<PeerId> {
+        let custodial_peers = self
+            .get_custodial_peers(column_index)
+            .into_iter()
+            .filter(|peer_id| {
+                self.peers
+                    .get(&peer_id)
+                    .map_or(false, |peer| peer.head_slot >= min_head_slot)
+            })
+            .collect_vec();
 
         if let Some(peer) = prioritized_peer {
             if custodial_peers.contains(&peer) {
                 return prioritized_peer;
             }
-        }
-
-        custodial_peers
-            .choose(&mut thread_rng())
-            .cloned()
+        }
+
+        custodial_peers.choose(&mut thread_rng()).cloned()
     }
 
     pub fn map_peer_custody_columns(
         &self,
         custody_columns: &Vec<ColumnIndex>,
+        min_head_slot: Slot,
         prioritized_peer: Option<PeerId>,
         ignore_peer: Option<PeerId>,
     ) -> HashMap<PeerId, Vec<ColumnIndex>> {
         let mut peer_columns_mapping = HashMap::new();
 
         for column_index in custody_columns {
-            let Some(custodial_peer) = self.get_random_custodial_peer(*column_index, prioritized_peer) else {
+            let Some(custodial_peer) =
+                self.get_random_custodial_peer(*column_index, prioritized_peer, min_head_slot)
+            else {
                 // this should return no custody column error, rather than warning
                 // warn!("No custodial peer for column_index: {column_index}");
                 self.log(
@@ -683,7 +742,7 @@ impl SyncManager {
                 continue;
             };
-            // given peer_id to ignore from retry batch to the same peer again,
+            // given peer_id to ignore from retry batch to the same peer again,
             // which was not able to response to the request
             if let Some(peer) = ignore_peer {
                 if peer == custodial_peer {
@@ -789,7 +848,6 @@ mod tests {
         }
     }
 
-
     // `SyncBatch.count` is 16 because the test cases use `Minimal`.
     // `Minimal::SlotsPerEpoch::U64` × `EPOCHS_PER_REQUEST` = 8 × 2 = 16.
     #[test_case(
@@ -834,7 +892,7 @@ mod tests {
             head_root: H256::default(),
             head_slot: 8 * 32,
         };
-
+
         let log = build_log(slog::Level::Debug, false);
         let network_globals = NetworkGlobals::new_test_globals(vec![], CUSTODY_REQUIREMENT, &log);
         let mut sync_manager = SyncManager::new(network_globals.into());
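
The head-slot filter is the common thread of this change: `random_peer_with_head_slot_filtered`, `chain_peers_with_head_slot_filtered`, and the new `min_head_slot` argument of `get_random_custodial_peer` and `map_peer_custody_columns` all drop peers whose advertised head slot is behind the slot a batch starts at, so by-range and by-root requests only go to peers that can plausibly serve them. The snippet below is a minimal standalone sketch of that selection rule, assuming only the `rand` crate and using simplified `PeerId` and `PeerStatus` stand-ins rather than the real libp2p and Status types; it illustrates the filter, not the actual `SyncManager` implementation.

use std::collections::HashMap;

use rand::seq::IteratorRandom as _;

type Slot = u64;
type PeerId = u64; // stand-in for the real libp2p peer identifier

// Stand-in for the part of a peer's Status message that matters here.
struct PeerStatus {
    head_slot: Slot,
}

// Pick a random peer that can serve a request starting at `min_head_slot`;
// peers whose advertised head is behind that slot are skipped entirely.
fn random_peer_with_head_slot_filtered(
    peers: &HashMap<PeerId, PeerStatus>,
    min_head_slot: Slot,
) -> Option<PeerId> {
    peers
        .iter()
        .filter(|(_, status)| status.head_slot >= min_head_slot)
        .map(|(&peer_id, _)| peer_id)
        .choose(&mut rand::thread_rng())
}

fn main() {
    let peers = HashMap::from([
        (1, PeerStatus { head_slot: 10 }),
        (2, PeerStatus { head_slot: 200 }),
    ]);

    // Only peer 2 has advanced far enough to serve a batch starting at slot 100.
    assert_eq!(random_peer_with_head_slot_filtered(&peers, 100), Some(2));
}

Returning `None` when no peer qualifies mirrors how `retry_sync_batches` now drops a batch it cannot place instead of re-sending it to an arbitrary peer.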