Skip to content

Commit

Permalink
Refactor, address comments, more unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulksnv committed Sep 28, 2023
1 parent 5b7c98a commit 4477868
Showing 1 changed file with 221 additions and 59 deletions.
280 changes: 221 additions & 59 deletions substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,65 +423,107 @@ pub struct PeerDownloadState<B: BlockT> {
/// The range being download [start, end).
range: Range<NumberFor<B>>,

/// The blocks downloaded so far.
downloaded: Vec<BlockData<B>>,
/// State of the blocks downloaded so far.
downloaded: Option<PeerDownloaded<B>>,
}

/// State of the blocks downloaded so far.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct PeerDownloaded<B: BlockT> {
/// The downloaded blocks.
blocks: Vec<BlockData<B>>,

/// Block number of the first block in `blocks`.
start_block: NumberFor<B>,
}

impl<B: BlockT> PeerDownloadState<B> {
fn new(common_number: NumberFor<B>, range: Range<NumberFor<B>>) -> Self {
Self { common_number, range, downloaded: Vec::new() }
Self { common_number, range, downloaded: None }
}

/// Handles the new blocks received from the peer.
/// Returns:
/// Ok(true): If done and no more requests need to be issued.
/// Ok(false): if more requests need to be issued.
/// Err(): On any failures.
fn handle_blocks(
/// Validates the new blocks received from the peer and prepends to the
/// already downloaded blocks. Assumes the blocks are received in descending order
/// (e.g) [50 -100), [25 - 50), etc
fn prepend_blocks(
&mut self,
who: &PeerId,
mut new_blocks: Vec<BlockData<B>>,
request: BlockRequest<B>,
collection: &mut BlockCollection<B>,
) -> Result<bool, BadPeer> {
collection.clear_peer_download(who);

) -> Result<(), BadPeer> {
// Validate the blocks.
let start_block = match validate_blocks::<B>(&new_blocks, who, Some(request)) {
Ok(Some(start_block)) => start_block,
Ok(None) => return Ok(true),
Ok(None) => return Err(BadPeer(*who, rep::BAD_RESPONSE)),
Err(err) => return Err(err),
};

// Add the new blocks to the downloaded list.
let range_len = (self.range.end - self.range.start).saturated_into::<u32>() as usize;
if (self.downloaded.len() + new_blocks.len()) > range_len {
// Validate against the current state.
let remaining = self.range.start..
self.downloaded
.as_ref()
.map_or(self.range.end, |downloaded| downloaded.start_block);
// Start block should be in range.
if !remaining.contains(&start_block) {
return Err(BadPeer(*who, rep::BAD_RESPONSE))
}
// The new blocks should be a suffix of the remaining range.
let remaining_len = (remaining.end - start_block).saturated_into::<usize>();
if new_blocks.len() != remaining_len {
return Err(BadPeer(*who, rep::BAD_RESPONSE))
}
let new_blocks_len = new_blocks.len();
new_blocks.append(&mut self.downloaded);
self.downloaded.append(&mut new_blocks);

if self.range.start == (self.common_number + One::one()) &&
self.downloaded.len() < range_len
{
// The download extends the common ancestor, but the response
// was partial. We want to issue further requests to fetch
// the incomplete part before importing.
trace!(
target: LOG_TARGET,
"Download state: incomplete download: \
common = {}, range = {:?}, new_blocks = {new_blocks_len}, downloaded = {}",
self.common_number, self.range, self.downloaded.len()
);
return Ok(false)
// Prepend the new blocks to the downloaded list.
if let Some(downloaded) = self.downloaded.as_mut() {
new_blocks.append(&mut downloaded.blocks);
downloaded.blocks.append(&mut new_blocks);
downloaded.start_block = start_block;
} else {
self.downloaded = Some(PeerDownloaded { blocks: new_blocks, start_block });
}

// Done, report the accumulated blocks to the collection.
let mut downloaded = Vec::new();
downloaded.append(&mut self.downloaded);
collection.insert(start_block, downloaded, *who);
Ok(true)
Ok(())
}

/// Returns the downloaded blocks if we are done downloading the full range.
/// Returns: Some(_) if we are done downloading, None if more requests need to
/// be issued.
fn get_downloaded_blocks(&mut self) -> Option<(NumberFor<B>, Vec<BlockData<B>>)> {
if let Some(downloaded) = self.downloaded.as_mut() {
// Consider the following scenario where we are downloading from a fork:
// common ancestor between current chain and fork = 915, current best
// queued number = 925.
// 1. Request is sent for blocks [916, 916 + 64)
// 2. Peer sends a partial responds with blocks [926, ..)
// 3. Since BlockCollection works off block numbers(and not hash), if this partial
// response is submitted to the collection, this would be considered to extend the
// current tip. Block 925 on current chain would be erroneously considered the parent
// of 926 on the fork. This would result in the partial response being imported,
// which would fail as parent not found and result in the sync failing/restarted.
//
// Instead, hold on to the partial response in this scenario,
// issue more requests to download the missing [916, 925). And submit
// to the block collection after the full range is downloaded.
let range_len = (self.range.end - self.range.start).saturated_into::<usize>();
if self.range.start == (self.common_number + One::one()) &&
downloaded.blocks.len() < range_len
{
trace!(
target: LOG_TARGET,
"Download state: incomplete download: \
common = {}, range = {:?}, downloaded = {}",
self.common_number, self.range, downloaded.blocks.len()
);
None
} else {
// Done, return the downloaded blocks.
let mut blocks = Vec::new();
blocks.append(&mut downloaded.blocks);
Some((downloaded.start_block, blocks))
}
} else {
None
}
}

/// Returns the next request to be issued based on the download state.
Expand All @@ -491,20 +533,18 @@ impl<B: BlockT> PeerDownloadState<B> {
peer_best_number: NumberFor<B>,
peer_best_hash: &B::Hash,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
let downloaded_head = if let Some(number) =
self.downloaded.first().and_then(|b| b.header.as_ref()).map(|h| *h.number())
{
number
let start_block = if let Some(downloaded) = &self.downloaded {
downloaded.start_block
} else {
warn!(
target: LOG_TARGET,
"Download state: unable to get block number of the downloaded blocks: \
common = {}, range = {:?}, downloaded = {}",
self.common_number, self.range, self.downloaded.len()
common = {}, range = {:?}",
self.common_number, self.range
);
return None
};
let range = Range { start: self.range.start, end: downloaded_head };
let range = Range { start: self.range.start, end: start_block };

// The end is not part of the range.
let last = range.end.saturating_sub(One::one());
Expand Down Expand Up @@ -822,20 +862,21 @@ where
if let Some(request) = request {
match &mut peer.state {
PeerSyncState::DownloadingNew(download_state) => {
match download_state.handle_blocks(who, blocks, request, &mut self.blocks) {
Ok(true) => {
// Done with the range, go back to available state.
peer.state = PeerSyncState::Available;
self.ready_blocks()
},
Ok(false) => {
// Not done yet, leave it in DownloadingNew state.
vec![]
},
Err(err) => {
peer.state = PeerSyncState::Available;
return Err(err)
},
self.blocks.clear_peer_download(who);
if let Err(err) = download_state.prepend_blocks(who, blocks, request) {
peer.state = PeerSyncState::Available;
return Err(err)
}
if let Some((start_block, blocks)) = download_state.get_downloaded_blocks()
{
// Done with the range, report the blocks to the collection,
// go back to available state.
peer.state = PeerSyncState::Available;
self.blocks.insert(start_block, blocks, *who);
self.ready_blocks()
} else {
// Not done yet, leave it in DownloadingNew state to start
vec![]
}
},
PeerSyncState::DownloadingGap(_) => {
Expand Down Expand Up @@ -4142,4 +4183,125 @@ mod test {
sync.peer_disconnected(&peers[1]);
assert_eq!(sync.pending_responses.len(), 0);
}

#[test]
fn peer_download_state() {
sp_tracing::try_init_simple();
let mut client = Arc::new(TestClientBuilder::new().build());
let blocks = (0..64).map(|_| build_block(&mut client, None, false)).collect::<Vec<_>>();
let peer_id = PeerId::random();

// Full response
{
let mut state = PeerDownloadState::<Block>::new(19, 20..30);
assert!(state.get_downloaded_blocks().is_none());

let request = BlockRequest::<Block> {
id: 0,
fields: BlockAttributes::HEADER | BlockAttributes::BODY,
from: FromBlock::Number(29),
direction: Direction::Descending,
max: Some(10),
};
let resp_blocks = blocks[19..29].to_vec();
let response = create_block_response(resp_blocks.clone());
assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_ok());
let (start_block, downloaded) = state.get_downloaded_blocks().unwrap();
assert_eq!(start_block, 20);
assert_eq!(downloaded.len(), 10);
}

// Partial response, not starting after common ancestor
{
let mut state = PeerDownloadState::<Block>::new(19, 25..30);
assert!(state.get_downloaded_blocks().is_none());

let request = BlockRequest::<Block> {
id: 0,
fields: BlockAttributes::HEADER | BlockAttributes::BODY,
from: FromBlock::Number(29),
direction: Direction::Descending,
max: Some(5),
};
let resp_blocks = blocks[28..29].to_vec(); // 1 block.
let response = create_block_response(resp_blocks.clone());
assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_ok());
let (start_block, downloaded) = state.get_downloaded_blocks().unwrap();
assert_eq!(start_block, 29);
assert_eq!(downloaded.len(), 1);
}

// Partial response, starting after common ancestor
{
let mut state = PeerDownloadState::<Block>::new(19, 20..30);
assert!(state.get_downloaded_blocks().is_none());

// Response 1: Blocks [25 .. 29].
let request = BlockRequest::<Block> {
id: 0,
fields: BlockAttributes::HEADER | BlockAttributes::BODY,
from: FromBlock::Number(29),
direction: Direction::Descending,
max: Some(10),
};
let resp_blocks = blocks[24..29].to_vec();
let response = create_block_response(resp_blocks.clone());
assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_ok());
assert!(state.get_downloaded_blocks().is_none());

// Response 2: Blocks [20 .. 24].
let (range, request) = state
.peer_block_request(
BlockAttributes::HEADER | BlockAttributes::BODY,
100,
&blocks[0].hash(),
)
.unwrap();
assert_eq!(range.start, 20);
assert_eq!(range.end, 25);
let resp_blocks = blocks[19..24].to_vec();
let response = create_block_response(resp_blocks.clone());
assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_ok());
let (start_block, downloaded) = state.get_downloaded_blocks().unwrap();
assert_eq!(start_block, 20);
assert_eq!(downloaded.len(), 10);
}
}

#[test]
fn peer_download_state_err() {
sp_tracing::try_init_simple();
let mut client = Arc::new(TestClientBuilder::new().build());
let blocks = (0..64).map(|_| build_block(&mut client, None, false)).collect::<Vec<_>>();
let peer_id = PeerId::random();

{
let mut state = PeerDownloadState::<Block>::new(19, 20..30);
assert!(state.get_downloaded_blocks().is_none());

// Start block should be in remaining range.
let request = BlockRequest::<Block> {
id: 0,
fields: BlockAttributes::HEADER | BlockAttributes::BODY,
from: FromBlock::Number(40),
direction: Direction::Descending,
max: Some(1),
};
let resp_blocks = blocks[39..40].to_vec();
let response = create_block_response(resp_blocks.clone());
assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_err());

// Blocks should be suffix of remaining range.
let request = BlockRequest::<Block> {
id: 0,
fields: BlockAttributes::HEADER | BlockAttributes::BODY,
from: FromBlock::Number(28),
direction: Direction::Descending,
max: Some(10),
};
let resp_blocks = blocks[23..28].to_vec();
let response = create_block_response(resp_blocks.clone());
assert!(state.prepend_blocks(&peer_id, response.blocks, request).is_err());
}
}
}

0 comments on commit 4477868

Please sign in to comment.