diff --git a/sync/src/synchronizer/headers_process.rs b/sync/src/synchronizer/headers_process.rs index 5ee72117471..3ea02e31b07 100644 --- a/sync/src/synchronizer/headers_process.rs +++ b/sync/src/synchronizer/headers_process.rs @@ -109,13 +109,11 @@ impl<'a> HeadersProcess<'a> { if headers.is_empty() { debug!("HeadersProcess is_empty (synchronized)"); - let ibd = self.active_chain.is_initial_block_download(); - if !ibd { - if let Some(ref mut peer_state) = - self.synchronizer.peers().state.get_mut(&self.peer) - { - peer_state.stop_headers_sync(); - } + if let Some(mut state) = self.synchronizer.peers().state.get_mut(&self.peer) { + self.synchronizer + .shared() + .state() + .already_sync(state.value_mut()); } return Status::ok(); } @@ -180,6 +178,11 @@ impl<'a> HeadersProcess<'a> { let start = headers.last().expect("empty checked"); self.active_chain .send_getheaders_to_peer(self.nc, self.peer, start); + } else if let Some(mut state) = self.synchronizer.peers().state.get_mut(&self.peer) { + self.synchronizer + .shared() + .state() + .already_sync(state.value_mut()); } // If we're in IBD, we want outbound peers that will serve us a useful @@ -187,9 +190,7 @@ impl<'a> HeadersProcess<'a> { let peer_flags = self .synchronizer .peers() - .state - .get(&self.peer) - .map(|state| state.peer_flags) + .get_flag(self.peer) .unwrap_or_default(); if self.active_chain.is_initial_block_download() && headers.len() != MAX_HEADERS_LEN diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index c34b424d3d4..498d4251637 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -46,7 +46,7 @@ pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2; pub const TIMEOUT_EVICTION_TOKEN: u64 = 3; pub const NO_PEER_CHECK_TOKEN: u64 = 255; -const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_millis(200); +const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_secs(1); const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40); const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200); @@ -503,13 +503,11 @@ impl Synchronizer { } { if let Some(mut peer_state) = self.peers().state.get_mut(&peer) { - if !peer_state.sync_started() { - peer_state.start_sync(HeadersSyncController::from_header(&tip)); - self.shared() - .state() - .n_sync_started() - .fetch_add(1, Ordering::Release); - } + peer_state.start_sync(HeadersSyncController::from_header(&tip)); + self.shared() + .state() + .n_sync_started() + .fetch_add(1, Ordering::Release); } } @@ -544,7 +542,7 @@ impl Synchronizer { || state.peer_flags.is_whitelist || state.peer_flags.is_protect } - IBDState::Out => state.sync_started(), + IBDState::Out => state.header_synced(), } }) .map(|kv_pair| *kv_pair.key()) diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs index 2510440a181..4db23a0cc24 100644 --- a/sync/src/types/mod.rs +++ b/sync/src/types/mod.rs @@ -84,7 +84,7 @@ impl ChainSyncState { HeadersSyncState::Initialized => false, HeadersSyncState::SyncProtocolConnected => true, HeadersSyncState::Started => false, - HeadersSyncState::Suspend(until) => until < now, + HeadersSyncState::Suspend(until) | HeadersSyncState::AlreadySync(until) => until < now, } } @@ -100,9 +100,22 @@ impl ChainSyncState { self.headers_sync_state = HeadersSyncState::Suspend(until) } + fn already_sync(&mut self) { + let now = unix_time_as_millis(); + // 28 seconds + self.headers_sync_state = HeadersSyncState::AlreadySync(now + 28000); + } + fn started(&self) -> bool { matches!(self.headers_sync_state, HeadersSyncState::Started) } + + fn synced(&self) -> bool { + matches!( + self.headers_sync_state, + HeadersSyncState::Started | HeadersSyncState::AlreadySync(_) + ) + } } #[derive(Clone, Debug)] @@ -111,6 +124,7 @@ enum HeadersSyncState { SyncProtocolConnected, Started, Suspend(u64), // suspend headers sync until this timestamp (milliseconds since unix epoch) + AlreadySync(u64), // already synced to the end, not as the sync target for the time being, until the pause time is exceeded } impl Default for HeadersSyncState { @@ -287,22 +301,27 @@ impl PeerState { self.headers_sync_controller = Some(headers_sync_controller); } - pub fn suspend_sync(&mut self, suspend_time: u64) { + fn suspend_sync(&mut self, suspend_time: u64) { let now = unix_time_as_millis(); self.chain_sync.suspend(now + suspend_time); - self.stop_headers_sync(); + self.headers_sync_controller = None; + } + + fn already_sync(&mut self) { + self.chain_sync.already_sync(); + self.headers_sync_controller = None; } pub(crate) fn sync_started(&self) -> bool { self.chain_sync.started() } - pub(crate) fn sync_connected(&mut self) { - self.chain_sync.connected() + pub(crate) fn header_synced(&self) -> bool { + self.chain_sync.synced() } - pub(crate) fn stop_headers_sync(&mut self) { - self.headers_sync_controller = None; + pub(crate) fn sync_connected(&mut self) { + self.chain_sync.connected() } } @@ -1611,11 +1630,12 @@ impl SyncState { pub(crate) fn suspend_sync(&self, peer_state: &mut PeerState) { peer_state.suspend_sync(SUSPEND_SYNC_TIME); - assert_ne!( - self.n_sync_started().fetch_sub(1, Ordering::Release), - 0, - "n_sync_started overflow when suspend_sync" - ); + self.n_sync_started().fetch_sub(1, Ordering::Release); + } + + pub(crate) fn already_sync(&self, peer_state: &mut PeerState) { + peer_state.already_sync(); + self.n_sync_started().fetch_sub(1, Ordering::Release); } pub fn mark_as_known_tx(&self, hash: Byte32) { diff --git a/test/src/main.rs b/test/src/main.rs index 0dd6d91171f..c1a3658e063 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -490,6 +490,7 @@ fn all_specs() -> Vec> { Box::new(CellBeingCellDepThenSpentInSameBlockTestSubmitBlock), Box::new(CellBeingCellDepAndSpentInSameBlockTestGetBlockTemplate), Box::new(CellBeingCellDepAndSpentInSameBlockTestGetBlockTemplateMultiple), + Box::new(HeaderSyncCycle), // Test hard fork features Box::new(CheckAbsoluteEpochSince), Box::new(CheckRelativeEpochSince), diff --git a/test/src/specs/sync/block_sync.rs b/test/src/specs/sync/block_sync.rs index 12c02254608..1213035e5b4 100644 --- a/test/src/specs/sync/block_sync.rs +++ b/test/src/specs/sync/block_sync.rs @@ -10,7 +10,7 @@ use ckb_logger::info; use ckb_network::{bytes::Bytes, extract_peer_id, SupportProtocols}; use ckb_types::{ core::BlockView, - packed::{self, Byte32, SyncMessage}, + packed::{self, Byte32, SendHeaders, SyncMessage}, prelude::*, }; use std::time::Duration; @@ -460,6 +460,48 @@ impl Spec for SyncTooNewBlock { } } +pub struct HeaderSyncCycle; + +impl Spec for HeaderSyncCycle { + crate::setup!(num_nodes: 1); + + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + out_ibd_mode(nodes); + + let mut net = Net::new(self.name(), node0.consensus(), vec![SupportProtocols::Sync]); + net.connect(node0); + + let send_headers = SendHeaders::new_builder() + .headers(Vec::new().pack()) + .build(); + + let msg = SyncMessage::new_builder() + .set(send_headers) + .build() + .as_bytes(); + + let ret = net.should_receive(node0, |data: &Bytes| { + SyncMessage::from_slice(&data) + .map(|message| matches!(message.to_enum(), packed::SyncMessageUnion::GetHeaders(_))) + .unwrap_or(false) + }); + assert!(ret, "Test node should receive Getheader message from node"); + + net.send(node0, SupportProtocols::Sync, msg); + + let ret = net.should_receive(node0, |data: &Bytes| { + SyncMessage::from_slice(&data) + .map(|message| matches!(message.to_enum(), packed::SyncMessageUnion::GetHeaders(_))) + .unwrap_or(false) + }); + assert!( + ret, + "Test node should receive Getheader message from node twice" + ); + } +} + fn build_forks(node: &Node, offsets: &[u64]) -> Vec { let rpc_client = node.rpc_client(); let mut blocks = Vec::with_capacity(offsets.len());