fix: by range request to peers with less head slot
hangleang committed Oct 20, 2024
1 parent 0ee5655 commit a5415cd
Showing 4 changed files with 146 additions and 84 deletions.
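The core of the fix is in peer selection for by-range sync requests: a peer whose advertised head slot is below a batch's start slot cannot serve that range, so such peers are now skipped both when retrying batches and when issuing new block, blob, and data-column requests. Below is a minimal sketch of the idea behind the random_peer_with_head_slot_filtered call introduced in the diff; the Peers struct, the head_slots map, and the PeerId alias are illustrative stand-ins, not the repository's actual types.

use std::collections::HashMap;

use rand::seq::IteratorRandom as _;

// Illustrative aliases; the real types come from eth2_libp2p and the consensus types crate.
type PeerId = [u8; 32];
type Slot = u64;

// Minimal stand-in for the peer bookkeeping kept by the sync manager.
struct Peers {
    // Latest head slot advertised by each connected peer (via Status exchanges).
    head_slots: HashMap<PeerId, Slot>,
}

impl Peers {
    // Pick a random peer whose advertised head slot is at least `start_slot`,
    // so that a by-range request starting at that slot can actually be served.
    fn random_peer_with_head_slot_filtered(&self, start_slot: Slot) -> Option<PeerId> {
        self.head_slots
            .iter()
            .filter(|(_, head_slot)| **head_slot >= start_slot)
            .map(|(peer_id, _)| *peer_id)
            .choose(&mut rand::thread_rng())
    }
}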
3 changes: 1 addition & 2 deletions fork_choice_control/src/mutator.rs
@@ -520,11 +520,10 @@ where
{
let blob_count = post_deneb_block_body.blob_kzg_commitments().len();

// check if it is supernode, and maintaining columns more than half
// check if it is supernode, and obtaining columns more than half
if self.store.is_supernode()
&& available_columns.len() > NumberOfColumns::USIZE / 2
{
// TODO(feature/das): reconstruct the missing columns here
if !self
.store
.has_reconstructed_data_column_sidecars(block_root)
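For context on the hunk above: a supernode that has obtained more than half of the extended data columns has enough information to reconstruct the missing ones, so reconstruction is only triggered past that threshold and only if it has not already been performed for this block root. A toy version of the gate follows; the column count is hard-coded here purely for illustration.

// Illustrative constant; the actual value comes from the NumberOfColumns preset type.
const NUMBER_OF_COLUMNS: usize = 128;

// Should reconstruction of missing data columns be attempted for this block?
fn should_reconstruct(is_supernode: bool, available_columns: usize, already_reconstructed: bool) -> bool {
    is_supernode && available_columns > NUMBER_OF_COLUMNS / 2 && !already_reconstructed
}

fn main() {
    assert!(should_reconstruct(true, 65, false));
    assert!(!should_reconstruct(true, 64, false)); // exactly half does not pass the gate
    assert!(!should_reconstruct(false, 100, false)); // non-supernodes do not reconstruct
}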
70 changes: 44 additions & 26 deletions p2p/src/block_sync_service.rs
@@ -394,12 +394,16 @@ impl<P: Preset> BlockSyncService<P> {
}

pub fn retry_sync_batches(&mut self, batches: Vec<SyncBatch>) -> Result<()> {
let new_batches = batches.into_iter()
let new_batches = batches
.into_iter()
.filter_map(|b| {
match b.target {
SyncTarget::DataColumnSidecar(columns) =>
Some(self.sync_manager
.map_peer_custody_columns(&columns, None, Some(b.peer_id))
// TODO(feature/das): we should reconstruct the batch by:
// - [ ] filter out the columns that have already been received or accepted,
// - [x] filter out peers whose head slot is less than the start slot
SyncTarget::DataColumnSidecar(columns) => Some(
self.sync_manager
.map_peer_custody_columns(&columns, b.start_slot, None, Some(b.peer_id))
.into_iter()
.map(|(peer_id, peer_columns)| SyncBatch {
target: SyncTarget::DataColumnSidecar(peer_columns),
@@ -408,18 +412,20 @@
start_slot: b.start_slot,
count: b.count,
})
.collect_vec()
),
SyncTarget::Block | SyncTarget::BlobSidecar =>
self.sync_manager
.random_peer()
.map(|peer_id| vec![SyncBatch {
.collect_vec(),
),
SyncTarget::Block | SyncTarget::BlobSidecar => self
.sync_manager
.random_peer_with_head_slot_filtered(b.start_slot)
.map(|peer_id| {
vec![SyncBatch {
target: b.target,
direction: b.direction,
peer_id,
start_slot: b.start_slot,
count: b.count,
}])
}]
}),
}
})
.flatten()
@@ -558,8 +564,14 @@ impl<P: Preset> BlockSyncService<P> {
self.sync_manager
.add_data_columns_request_by_range(request_id, batch, &columns);

SyncToP2p::RequestDataColumnsByRange(request_id, peer_id, start_slot, count, columns.to_vec())
.send(&self.sync_to_p2p_tx);
SyncToP2p::RequestDataColumnsByRange(
request_id,
peer_id,
start_slot,
count,
columns.to_vec(),
)
.send(&self.sync_to_p2p_tx);
}
SyncTarget::BlobSidecar => {
self.sync_manager
@@ -599,24 +611,28 @@ impl<P: Preset> BlockSyncService<P> {
);
return Ok(());
}

let first_id = identifiers.first().expect("must request at least 1 data column sidecar");

let first_id = identifiers
.first()
.expect("must request at least 1 data column sidecar");
let columns_indices = identifiers.iter().map(|id| id.index).collect();
let peer_custody_columns_mapping = self.sync_manager.map_peer_custody_columns(&columns_indices, peer_id, None);
let peer_custody_columns_mapping =
self.sync_manager
.map_peer_custody_columns(&columns_indices, slot, peer_id, None);
// let request_peers = peer_custody_columns_mapping.keys();
// let num_of_requests = peer_custody_columns_mapping.len();

// TODO(feature/das): fetch corresponding peers to the custody columns needed
for (peer_id, columns) in peer_custody_columns_mapping.into_iter() {

if !columns.is_empty() {

let request_id = self.request_id()?;

let peer_custody_columns = columns
.into_iter()
.map(|index| DataColumnIdentifier { index, block_root: first_id.block_root })
.collect::<Vec<_>>();
let request_id = self.request_id()?;

let peer_custody_columns = columns
.into_iter()
.map(|index| DataColumnIdentifier {
index,
block_root: first_id.block_root,
})
.collect::<Vec<_>>();
let data_column_identifiers = self
.sync_manager
.add_data_columns_request_by_root(peer_custody_columns, peer_id);
@@ -652,7 +668,9 @@ impl<P: Preset> BlockSyncService<P> {

let request_id = self.request_id()?;

let Some(peer_id) = peer_id.or_else(|| self.sync_manager.random_peer()) else {
let Some(peer_id) =
peer_id.or_else(|| self.sync_manager.random_peer_with_head_slot_filtered(slot))
else {
return Ok(());
};

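The same head-slot filter is threaded through map_peer_custody_columns, which now receives the request's start slot so that requested column indices are only assigned to custody peers able to serve that slot. Below is a rough sketch of that mapping under the same illustrative types as the earlier sketch; the real method also takes two optional peer arguments (visible in the call sites above) that are omitted here, and the custody bookkeeping is invented for illustration.

use std::collections::{HashMap, HashSet};

// Same illustrative aliases as the earlier sketch.
type PeerId = [u8; 32];
type Slot = u64;
type ColumnIndex = u64;

struct Peers {
    head_slots: HashMap<PeerId, Slot>,
    // Column indices each peer claims to custody (assumed bookkeeping).
    custody_columns: HashMap<PeerId, HashSet<ColumnIndex>>,
}

impl Peers {
    // Assign every requested column to some peer that custodies it and whose
    // head slot reaches `start_slot`; columns with no eligible peer are simply
    // left out so the caller can retry them later.
    fn map_peer_custody_columns(
        &self,
        columns: &HashSet<ColumnIndex>,
        start_slot: Slot,
    ) -> HashMap<PeerId, Vec<ColumnIndex>> {
        let mut mapping: HashMap<PeerId, Vec<ColumnIndex>> = HashMap::new();

        for column in columns {
            let eligible = self.custody_columns.iter().find(|(peer_id, custody)| {
                custody.contains(column)
                    && self
                        .head_slots
                        .get(*peer_id)
                        .is_some_and(|head_slot| *head_slot >= start_slot)
            });

            if let Some((peer_id, _)) = eligible {
                mapping.entry(*peer_id).or_default().push(*column);
            }
        }

        mapping
    }
}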
47 changes: 17 additions & 30 deletions p2p/src/network.rs
@@ -1650,7 +1650,7 @@ impl<P: Preset> Network<P> {
(request_id: {request_id}, peer_id: {peer_id}, slot: {data_column_sidecar_slot}, data_column: {data_column_identifier:?})",
),
);

self.log(
Level::Debug,
format_args!(
@@ -2207,37 +2207,24 @@ impl<P: Preset> Network<P> {
count: u64,
peer_custody_columns: Vec<ColumnIndex>,
) {
let epoch = misc::compute_epoch_at_slot::<P>(start_slot);

// prevent node from sending excessive requests, since custody peers is not available.
if self.check_good_peers_on_column_subnets(epoch) {
// TODO: is count capped in eth2_libp2p?
let request = DataColumnsByRangeRequest {
start_slot,
count,
columns: Arc::new(
ContiguousList::try_from(peer_custody_columns)
.expect("fail to parse custody_columns"),
),
};
// TODO: is count capped in eth2_libp2p?
let request = DataColumnsByRangeRequest {
start_slot,
count,
columns: Arc::new(
ContiguousList::try_from(peer_custody_columns)
.expect("fail to parse custody_columns"),
),
};

self.log(
Level::Info,
format_args!(
"sending DataColumnsByRange request (request_id: {request_id} peer_id: {peer_id}, request: {request:?})",
),
);
self.log(
Level::Debug,
format_args!(
"sending DataColumnsByRange request (request_id: {request_id} peer_id: {peer_id}, request: {request:?})",
),
);

self.request(peer_id, request_id, Request::DataColumnsByRange(request));
} else {
self.log(
Level::Info,
format_args!(
"Waiting for peers to be available on custody_columns: [{}]",
peer_custody_columns.iter().join(", "),
),
);
}
self.request(peer_id, request_id, Request::DataColumnsByRange(request));
}

fn request_data_columns_by_root(
