Skip to content

Commit

Permalink
fix: fix the status marking problem of header sync
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Jul 22, 2021
1 parent 2812717 commit 630a69b
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 32 deletions.
21 changes: 11 additions & 10 deletions sync/src/synchronizer/headers_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,11 @@ impl<'a> HeadersProcess<'a> {

if headers.is_empty() {
debug!("HeadersProcess is_empty (synchronized)");
let ibd = self.active_chain.is_initial_block_download();
if !ibd {
if let Some(ref mut peer_state) =
self.synchronizer.peers().state.get_mut(&self.peer)
{
peer_state.stop_headers_sync();
}
if let Some(mut state) = self.synchronizer.peers().state.get_mut(&self.peer) {
self.synchronizer
.shared()
.state()
.already_sync(state.value_mut());
}
return Status::ok();
}
Expand Down Expand Up @@ -180,16 +178,19 @@ impl<'a> HeadersProcess<'a> {
let start = headers.last().expect("empty checked");
self.active_chain
.send_getheaders_to_peer(self.nc, self.peer, start);
} else if let Some(mut state) = self.synchronizer.peers().state.get_mut(&self.peer) {
self.synchronizer
.shared()
.state()
.already_sync(state.value_mut());
}

// If we're in IBD, we want outbound peers that will serve us a useful
// chain. Disconnect peers that are on chains with insufficient work.
let peer_flags = self
.synchronizer
.peers()
.state
.get(&self.peer)
.map(|state| state.peer_flags)
.get_flag(self.peer)
.unwrap_or_default();
if self.active_chain.is_initial_block_download()
&& headers.len() != MAX_HEADERS_LEN
Expand Down
16 changes: 7 additions & 9 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2;
pub const TIMEOUT_EVICTION_TOKEN: u64 = 3;
pub const NO_PEER_CHECK_TOKEN: u64 = 255;

const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_millis(200);
const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_secs(1);
const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40);
const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);

Expand Down Expand Up @@ -503,13 +503,11 @@ impl Synchronizer {
}
{
if let Some(mut peer_state) = self.peers().state.get_mut(&peer) {
if !peer_state.sync_started() {
peer_state.start_sync(HeadersSyncController::from_header(&tip));
self.shared()
.state()
.n_sync_started()
.fetch_add(1, Ordering::Release);
}
peer_state.start_sync(HeadersSyncController::from_header(&tip));
self.shared()
.state()
.n_sync_started()
.fetch_add(1, Ordering::Release);
}
}

Expand Down Expand Up @@ -544,7 +542,7 @@ impl Synchronizer {
|| state.peer_flags.is_whitelist
|| state.peer_flags.is_protect
}
IBDState::Out => state.sync_started(),
IBDState::Out => state.header_synced(),
}
})
.map(|kv_pair| *kv_pair.key())
Expand Down
44 changes: 32 additions & 12 deletions sync/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl ChainSyncState {
HeadersSyncState::Initialized => false,
HeadersSyncState::SyncProtocolConnected => true,
HeadersSyncState::Started => false,
HeadersSyncState::Suspend(until) => until < now,
HeadersSyncState::Suspend(until) | HeadersSyncState::AlreadySync(until) => until < now,
}
}

Expand All @@ -100,9 +100,22 @@ impl ChainSyncState {
self.headers_sync_state = HeadersSyncState::Suspend(until)
}

fn already_sync(&mut self) {
let now = unix_time_as_millis();
// 28 seconds
self.headers_sync_state = HeadersSyncState::AlreadySync(now + 28000);
}

fn started(&self) -> bool {
matches!(self.headers_sync_state, HeadersSyncState::Started)
}

fn synced(&self) -> bool {
matches!(
self.headers_sync_state,
HeadersSyncState::Started | HeadersSyncState::AlreadySync(_)
)
}
}

#[derive(Clone, Debug)]
Expand All @@ -111,6 +124,7 @@ enum HeadersSyncState {
SyncProtocolConnected,
Started,
Suspend(u64), // suspend headers sync until this timestamp (milliseconds since unix epoch)
AlreadySync(u64), // already synced to the end, not as the sync target for the time being, until the pause time is exceeded
}

