Skip to content

Commit

Permalink
Merge pull request #177 from yangby-cryptape/bugfix/last-headers-are-…
Browse files Browse the repository at this point in the history
…overridden

fix: let peers cached their own last headers to avoid be overridden
  • Loading branch information
quake authored Jan 9, 2024
2 parents 6182db4 + 773af13 commit 4147b4a
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 23 deletions.
22 changes: 14 additions & 8 deletions src/protocols/light_client/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use crate::protocols::{Status, StatusCode, MESSAGE_TIMEOUT};

pub struct Peers {
inner: DashMap<PeerIndex, Peer>,
// verified last N block headers
last_headers: RwLock<Vec<HeaderView>>,
// The headers are fetching, the value is:
fetching_headers: DashMap<Byte32, FetchInfo>,
// The transactions are fetching, the value is:
Expand Down Expand Up @@ -1125,7 +1123,6 @@ impl Peers {

Self {
inner: Default::default(),
last_headers: Default::default(),
fetching_headers: DashMap::new(),
fetching_txs: DashMap::new(),
matched_blocks: Default::default(),
Expand Down Expand Up @@ -1158,10 +1155,6 @@ impl Peers {
(number.saturating_sub(1) / self.check_point_interval) as u32
}

pub(crate) fn last_headers(&self) -> &RwLock<Vec<HeaderView>> {
&self.last_headers
}

#[cfg(test)]
pub(crate) fn fetching_headers(&self) -> &DashMap<Byte32, FetchInfo> {
&self.fetching_headers
Expand Down Expand Up @@ -1366,7 +1359,6 @@ impl Peers {
index: PeerIndex,
state: ProveState,
) -> Result<(), Status> {
*self.last_headers.write().expect("poisoned") = state.get_last_headers().to_vec();
if let Some(mut peer) = self.inner.get_mut(&index) {
let has_reorg = !state.reorg_last_headers.is_empty();
peer.state = peer.state.take().receive_last_state_proof(state)?;
Expand Down Expand Up @@ -1942,6 +1934,20 @@ impl Peers {
})
}

pub(crate) fn find_header_in_proved_state(&self, hash: &Byte32) -> Option<HeaderView> {
self.inner.iter().find_map(|item| {
let (_, peer) = item.pair();
peer.state.get_prove_state().and_then(|prove_state| {
// TODO Store last headers in an ordered hashmap could increase performance.
prove_state
.get_last_headers()
.iter()
.find(|header| hash == &header.hash())
.cloned()
})
})
}

pub(crate) fn get_best_proved_peers(&self, best_tip: &packed::Header) -> Vec<PeerIndex> {
self.get_all_prove_states()
.into_iter()
Expand Down
12 changes: 3 additions & 9 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1126,15 +1126,9 @@ impl StorageWithChainData {

impl HeaderProvider for StorageWithChainData {
fn get_header(&self, hash: &packed::Byte32) -> Option<HeaderView> {
self.storage.get_header(hash).or_else(|| {
self.peers
.last_headers()
.read()
.expect("poisoned")
.iter()
.find(|header| header.hash().eq(hash))
.cloned()
})
self.storage
.get_header(hash)
.or_else(|| self.peers.find_header_in_proved_state(hash))
}
}

Expand Down
184 changes: 184 additions & 0 deletions src/tests/protocols/light_client/send_last_state_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2023,3 +2023,187 @@ async fn test_with_reorg_blocks(param: ReorgTestParameter) {
);
}
}

// Multi peers are in same chain but have different last states.
// And the higher one send proof to client before the lower one.
#[tokio::test(flavor = "multi_thread")]
async fn multi_peers_override_last_headers() {
let chain = MockChain::new_with_dummy_pow("test-light-client").start();
let nc = MockNetworkContext::new(SupportProtocols::LightClient);

let peer_index_high = PeerIndex::new(1);
let peer_index_low = PeerIndex::new(2);
let peers = {
let peers = chain.create_peers();
peers.add_peer(peer_index_high);
peers.add_peer(peer_index_low);
peers.request_last_state(peer_index_high).unwrap();
peers.request_last_state(peer_index_low).unwrap();
peers
};
let mut protocol = chain.create_light_client_protocol(peers);
protocol.set_last_n_blocks(5);

let num = 30;
chain.mine_to(num);
let num_high = num;
let num_low = num - 5;

let snapshot = chain.shared().snapshot();

let sampled_numbers = vec![3, 7, 11, 15];
let boundary_number_high = num_high - protocol.last_n_blocks() + 1;
let boundary_number_low = num_low - protocol.last_n_blocks() + 1;

// Header only in the higher chain.
let header_hash_for_test = snapshot
.get_header_by_number((num_high + num_low) / 2)
.expect("block stored")
.hash();

// Setup the test fixture.
for (peer_index, num, boundary_number) in [
(peer_index_high, num_high, boundary_number_high),
(peer_index_low, num_low, boundary_number_low),
] {
let prove_request = chain.build_prove_request(
0,
num,
&sampled_numbers,
boundary_number,
protocol.last_n_blocks(),
);
let last_state = LastState::new(prove_request.get_last_header().to_owned());

protocol
.peers()
.update_last_state(peer_index, last_state)
.unwrap();
protocol
.peers()
.update_prove_request(peer_index, prove_request)
.unwrap();
}

// Received proof from higher chain.
{
let (peer_index, num, boundary_number) = (peer_index_high, num_high, boundary_number_high);

let last_header = snapshot
.get_verifiable_header_by_number(num)
.expect("block stored");
let data = {
let first_last_n_number = cmp::min(boundary_number, num - protocol.last_n_blocks());
let headers = sampled_numbers
.iter()
.map(|n| *n as BlockNumber)
.filter(|n| *n < first_last_n_number)
.chain((first_last_n_number..num).into_iter())
.map(|n| {
snapshot
.get_verifiable_header_by_number(n)
.expect("block stored")
})
.collect::<Vec<_>>();
let proof = {
let last_number: BlockNumber = last_header.header().raw().number().unpack();
let numbers = headers
.iter()
.map(|header| header.header().raw().number().unpack())
.collect::<Vec<BlockNumber>>();
chain.build_proof_by_numbers(last_number, &numbers)
};
let content = packed::SendLastStateProof::new_builder()
.last_header(last_header.clone())
.proof(proof)
.headers(headers.pack())
.build();
packed::LightClientMessage::new_builder()
.set(content)
.build()
}
.as_bytes();

protocol.received(nc.context(), peer_index, data).await;

assert!(nc.not_banned(peer_index));

let prove_state = protocol
.get_peer_state(&peer_index)
.expect("has peer state")
.get_prove_state()
.expect("has prove state")
.to_owned();
let last_header: VerifiableHeader = last_header.into();
assert!(prove_state.is_same_as(&last_header));
}

// Run the test: check last headers which is stored in memory.
{
assert!(protocol
.peers()
.find_header_in_proved_state(&header_hash_for_test)
.is_some());
}

// Received proof from lower chain.
{
let (peer_index, num, boundary_number) = (peer_index_low, num_low, boundary_number_low);

let last_header = snapshot
.get_verifiable_header_by_number(num)
.expect("block stored");
let data = {
let first_last_n_number = cmp::min(boundary_number, num - protocol.last_n_blocks());
let headers = sampled_numbers
.iter()
.map(|n| *n as BlockNumber)
.filter(|n| *n < first_last_n_number)
.chain((first_last_n_number..num).into_iter())
.map(|n| {
snapshot
.get_verifiable_header_by_number(n)
.expect("block stored")
})
.collect::<Vec<_>>();
let proof = {
let last_number: BlockNumber = last_header.header().raw().number().unpack();
let numbers = headers
.iter()
.map(|header| header.header().raw().number().unpack())
.collect::<Vec<BlockNumber>>();
chain.build_proof_by_numbers(last_number, &numbers)
};
let content = packed::SendLastStateProof::new_builder()
.last_header(last_header.clone())
.proof(proof)
.headers(headers.pack())
.build();
packed::LightClientMessage::new_builder()
.set(content)
.build()
}
.as_bytes();

protocol.received(nc.context(), peer_index, data).await;

assert!(nc.not_banned(peer_index));

let prove_state = protocol
.get_peer_state(&peer_index)
.expect("has peer state")
.get_prove_state()
.expect("has prove state")
.to_owned();
let last_header: VerifiableHeader = last_header.into();
assert!(prove_state.is_same_as(&last_header));
}

// Run the test: check last headers which is stored in memory, again.
{
assert!(protocol
.peers()
.find_header_in_proved_state(&header_hash_for_test)
.is_some());
}
}
30 changes: 24 additions & 6 deletions src/tests/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::{Arc, RwLock};

use ckb_chain_spec::consensus::Consensus;
use ckb_network::PeerIndex;
use ckb_types::{
bytes::Bytes,
core::{
Expand All @@ -10,11 +11,12 @@ use ckb_types::{
h256,
packed::{CellInput, CellOutputBuilder, Header, OutPoint, Script, ScriptBuilder},
prelude::*,
utilities::merkle_mountain_range::VerifiableHeader,
H256, U256,
};

use crate::{
protocols::{FetchInfo, PendingTxs},
protocols::{FetchInfo, LastState, PendingTxs, ProveRequest, ProveState},
service::{
BlockFilterRpc, BlockFilterRpcImpl, ChainRpc, ChainRpcImpl, FetchStatus, Order,
ScriptStatus, ScriptType, SearchKey, SearchKeyFilter, SetScriptsCommand, Status,
Expand Down Expand Up @@ -733,11 +735,27 @@ fn rpc() {

// insert fetched headers
let peers = create_peers();
peers
.last_headers()
.write()
.unwrap()
.push(extra_header.clone());
{
let peer_index = PeerIndex::new(3);
peers.add_peer(peer_index);
let tip_header = VerifiableHeader::new(
storage.get_tip_header().into_view(),
Default::default(),
None,
Default::default(),
);
let last_state = LastState::new(tip_header);
let request = ProveRequest::new(last_state.clone(), Default::default());
let prove_state = ProveState::new_from_request(
request.clone(),
Default::default(),
vec![extra_header.clone()],
);
peers.request_last_state(peer_index).unwrap();
peers.update_last_state(peer_index, last_state).unwrap();
peers.update_prove_request(peer_index, request).unwrap();
peers.update_prove_state(peer_index, prove_state).unwrap();
}
peers.fetching_headers().insert(
h256!("0xaa22").pack(),
FetchInfo::new(1111, 3344, false, false),
Expand Down

0 comments on commit 4147b4a

Please sign in to comment.