Fixed block response limit check (paritytech#9692)
* Fixed block response limit check

* Fixed start block detection and added a test

* Missing test
arkpar authored and Neopallium committed Jan 10, 2022
1 parent 7c880ee commit 9b0c551
Showing 5 changed files with 246 additions and 12 deletions.
4 changes: 2 additions & 2 deletions client/network/src/block_request_handler.rs
@@ -44,7 +44,7 @@ pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
name: generate_protocol_name(protocol_id).into(),
max_request_size: 1024 * 1024,
max_response_size: 16 * 1024 * 1024,
request_timeout: Duration::from_secs(40),
request_timeout: Duration::from_secs(20),
inbound_queue: None,
}
}
@@ -175,7 +175,7 @@ impl <B: BlockT> BlockRequestHandler<B> {
is_empty_justification,
};

total_size += block_data.body.len();
total_size += block_data.body.iter().map(|ex| ex.len()).sum::<usize>();
blocks.push(block_data);

if blocks.len() >= max_blocks as usize || total_size > MAX_BODY_BYTES {
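This hunk is the heart of the fix: the old accounting added `block_data.body.len()`, which is the number of extrinsics in the block rather than their size in bytes, so the `MAX_BODY_BYTES` cap on a response was effectively never reached by blocks containing a few very large extrinsics. The new line sums the encoded length of every extrinsic instead. A minimal sketch of the difference, using a plain `Vec<Vec<u8>>` as a stand-in for the encoded body and an assumed value for the byte cap:

```rust
// Simplified stand-in for a block body: each inner Vec<u8> is one encoded extrinsic.
type Body = Vec<Vec<u8>>;

// Hypothetical cap mirroring MAX_BODY_BYTES in the handler (assumed value).
const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;

fn main() {
    // One block containing two 1 MiB extrinsics.
    let body: Body = vec![vec![0u8; 1024 * 1024]; 2];

    // Old accounting: counts extrinsics, so this block contributes "2 bytes".
    let old_size = body.len();

    // New accounting: sums the encoded size of every extrinsic (2 MiB here).
    let new_size: usize = body.iter().map(|ex| ex.len()).sum();

    assert_eq!(old_size, 2);
    assert_eq!(new_size, 2 * 1024 * 1024);

    // Only the new accounting ever gets anywhere near the byte cap.
    assert!(old_size <= MAX_BODY_BYTES && new_size <= MAX_BODY_BYTES);
}
```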
2 changes: 1 addition & 1 deletion client/network/src/protocol.rs
@@ -754,7 +754,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
} else {
None
},
receipt: if !block_data.message_queue.is_empty() {
receipt: if !block_data.receipt.is_empty() {
Some(block_data.receipt)
} else {
None
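The `protocol.rs` change fixes what looks like a copy-paste bug: the `receipt` field of the converted block data was gated on `message_queue` being non-empty instead of on `receipt` itself, so a block with an empty message queue would silently drop its receipt. A minimal sketch of the intended mapping, with hypothetical plain `Vec<u8>` fields standing in for the real message types:

```rust
// Hypothetical, simplified view of the incoming block data fields.
struct RawBlockData {
    receipt: Vec<u8>,
    message_queue: Vec<u8>,
}

// Map each optional payload to `Some` only when that same payload is non-empty.
fn convert(raw: RawBlockData) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
    let receipt = if !raw.receipt.is_empty() { Some(raw.receipt) } else { None };
    let message_queue =
        if !raw.message_queue.is_empty() { Some(raw.message_queue) } else { None };
    (receipt, message_queue)
}

fn main() {
    // Before the fix, an empty message queue would make this non-empty receipt vanish.
    let (receipt, message_queue) =
        convert(RawBlockData { receipt: vec![1, 2, 3], message_queue: Vec::new() });
    assert_eq!(receipt, Some(vec![1, 2, 3]));
    assert_eq!(message_queue, None);
}
```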
19 changes: 11 additions & 8 deletions client/network/src/protocol/sync.rs
@@ -61,7 +61,7 @@ mod blocks;
mod extra_requests;

/// Maximum blocks to request in a single packet.
const MAX_BLOCKS_TO_REQUEST: usize = 128;
const MAX_BLOCKS_TO_REQUEST: usize = 64;

/// Maximum blocks to store in the import queue.
const MAX_IMPORTING_BLOCKS: usize = 2048;
@@ -813,12 +813,14 @@ impl<B: BlockT> ChainSync<B> {
self.pending_requests.add(who);
if let Some(request) = request {
match &mut peer.state {
PeerSyncState::DownloadingNew(start_block) => {
PeerSyncState::DownloadingNew(_) => {
self.blocks.clear_peer_download(who);
let start_block = *start_block;
peer.state = PeerSyncState::Available;
validate_blocks::<B>(&blocks, who, Some(request))?;
self.blocks.insert(start_block, blocks, who.clone());
if let Some(start_block) =
validate_blocks::<B>(&blocks, who, Some(request))?
{
self.blocks.insert(start_block, blocks, who.clone());
}
self.blocks
.drain(self.best_queued_number + One::one())
.into_iter()
@@ -1790,13 +1792,14 @@ fn is_descendent_of<Block, T>(client: &T, base: &Block::Hash, block: &Block::Has
}

/// Validate that the given `blocks` are correct.
/// Returns the number of the first block in the sequence.
///
/// It is expected that `blocks` are in asending order.
/// It is expected that `blocks` are in ascending order.
fn validate_blocks<Block: BlockT>(
blocks: &Vec<message::BlockData<Block>>,
who: &PeerId,
request: Option<BlockRequest<Block>>,
) -> Result<(), BadPeer> {
) -> Result<Option<NumberFor<Block>>, BadPeer> {
if let Some(request) = request {
if Some(blocks.len() as _) > request.max {
debug!(
@@ -1889,7 +1892,7 @@ fn validate_blocks<Block: BlockT>(
}
}

Ok(())
Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))
}

#[cfg(test)]
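In `sync.rs`, `validate_blocks` now reports the number of the first block actually present in the response, and `on_block_data` keys the insert into the download collection on that value instead of the start block remembered in `PeerSyncState::DownloadingNew`; if the response carries no headers, nothing is inserted at all. A minimal sketch of the new return value, with simplified concrete types in place of the crate's generics:

```rust
// Simplified stand-ins for the real generic header and block-data types.
struct Header {
    number: u64,
}

struct BlockData {
    header: Option<Header>,
}

// Number of the first block in the (ascending) response, if any header is present.
// Mirrors `Ok(blocks.first().and_then(|b| b.header.as_ref()).map(|h| *h.number()))`.
fn first_block_number(blocks: &[BlockData]) -> Option<u64> {
    blocks.first().and_then(|b| b.header.as_ref()).map(|h| h.number)
}

fn main() {
    let response = vec![
        BlockData { header: Some(Header { number: 11 }) },
        BlockData { header: Some(Header { number: 12 }) },
    ];
    // Insert into the download collection keyed by what the peer actually sent ...
    assert_eq!(first_block_number(&response), Some(11));
    // ... and skip the insert entirely for a header-less response.
    assert_eq!(first_block_number(&[]), None);
}
```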
2 changes: 1 addition & 1 deletion client/network/src/protocol/sync/blocks.rs
@@ -181,7 +181,7 @@ impl<B: BlockT> BlockCollection<B> {
for r in ranges {
self.blocks.remove(&r);
}
trace!(target: "sync", "Drained {} blocks", drained.len());
trace!(target: "sync", "Drained {} blocks from {:?}", drained.len(), from);
drained
}

231 changes: 231 additions & 0 deletions client/network/test/src/sync.rs
@@ -935,3 +935,234 @@ fn continue_to_sync_after_some_block_announcement_verifications_failed() {
net.block_until_sync();
assert!(net.peer(1).has_block(&block_hash));
}

/// When being spammed by the same request of a peer, we ban this peer. However, we should only ban
/// this peer if the request was successful. In the case of a justification request for example,
/// we ask our peers multiple times until we got the requested justification. This test ensures that
/// asking for the same justification multiple times doesn't ban a peer.
#[test]
fn multiple_requests_are_accepted_as_long_as_they_are_not_fulfilled() {
sp_tracing::try_init_simple();
let mut net = JustificationTestNet::new(2);
net.peer(0).push_blocks(10, false);
net.block_until_sync();

// there's currently no justification for block #10
assert_eq!(net.peer(0).client().justifications(&BlockId::Number(10)).unwrap(), None);
assert_eq!(net.peer(1).client().justifications(&BlockId::Number(10)).unwrap(), None);

let h1 = net.peer(1).client().header(&BlockId::Number(10)).unwrap().unwrap();

// Let's assume block 10 was finalized, but we still need the justification from the network.
net.peer(1).request_justification(&h1.hash().into(), 10);

// Let's build some more blocks and wait always for the network to have synced them
for _ in 0..5 {
// We need to sleep 10 seconds as this is the time we wait between sending a new
// justification request.
std::thread::sleep(std::time::Duration::from_secs(10));
net.peer(0).push_blocks(1, false);
net.block_until_sync();
assert_eq!(1, net.peer(0).num_peers());
}

// Finalize the block and make the justification available.
net.peer(0)
.client()
.finalize_block(BlockId::Number(10), Some((*b"FRNK", Vec::new())), true)
.unwrap();

block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);

if net.peer(1).client().justifications(&BlockId::Number(10)).unwrap() !=
Some(Justifications::from((*b"FRNK", Vec::new())))
{
return Poll::Pending
}

Poll::Ready(())
}));
}

#[test]
fn syncs_all_forks_from_single_peer() {
sp_tracing::try_init_simple();
let mut net = TestNet::new(2);
net.peer(0).push_blocks(10, false);
net.peer(1).push_blocks(10, false);

// poll until the two nodes connect, otherwise announcing the block will not work
net.block_until_connected();

// Peer 0 produces new blocks and announces.
let branch1 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, true);

// Wait till peer 1 starts downloading
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).network().best_seen_block() != Some(12) {
return Poll::Pending
}
Poll::Ready(())
}));

// Peer 0 produces and announces another fork
let branch2 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, false);

net.block_until_sync();

// Peer 1 should have both branches,
assert!(net.peer(1).client().header(&BlockId::Hash(branch1)).unwrap().is_some());
assert!(net.peer(1).client().header(&BlockId::Hash(branch2)).unwrap().is_some());
}

#[test]
fn syncs_after_missing_announcement() {
sp_tracing::try_init_simple();
let mut net = TestNet::new(0);
net.add_full_peer_with_config(Default::default());
// Set peer 1 to ignore announcement
net.add_full_peer_with_config(FullPeerConfig {
block_announce_validator: Some(Box::new(FailingBlockAnnounceValidator(11))),
..Default::default()
});
net.peer(0).push_blocks(10, false);
net.peer(1).push_blocks(10, false);

net.block_until_connected();

// Peer 0 produces a new block and announces. Peer 1 ignores announcement.
net.peer(0).push_blocks_at(BlockId::Number(10), 1, false);
// Peer 0 produces another block and announces.
let final_block = net.peer(0).push_blocks_at(BlockId::Number(11), 1, false);
net.peer(1).push_blocks_at(BlockId::Number(10), 1, true);
net.block_until_sync();
assert!(net.peer(1).client().header(&BlockId::Hash(final_block)).unwrap().is_some());
}

#[test]
fn syncs_state() {
sp_tracing::try_init_simple();
for skip_proofs in &[false, true] {
let mut net = TestNet::new(0);
net.add_full_peer_with_config(Default::default());
net.add_full_peer_with_config(FullPeerConfig {
sync_mode: SyncMode::Fast { skip_proofs: *skip_proofs, storage_chain_mode: false },
..Default::default()
});
net.peer(0).push_blocks(64, false);
// Wait for peer 1 to sync header chain.
net.block_until_sync();
assert!(!net.peer(1).client().has_state_at(&BlockId::Number(64)));

let just = (*b"FRNK", Vec::new());
net.peer(1)
.client()
.finalize_block(BlockId::Number(60), Some(just), true)
.unwrap();
// Wait for state sync.
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).client.info().finalized_state.is_some() {
Poll::Ready(())
} else {
Poll::Pending
}
}));
assert!(!net.peer(1).client().has_state_at(&BlockId::Number(64)));
// Wait for the rest of the states to be imported.
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).client().has_state_at(&BlockId::Number(64)) {
Poll::Ready(())
} else {
Poll::Pending
}
}));
}
}

#[test]
fn syncs_indexed_blocks() {
use sp_runtime::traits::Hash;
sp_tracing::try_init_simple();
let mut net = TestNet::new(0);
let mut n: u64 = 0;
net.add_full_peer_with_config(FullPeerConfig { storage_chain: true, ..Default::default() });
net.add_full_peer_with_config(FullPeerConfig {
storage_chain: true,
sync_mode: SyncMode::Fast { skip_proofs: false, storage_chain_mode: true },
..Default::default()
});
net.peer(0).generate_blocks_at(
BlockId::number(0),
64,
BlockOrigin::Own,
|mut builder| {
let ex = Extrinsic::Store(n.to_le_bytes().to_vec());
n += 1;
builder.push(ex).unwrap();
builder.build().unwrap().block
},
false,
true,
true,
);
let indexed_key = sp_runtime::traits::BlakeTwo256::hash(&42u64.to_le_bytes());
assert!(net
.peer(0)
.client()
.as_full()
.unwrap()
.indexed_transaction(&indexed_key)
.unwrap()
.is_some());
assert!(net
.peer(1)
.client()
.as_full()
.unwrap()
.indexed_transaction(&indexed_key)
.unwrap()
.is_none());

net.block_until_sync();
assert!(net
.peer(1)
.client()
.as_full()
.unwrap()
.indexed_transaction(&indexed_key)
.unwrap()
.is_some());
}

#[test]
fn syncs_huge_blocks() {
use sp_core::storage::well_known_keys::HEAP_PAGES;
use sp_runtime::codec::Encode;
use substrate_test_runtime_client::BlockBuilderExt;

sp_tracing::try_init_simple();
let mut net = TestNet::new(2);

// Increase heap space for bigger blocks.
net.peer(0).generate_blocks(1, BlockOrigin::Own, |mut builder| {
builder.push_storage_change(HEAP_PAGES.to_vec(), Some(256u64.encode())).unwrap();
builder.build().unwrap().block
});

net.peer(0).generate_blocks(32, BlockOrigin::Own, |mut builder| {
// Add 32 extrinsics 32k each = 1MiB total
for _ in 0..32 {
let ex = Extrinsic::IncludeData([42u8; 32 * 1024].to_vec());
builder.push(ex).unwrap();
}
builder.build().unwrap().block
});

net.block_until_sync();
assert_eq!(net.peer(0).client.info().best_number, 33);
assert_eq!(net.peer(1).client.info().best_number, 33);
}