impl Default for HeadersSyncState {
Expand Down Expand Up @@ -287,22 +301,27 @@ impl PeerState {
self.headers_sync_controller = Some(headers_sync_controller);
}

pub fn suspend_sync(&mut self, suspend_time: u64) {
fn suspend_sync(&mut self, suspend_time: u64) {
let now = unix_time_as_millis();
self.chain_sync.suspend(now + suspend_time);
self.stop_headers_sync();
self.headers_sync_controller = None;
}

fn already_sync(&mut self) {
self.chain_sync.already_sync();
self.headers_sync_controller = None;
}

pub(crate) fn sync_started(&self) -> bool {
self.chain_sync.started()
}

pub(crate) fn sync_connected(&mut self) {
self.chain_sync.connected()
pub(crate) fn header_synced(&self) -> bool {
self.chain_sync.synced()
}

pub(crate) fn stop_headers_sync(&mut self) {
self.headers_sync_controller = None;
pub(crate) fn sync_connected(&mut self) {
self.chain_sync.connected()
}
}

Expand Down Expand Up @@ -1611,11 +1630,12 @@ impl SyncState {

pub(crate) fn suspend_sync(&self, peer_state: &mut PeerState) {
peer_state.suspend_sync(SUSPEND_SYNC_TIME);
assert_ne!(
self.n_sync_started().fetch_sub(1, Ordering::Release),
0,
"n_sync_started overflow when suspend_sync"
);
self.n_sync_started().fetch_sub(1, Ordering::Release);
}

pub(crate) fn already_sync(&self, peer_state: &mut PeerState) {
peer_state.already_sync();
self.n_sync_started().fetch_sub(1, Ordering::Release);
}

pub fn mark_as_known_tx(&self, hash: Byte32) {
Expand Down
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(CellBeingCellDepThenSpentInSameBlockTestSubmitBlock),
Box::new(CellBeingCellDepAndSpentInSameBlockTestGetBlockTemplate),
Box::new(CellBeingCellDepAndSpentInSameBlockTestGetBlockTemplateMultiple),
Box::new(HeaderSyncCycle),
// Test hard fork features
Box::new(CheckAbsoluteEpochSince),
Box::new(CheckRelativeEpochSince),
Expand Down
44 changes: 43 additions & 1 deletion test/src/specs/sync/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ckb_logger::info;
use ckb_network::{bytes::Bytes, extract_peer_id, SupportProtocols};
use ckb_types::{
core::BlockView,
packed::{self, Byte32, SyncMessage},
packed::{self, Byte32, SendHeaders, SyncMessage},
prelude::*,
};
use std::time::Duration;
Expand Down Expand Up @@ -460,6 +460,48 @@ impl Spec for SyncTooNewBlock {
}
}

pub struct HeaderSyncCycle;

impl Spec for HeaderSyncCycle {
crate::setup!(num_nodes: 1);

fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];
out_ibd_mode(nodes);

let mut net = Net::new(self.name(), node0.consensus(), vec![SupportProtocols::Sync]);
net.connect(node0);

let send_headers = SendHeaders::new_builder()
.headers(Vec::new().pack())
.build();

let msg = SyncMessage::new_builder()
.set(send_headers)
.build()
.as_bytes();

let ret = net.should_receive(node0, |data: &Bytes| {
SyncMessage::from_slice(&data)
.map(|message| matches!(message.to_enum(), packed::SyncMessageUnion::GetHeaders(_)))
.unwrap_or(false)
});
assert!(ret, "Test node should receive Getheader message from node");

net.send(node0, SupportProtocols::Sync, msg);

let ret = net.should_receive(node0, |data: &Bytes| {
SyncMessage::from_slice(&data)
.map(|message| matches!(message.to_enum(), packed::SyncMessageUnion::GetHeaders(_)))
.unwrap_or(false)
});
assert!(
ret,
"Test node should receive Getheader message from node twice"
);
}
}

fn build_forks(node: &Node, offsets: &[u64]) -> Vec<BlockView> {
let rpc_client = node.rpc_client();
let mut blocks = Vec::with_capacity(offsets.len());
Expand Down

0 comments on commit 630a69b

Please sign in to comment.