Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Warp-only sync with warp-barrier [blocknumber] flag. #8228

Merged
merged 4 commits into from
Mar 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions parity/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,10 @@ usage! {
"--no-serve-light",
"Disable serving of light peers.",

ARG arg_warp_barrier: (Option<u64>) = None, or |c: &Config| c.network.as_ref()?.warp_barrier.clone(),
"--warp-barrier=[NUM]",
"When warp enabled never attempt regular sync before warping to block NUM.",

ARG arg_port: (u16) = 30303u16, or |c: &Config| c.network.as_ref()?.port.clone(),
"--port=[PORT]",
"Override the port on which the node should listen.",
Expand Down Expand Up @@ -1044,6 +1048,7 @@ struct Ui {
#[serde(deny_unknown_fields)]
struct Network {
warp: Option<bool>,
warp_barrier: Option<u64>,
port: Option<u16>,
min_peers: Option<u16>,
max_peers: Option<u16>,
Expand Down Expand Up @@ -1625,6 +1630,7 @@ mod tests {
flag_geth: false,
flag_testnet: false,
flag_import_geth_keys: false,
arg_warp_barrier: None,
arg_datadir: None,
arg_networkid: None,
arg_peers: None,
Expand Down Expand Up @@ -1730,6 +1736,7 @@ mod tests {
}),
network: Some(Network {
warp: Some(false),
warp_barrier: None,
port: None,
min_peers: Some(10),
max_peers: Some(20),
Expand Down
2 changes: 2 additions & 0 deletions parity/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ impl Configuration {
wal: wal,
vm_type: vm_type,
warp_sync: warp_sync,
warp_barrier: self.args.arg_warp_barrier,
public_node: public_node,
geth_compatibility: geth_compatibility,
net_settings: self.network_settings()?,
Expand Down Expand Up @@ -1401,6 +1402,7 @@ mod tests {
network_id: None,
public_node: false,
warp_sync: true,
warp_barrier: None,
acc_conf: Default::default(),
gas_pricer_conf: Default::default(),
miner_extras: Default::default(),
Expand Down
9 changes: 7 additions & 2 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub struct RunCmd {
pub net_conf: ethsync::NetworkConfiguration,
pub network_id: Option<u64>,
pub warp_sync: bool,
pub warp_barrier: Option<u64>,
pub public_node: bool,
pub acc_conf: AccountsConfig,
pub gas_pricer_conf: GasPricerConfig,
Expand Down Expand Up @@ -513,7 +514,7 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
}

sync_config.fork_block = spec.fork_block();
let mut warp_sync = cmd.warp_sync;
let mut warp_sync = spec.engine.supports_warp() && cmd.warp_sync;
if warp_sync {
// Logging is not initialized yet, so we print directly to stderr
if fat_db {
Expand All @@ -527,7 +528,11 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
warp_sync = false;
}
}
sync_config.warp_sync = spec.engine.supports_warp() && warp_sync;
sync_config.warp_sync = match (warp_sync, cmd.warp_barrier) {
(true, Some(block)) => ethsync::WarpSync::OnlyAndAfter(block),
(true, _) => ethsync::WarpSync::Enabled,
_ => ethsync::WarpSync::Disabled,
};
sync_config.download_old_blocks = cmd.download_old_blocks;
sync_config.serve_light = cmd.serve_light;

Expand Down
39 changes: 37 additions & 2 deletions sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,41 @@ pub const ETH_PROTOCOL: ProtocolId = *b"eth";
/// Ethereum light protocol
pub const LIGHT_PROTOCOL: ProtocolId = *b"pip";

/// Determine warp sync status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WarpSync {
/// Warp sync is enabled.
Enabled,
/// Warp sync is disabled.
Disabled,
/// Only warp sync is allowed (no regular sync) and only after given block number.
OnlyAndAfter(BlockNumber),
}

impl WarpSync {
/// Returns true if warp sync is enabled.
pub fn is_enabled(&self) -> bool {
match *self {
WarpSync::Enabled => true,
WarpSync::OnlyAndAfter(_) => true,
WarpSync::Disabled => false,
}
}

/// Returns `true` if we are in warp-only mode.
///
/// i.e. we will never fall back to regular sync
/// until given block number is reached by
/// successfuly finding and restoring from a snapshot.
pub fn is_warp_only(&self) -> bool {
if let WarpSync::OnlyAndAfter(_) = *self {
true
} else {
false
}
}
}

/// Sync configuration
#[derive(Debug, Clone, Copy)]
pub struct SyncConfig {
Expand All @@ -61,7 +96,7 @@ pub struct SyncConfig {
/// Fork block to check
pub fork_block: Option<(BlockNumber, H256)>,
/// Enable snapshot sync
pub warp_sync: bool,
pub warp_sync: WarpSync,
/// Enable light client server.
pub serve_light: bool,
}
Expand All @@ -75,7 +110,7 @@ impl Default for SyncConfig {
subprotocol_name: ETH_PROTOCOL,
light_subprotocol_name: LIGHT_PROTOCOL,
fork_block: None,
warp_sync: false,
warp_sync: WarpSync::Disabled,
serve_light: false,
}
}
Expand Down
53 changes: 39 additions & 14 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ use ethcore::error::*;
use ethcore::snapshot::{ManifestData, RestorationStatus};
use transaction::PendingTransaction;
use sync_io::SyncIo;
use super::SyncConfig;
use super::{WarpSync, SyncConfig};
use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError, DownloadAction};
use rand::Rng;
use snapshot::{Snapshot, ChunkType};
Expand Down Expand Up @@ -385,7 +385,7 @@ pub struct ChainSync {
/// Enable ancient block downloading
download_old_blocks: bool,
/// Enable warp sync.
enable_warp_sync: bool,
warp_sync: WarpSync,
}

type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
Expand All @@ -394,9 +394,16 @@ impl ChainSync {
/// Create a new instance of syncing strategy.
pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
let chain_info = chain.chain_info();
let best_block = chain.chain_info().best_block_number;
let state = match config.warp_sync {
WarpSync::Enabled => SyncState::WaitingPeers,
WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers,
_ => SyncState::Idle,
};

let mut sync = ChainSync {
state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle },
starting_block: chain.chain_info().best_block_number,
state,
starting_block: best_block,
highest_block: None,
peers: HashMap::new(),
handshaking_peers: HashMap::new(),
Expand All @@ -410,7 +417,7 @@ impl ChainSync {
snapshot: Snapshot::new(),
sync_start_time: None,
transactions_stats: TransactionsStats::default(),
enable_warp_sync: config.warp_sync,
warp_sync: config.warp_sync,
};
sync.update_targets(chain);
sync
Expand Down Expand Up @@ -508,10 +515,12 @@ impl ChainSync {
}

fn maybe_start_snapshot_sync(&mut self, io: &mut SyncIo) {
if !self.enable_warp_sync || io.snapshot_service().supported_versions().is_none() {
if !self.warp_sync.is_enabled() || io.snapshot_service().supported_versions().is_none() {
trace!(target: "sync", "Skipping warp sync. Disabled or not supported.");
return;
}
if self.state != SyncState::WaitingPeers && self.state != SyncState::Blocks && self.state != SyncState::Waiting {
trace!(target: "sync", "Skipping warp sync. State: {:?}", self.state);
return;
}
// Make sure the snapshot block is not too far away from best block and network best block and
Expand All @@ -520,11 +529,16 @@ impl ChainSync {
let fork_block = self.fork_block.as_ref().map(|&(n, _)| n).unwrap_or(0);

let (best_hash, max_peers, snapshot_peers) = {
let expected_warp_block = match self.warp_sync {
WarpSync::OnlyAndAfter(block) => block,
_ => 0,
};
//collect snapshot infos from peers
let snapshots = self.peers.iter()
.filter(|&(_, p)| p.is_allowed() && p.snapshot_number.map_or(false, |sn|
our_best_block < sn && (sn - our_best_block) > SNAPSHOT_RESTORE_THRESHOLD &&
sn > fork_block &&
sn > expected_warp_block &&
self.highest_block.map_or(true, |highest| highest >= sn && (highest - sn) <= SNAPSHOT_RESTORE_THRESHOLD)
))
.filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone())))
Expand Down Expand Up @@ -554,7 +568,7 @@ impl ChainSync {
trace!(target: "sync", "Starting unconfirmed snapshot sync {:?} with {:?}", hash, peers);
self.start_snapshot_sync(io, peers);
}
} else if timeout {
} else if timeout && !self.warp_sync.is_warp_only() {
trace!(target: "sync", "No snapshots found, starting full sync");
self.state = SyncState::Idle;
self.continue_sync(io);
Expand Down Expand Up @@ -626,10 +640,6 @@ impl ChainSync {
block_set: None,
};

if self.sync_start_time.is_none() {
self.sync_start_time = Some(Instant::now());
}

trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}, snapshot:{:?})",
peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis, peer.snapshot_number);
if io.is_expired() {
Expand Down Expand Up @@ -658,6 +668,10 @@ impl ChainSync {
return Ok(());
}

if self.sync_start_time.is_none() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved that after the initial checks, otherwise in case we are not using warp-only sync, we might timeout a bit too early (I actually got a lot of peers on different chains from discovery), without even giving a chance to start warping.

self.sync_start_time = Some(Instant::now());
}

self.peers.insert(peer_id.clone(), peer);
// Don't activate peer immediatelly when searching for common block.
// Let the current sync round complete first.
Expand Down Expand Up @@ -1167,9 +1181,14 @@ impl ChainSync {
self.sync_peer(io, p, false);
}
}
if (self.state != SyncState::WaitingPeers && self.state != SyncState::SnapshotWaiting && self.state != SyncState::Waiting && self.state != SyncState::Idle)
&& !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync()) {

if
self.state != SyncState::WaitingPeers &&
self.state != SyncState::SnapshotWaiting &&
self.state != SyncState::Waiting &&
self.state != SyncState::Idle &&
!self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync())
{
self.complete_sync(io);
}
}
Expand Down Expand Up @@ -1220,7 +1239,13 @@ impl ChainSync {
if force || higher_difficulty || self.old_blocks.is_some() {
match self.state {
SyncState::WaitingPeers => {
trace!(target: "sync", "Checking snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
trace!(
target: "sync",
"Checking snapshot sync: {} vs {} (peer: {})",
peer_snapshot_number,
chain_info.best_block_number,
peer_id
);
self.maybe_start_snapshot_sync(io);
},
SyncState::Idle | SyncState::Blocks | SyncState::NewBlocks => {
Expand Down
4 changes: 2 additions & 2 deletions sync/src/tests/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use ethcore::client::{TestBlockChainClient, BlockChainClient, BlockId, EachBlockWith, ChainInfo, BlockInfo};
use chain::{SyncState};
use super::helpers::*;
use SyncConfig;
use {SyncConfig, WarpSync};

#[test]
fn two_peers() {
Expand Down Expand Up @@ -161,7 +161,7 @@ fn status_empty() {
let net = TestNet::new(2);
assert_eq!(net.peer(0).sync.read().status().state, SyncState::Idle);
let mut config = SyncConfig::default();
config.warp_sync = true;
config.warp_sync = WarpSync::Enabled;
let net = TestNet::new_with_config(2, config);
assert_eq!(net.peer(0).sync.read().status().state, SyncState::WaitingPeers);
}
Expand Down
4 changes: 2 additions & 2 deletions sync/src/tests/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use ethcore::snapshot::{SnapshotService, ManifestData, RestorationStatus};
use ethcore::header::BlockNumber;
use ethcore::client::{EachBlockWith};
use super::helpers::*;
use SyncConfig;
use {SyncConfig, WarpSync};

pub struct TestSnapshotService {
manifest: Option<ManifestData>,
Expand Down Expand Up @@ -127,7 +127,7 @@ impl SnapshotService for TestSnapshotService {
fn snapshot_sync() {
::env_logger::init().ok();
let mut config = SyncConfig::default();
config.warp_sync = true;
config.warp_sync = WarpSync::Enabled;
let mut net = TestNet::new_with_config(5, config);
let snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 500000));
for i in 0..4 {
Expand Down
5 changes: 3 additions & 2 deletions util/network-devp2p/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,11 +593,11 @@ impl Host {
};
match TcpStream::connect(&address) {
Ok(socket) => {
trace!(target: "network", "Connecting to {:?}", address);
trace!(target: "network", "{}: Connecting to {:?}", id, address);
socket
},
Err(e) => {
debug!(target: "network", "Can't connect to address {:?}: {:?}", address, e);
debug!(target: "network", "{}: Can't connect to address {:?}: {:?}", id, address, e);
return;
}
}
Expand All @@ -613,6 +613,7 @@ impl Host {
let mut sessions = self.sessions.write();

let token = sessions.insert_with_opt(|token| {
trace!(target: "network", "{}: Initiating session {:?}", token, id);
match Session::new(io, socket, token, id, &nonce, &self.info.read()) {
Ok(s) => Some(Arc::new(Mutex::new(s))),
Err(e) => {
Expand Down