Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: let peers cached their own last headers to avoid be overridden #177

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/protocols/light_client/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use crate::protocols::{Status, StatusCode, MESSAGE_TIMEOUT};

pub struct Peers {
inner: DashMap<PeerIndex, Peer>,
// verified last N block headers
last_headers: RwLock<Vec<HeaderView>>,
// The headers are fetching, the value is:
fetching_headers: DashMap<Byte32, FetchInfo>,
// The transactions are fetching, the value is:
Expand Down Expand Up @@ -1124,7 +1122,6 @@ impl Peers {

Self {
inner: Default::default(),
last_headers: Default::default(),
fetching_headers: DashMap::new(),
fetching_txs: DashMap::new(),
matched_blocks: Default::default(),
Expand Down Expand Up @@ -1157,10 +1154,6 @@ impl Peers {
(number.saturating_sub(1) / self.check_point_interval) as u32
}

pub(crate) fn last_headers(&self) -> &RwLock<Vec<HeaderView>> {
&self.last_headers
}

#[cfg(test)]
pub(crate) fn fetching_headers(&self) -> &DashMap<Byte32, FetchInfo> {
&self.fetching_headers
Expand Down Expand Up @@ -1365,7 +1358,6 @@ impl Peers {
index: PeerIndex,
state: ProveState,
) -> Result<(), Status> {
*self.last_headers.write().expect("poisoned") = state.get_last_headers().to_vec();
if let Some(mut peer) = self.inner.get_mut(&index) {
let has_reorg = !state.reorg_last_headers.is_empty();
peer.state = peer.state.take().receive_last_state_proof(state)?;
Expand Down Expand Up @@ -1941,6 +1933,20 @@ impl Peers {
})
}

pub(crate) fn find_header_in_proved_state(&self, hash: &Byte32) -> Option<HeaderView> {
self.inner.iter().find_map(|item| {
let (_, peer) = item.pair();
peer.state.get_prove_state().and_then(|prove_state| {
// TODO Store last headers in an ordered hashmap could increase performance.
prove_state
.get_last_headers()
.iter()
.find(|header| hash == &header.hash())
.cloned()
})
})
}

pub(crate) fn get_best_proved_peers(&self, best_tip: &packed::Header) -> Vec<PeerIndex> {
self.get_all_prove_states()
.into_iter()
Expand Down
12 changes: 3 additions & 9 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1126,15 +1126,9 @@ impl StorageWithChainData {

impl HeaderProvider for StorageWithChainData {
fn get_header(&self, hash: &packed::Byte32) -> Option<HeaderView> {
self.storage.get_header(hash).or_else(|| {
self.peers
.last_headers()
.read()
.expect("poisoned")
.iter()
.find(|header| header.hash().eq(hash))
.cloned()
})
self.storage
.get_header(hash)
.or_else(|| self.peers.find_header_in_proved_state(hash))
}
}

Expand Down
184 changes: 184 additions & 0 deletions src/tests/protocols/light_client/send_last_state_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2023,3 +2023,187 @@ async fn test_with_reorg_blocks(param: ReorgTestParameter) {
);
}
}

// Multi peers are in same chain but have different last states.
// And the higher one send proof to client before the lower one.
#[tokio::test(flavor = "multi_thread")]
async fn multi_peers_override_last_headers() {
let chain = MockChain::new_with_dummy_pow("test-light-client").start();
let nc = MockNetworkContext::new(SupportProtocols::LightClient);

let peer_index_high = PeerIndex::new(1);
let peer_index_low = PeerIndex::new(2);
let peers = {
let peers = chain.create_peers();
peers.add_peer(peer_index_high);
peers.add_peer(peer_index_low);
peers.request_last_state(peer_index_high).unwrap();
peers.request_last_state(peer_index_low).unwrap();
peers
};
let mut protocol = chain.create_light_client_protocol(peers);
protocol.set_last_n_blocks(5);

let num = 30;
chain.mine_to(num);
let num_high = num;
let num_low = num - 5;

let snapshot = chain.shared().snapshot();

let sampled_numbers = vec![3, 7, 11, 15];
let boundary_number_high = num_high - protocol.last_n_blocks() + 1;
let boundary_number_low = num_low - protocol.last_n_blocks() + 1;

// Header only in the higher chain.
let header_hash_for_test = snapshot
.get_header_by_number((num_high + num_low) / 2)
.expect("block stored")
.hash();

// Setup the test fixture.
for (peer_index, num, boundary_number) in [
(peer_index_high, num_high, boundary_number_high),
(peer_index_low, num_low, boundary_number_low),
] {
let prove_request = chain.build_prove_request(
0,
num,
&sampled_numbers,
boundary_number,
protocol.last_n_blocks(),
);
let last_state = LastState::new(prove_request.get_last_header().to_owned());

protocol
.peers()
.update_last_state(peer_index, last_state)
.unwrap();
protocol
.peers()
.update_prove_request(peer_index, prove_request)
.unwrap();
}

// Received proof from higher chain.
{
let (peer_index, num, boundary_number) = (peer_index_high, num_high, boundary_number_high);

let last_header = snapshot
.get_verifiable_header_by_number(num)
.expect("block stored");
let data = {
let first_last_n_number = cmp::min(boundary_number, num - protocol.last_n_blocks());
let headers = sampled_numbers
.iter()
.map(|n| *n as BlockNumber)
.filter(|n| *n < first_last_n_number)
.chain((first_last_n_number..num).into_iter())
.map(|n| {
snapshot
.get_verifiable_header_by_number(n)
.expect("block stored")
})
.collect::<Vec<_>>();
let proof = {
let last_number: BlockNumber = last_header.header().raw().number().unpack();
let numbers = headers
.iter()
.map(|header| header.header().raw().number().unpack())
.collect::<Vec<BlockNumber>>();
chain.build_proof_by_numbers(last_number, &numbers)
};
let content = packed::SendLastStateProof::new_builder()
.last_header(last_header.clone())
.proof(proof)
.headers(headers.pack())
.build();
packed::LightClientMessage::new_builder()
.set(content)
.build()
}
.as_bytes();

protocol.received(nc.context(), peer_index, data).await;

assert!(nc.not_banned(peer_index));

let prove_state = protocol
.get_peer_state(&peer_index)
.expect("has peer state")
.get_prove_state()
.expect("has prove state")
.to_owned();
let last_header: VerifiableHeader = last_header.into();
assert!(prove_state.is_same_as(&last_header));
}

// Run the test: check last headers which is stored in memory.
{
assert!(protocol
.peers()
.find_header_in_proved_state(&header_hash_for_test)
.is_some());
}

// Received proof from lower chain.
{
let (peer_index, num, boundary_number) = (peer_index_low, num_low, boundary_number_low);

let last_header = snapshot
.get_verifiable_header_by_number(num)
.expect("block stored");
let data = {
let first_last_n_number = cmp::min(boundary_number, num - protocol.last_n_blocks());
let headers = sampled_numbers
.iter()
.map(|n| *n as BlockNumber)
.filter(|n| *n < first_last_n_number)
.chain((first_last_n_number..num).into_iter())
.map(|n| {
snapshot
.get_verifiable_header_by_number(n)
.expect("block stored")
})
.collect::<Vec<_>>();
let proof = {
let last_number: BlockNumber = last_header.header().raw().number().unpack();
let numbers = headers
.iter()
.map(|header| header.header().raw().number().unpack())
.collect::<Vec<BlockNumber>>();
chain.build_proof_by_numbers(last_number, &numbers)
};
let content = packed::SendLastStateProof::new_builder()
.last_header(last_header.clone())
.proof(proof)
.headers(headers.pack())
.build();
packed::LightClientMessage::new_builder()
.set(content)
.build()
}
.as_bytes();

protocol.received(nc.context(), peer_index, data).await;

assert!(nc.not_banned(peer_index));

let prove_state = protocol
.get_peer_state(&peer_index)
.expect("has peer state")
.get_prove_state()
.expect("has prove state")
.to_owned();
let last_header: VerifiableHeader = last_header.into();
assert!(prove_state.is_same_as(&last_header));
}

// Run the test: check last headers which is stored in memory, again.
{
assert!(protocol
.peers()
.find_header_in_proved_state(&header_hash_for_test)
.is_some());
}
}
30 changes: 24 additions & 6 deletions src/tests/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::{Arc, RwLock};

use ckb_chain_spec::consensus::Consensus;
use ckb_network::PeerIndex;
use ckb_types::{
bytes::Bytes,
core::{
Expand All @@ -10,11 +11,12 @@ use ckb_types::{
h256,
packed::{CellInput, CellOutputBuilder, Header, OutPoint, Script, ScriptBuilder},
prelude::*,
utilities::merkle_mountain_range::VerifiableHeader,
H256, U256,
};

use crate::{
protocols::{FetchInfo, PendingTxs},
protocols::{FetchInfo, LastState, PendingTxs, ProveRequest, ProveState},
service::{
BlockFilterRpc, BlockFilterRpcImpl, ChainRpc, ChainRpcImpl, FetchStatus, Order,
ScriptStatus, ScriptType, SearchKey, SearchKeyFilter, SetScriptsCommand, Status,
Expand Down Expand Up @@ -733,11 +735,27 @@ fn rpc() {

// insert fetched headers
let peers = create_peers();
peers
.last_headers()
.write()
.unwrap()
.push(extra_header.clone());
{
let peer_index = PeerIndex::new(3);
peers.add_peer(peer_index);
let tip_header = VerifiableHeader::new(
storage.get_tip_header().into_view(),
Default::default(),
None,
Default::default(),
);
let last_state = LastState::new(tip_header);
let request = ProveRequest::new(last_state.clone(), Default::default());
let prove_state = ProveState::new_from_request(
request.clone(),
Default::default(),
vec![extra_header.clone()],
);
peers.request_last_state(peer_index).unwrap();
peers.update_last_state(peer_index, last_state).unwrap();
peers.update_prove_request(peer_index, request).unwrap();
peers.update_prove_state(peer_index, prove_state).unwrap();
}
peers.fetching_headers().insert(
h256!("0xaa22").pack(),
FetchInfo::new(1111, 3344, false, false),
Expand Down
Loading