diff --git a/src/protocols/light_client/peers.rs b/src/protocols/light_client/peers.rs index 86b3674..1d0ece6 100644 --- a/src/protocols/light_client/peers.rs +++ b/src/protocols/light_client/peers.rs @@ -20,8 +20,6 @@ use crate::protocols::{Status, StatusCode, MESSAGE_TIMEOUT}; pub struct Peers { inner: DashMap, - // verified last N block headers - last_headers: RwLock>, // The headers are fetching, the value is: fetching_headers: DashMap, // The transactions are fetching, the value is: @@ -1124,7 +1122,6 @@ impl Peers { Self { inner: Default::default(), - last_headers: Default::default(), fetching_headers: DashMap::new(), fetching_txs: DashMap::new(), matched_blocks: Default::default(), @@ -1157,10 +1154,6 @@ impl Peers { (number.saturating_sub(1) / self.check_point_interval) as u32 } - pub(crate) fn last_headers(&self) -> &RwLock> { - &self.last_headers - } - #[cfg(test)] pub(crate) fn fetching_headers(&self) -> &DashMap { &self.fetching_headers @@ -1365,7 +1358,6 @@ impl Peers { index: PeerIndex, state: ProveState, ) -> Result<(), Status> { - *self.last_headers.write().expect("poisoned") = state.get_last_headers().to_vec(); if let Some(mut peer) = self.inner.get_mut(&index) { let has_reorg = !state.reorg_last_headers.is_empty(); peer.state = peer.state.take().receive_last_state_proof(state)?; @@ -1941,6 +1933,20 @@ impl Peers { }) } + pub(crate) fn find_header_in_proved_state(&self, hash: &Byte32) -> Option { + self.inner.iter().find_map(|item| { + let (_, peer) = item.pair(); + peer.state.get_prove_state().and_then(|prove_state| { + // TODO Store last headers in an ordered hashmap could increase performance. + prove_state + .get_last_headers() + .iter() + .find(|header| hash == &header.hash()) + .cloned() + }) + }) + } + pub(crate) fn get_best_proved_peers(&self, best_tip: &packed::Header) -> Vec { self.get_all_prove_states() .into_iter() diff --git a/src/storage.rs b/src/storage.rs index 19c1afb..05c9ead 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1126,15 +1126,9 @@ impl StorageWithChainData { impl HeaderProvider for StorageWithChainData { fn get_header(&self, hash: &packed::Byte32) -> Option { - self.storage.get_header(hash).or_else(|| { - self.peers - .last_headers() - .read() - .expect("poisoned") - .iter() - .find(|header| header.hash().eq(hash)) - .cloned() - }) + self.storage + .get_header(hash) + .or_else(|| self.peers.find_header_in_proved_state(hash)) } } diff --git a/src/tests/protocols/light_client/send_last_state_proof.rs b/src/tests/protocols/light_client/send_last_state_proof.rs index 59a71a6..d3cbca5 100644 --- a/src/tests/protocols/light_client/send_last_state_proof.rs +++ b/src/tests/protocols/light_client/send_last_state_proof.rs @@ -2023,3 +2023,187 @@ async fn test_with_reorg_blocks(param: ReorgTestParameter) { ); } } + +// Multi peers are in same chain but have different last states. +// And the higher one send proof to client before the lower one. +#[tokio::test(flavor = "multi_thread")] +async fn multi_peers_override_last_headers() { + let chain = MockChain::new_with_dummy_pow("test-light-client").start(); + let nc = MockNetworkContext::new(SupportProtocols::LightClient); + + let peer_index_high = PeerIndex::new(1); + let peer_index_low = PeerIndex::new(2); + let peers = { + let peers = chain.create_peers(); + peers.add_peer(peer_index_high); + peers.add_peer(peer_index_low); + peers.request_last_state(peer_index_high).unwrap(); + peers.request_last_state(peer_index_low).unwrap(); + peers + }; + let mut protocol = chain.create_light_client_protocol(peers); + protocol.set_last_n_blocks(5); + + let num = 30; + chain.mine_to(num); + let num_high = num; + let num_low = num - 5; + + let snapshot = chain.shared().snapshot(); + + let sampled_numbers = vec![3, 7, 11, 15]; + let boundary_number_high = num_high - protocol.last_n_blocks() + 1; + let boundary_number_low = num_low - protocol.last_n_blocks() + 1; + + // Header only in the higher chain. + let header_hash_for_test = snapshot + .get_header_by_number((num_high + num_low) / 2) + .expect("block stored") + .hash(); + + // Setup the test fixture. + for (peer_index, num, boundary_number) in [ + (peer_index_high, num_high, boundary_number_high), + (peer_index_low, num_low, boundary_number_low), + ] { + let prove_request = chain.build_prove_request( + 0, + num, + &sampled_numbers, + boundary_number, + protocol.last_n_blocks(), + ); + let last_state = LastState::new(prove_request.get_last_header().to_owned()); + + protocol + .peers() + .update_last_state(peer_index, last_state) + .unwrap(); + protocol + .peers() + .update_prove_request(peer_index, prove_request) + .unwrap(); + } + + // Received proof from higher chain. + { + let (peer_index, num, boundary_number) = (peer_index_high, num_high, boundary_number_high); + + let last_header = snapshot + .get_verifiable_header_by_number(num) + .expect("block stored"); + let data = { + let first_last_n_number = cmp::min(boundary_number, num - protocol.last_n_blocks()); + let headers = sampled_numbers + .iter() + .map(|n| *n as BlockNumber) + .filter(|n| *n < first_last_n_number) + .chain((first_last_n_number..num).into_iter()) + .map(|n| { + snapshot + .get_verifiable_header_by_number(n) + .expect("block stored") + }) + .collect::>(); + let proof = { + let last_number: BlockNumber = last_header.header().raw().number().unpack(); + let numbers = headers + .iter() + .map(|header| header.header().raw().number().unpack()) + .collect::>(); + chain.build_proof_by_numbers(last_number, &numbers) + }; + let content = packed::SendLastStateProof::new_builder() + .last_header(last_header.clone()) + .proof(proof) + .headers(headers.pack()) + .build(); + packed::LightClientMessage::new_builder() + .set(content) + .build() + } + .as_bytes(); + + protocol.received(nc.context(), peer_index, data).await; + + assert!(nc.not_banned(peer_index)); + + let prove_state = protocol + .get_peer_state(&peer_index) + .expect("has peer state") + .get_prove_state() + .expect("has prove state") + .to_owned(); + let last_header: VerifiableHeader = last_header.into(); + assert!(prove_state.is_same_as(&last_header)); + } + + // Run the test: check last headers which is stored in memory. + { + assert!(protocol + .peers() + .find_header_in_proved_state(&header_hash_for_test) + .is_some()); + } + + // Received proof from lower chain. + { + let (peer_index, num, boundary_number) = (peer_index_low, num_low, boundary_number_low); + + let last_header = snapshot + .get_verifiable_header_by_number(num) + .expect("block stored"); + let data = { + let first_last_n_number = cmp::min(boundary_number, num - protocol.last_n_blocks()); + let headers = sampled_numbers + .iter() + .map(|n| *n as BlockNumber) + .filter(|n| *n < first_last_n_number) + .chain((first_last_n_number..num).into_iter()) + .map(|n| { + snapshot + .get_verifiable_header_by_number(n) + .expect("block stored") + }) + .collect::>(); + let proof = { + let last_number: BlockNumber = last_header.header().raw().number().unpack(); + let numbers = headers + .iter() + .map(|header| header.header().raw().number().unpack()) + .collect::>(); + chain.build_proof_by_numbers(last_number, &numbers) + }; + let content = packed::SendLastStateProof::new_builder() + .last_header(last_header.clone()) + .proof(proof) + .headers(headers.pack()) + .build(); + packed::LightClientMessage::new_builder() + .set(content) + .build() + } + .as_bytes(); + + protocol.received(nc.context(), peer_index, data).await; + + assert!(nc.not_banned(peer_index)); + + let prove_state = protocol + .get_peer_state(&peer_index) + .expect("has peer state") + .get_prove_state() + .expect("has prove state") + .to_owned(); + let last_header: VerifiableHeader = last_header.into(); + assert!(prove_state.is_same_as(&last_header)); + } + + // Run the test: check last headers which is stored in memory, again. + { + assert!(protocol + .peers() + .find_header_in_proved_state(&header_hash_for_test) + .is_some()); + } +} diff --git a/src/tests/service.rs b/src/tests/service.rs index 2217adb..7a2efdb 100644 --- a/src/tests/service.rs +++ b/src/tests/service.rs @@ -1,6 +1,7 @@ use std::sync::{Arc, RwLock}; use ckb_chain_spec::consensus::Consensus; +use ckb_network::PeerIndex; use ckb_types::{ bytes::Bytes, core::{ @@ -10,11 +11,12 @@ use ckb_types::{ h256, packed::{CellInput, CellOutputBuilder, Header, OutPoint, Script, ScriptBuilder}, prelude::*, + utilities::merkle_mountain_range::VerifiableHeader, H256, U256, }; use crate::{ - protocols::{FetchInfo, PendingTxs}, + protocols::{FetchInfo, LastState, PendingTxs, ProveRequest, ProveState}, service::{ BlockFilterRpc, BlockFilterRpcImpl, ChainRpc, ChainRpcImpl, FetchStatus, Order, ScriptStatus, ScriptType, SearchKey, SearchKeyFilter, SetScriptsCommand, Status, @@ -733,11 +735,27 @@ fn rpc() { // insert fetched headers let peers = create_peers(); - peers - .last_headers() - .write() - .unwrap() - .push(extra_header.clone()); + { + let peer_index = PeerIndex::new(3); + peers.add_peer(peer_index); + let tip_header = VerifiableHeader::new( + storage.get_tip_header().into_view(), + Default::default(), + None, + Default::default(), + ); + let last_state = LastState::new(tip_header); + let request = ProveRequest::new(last_state.clone(), Default::default()); + let prove_state = ProveState::new_from_request( + request.clone(), + Default::default(), + vec![extra_header.clone()], + ); + peers.request_last_state(peer_index).unwrap(); + peers.update_last_state(peer_index, last_state).unwrap(); + peers.update_prove_request(peer_index, request).unwrap(); + peers.update_prove_state(peer_index, prove_state).unwrap(); + } peers.fetching_headers().insert( h256!("0xaa22").pack(), FetchInfo::new(1111, 3344, false, false),