Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow multiple initial sync peers #5890

Merged
merged 6 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::{
};

const LOG_TARGET: &str = "c::bn::state_machine_service::states::listening";
const INITIAL_SYNC_PEER_COUNT: usize = 5;

/// This struct contains the info of the peer, and is used to serialised and deserialised.
#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -117,6 +118,8 @@ impl Listening {
info!(target: LOG_TARGET, "Listening for chain metadata updates");
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(self.is_synced)));
let mut time_since_better_block = None;
let mut initial_sync_counter = 0;
let mut initial_sync_peer_list = Vec::new();
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
loop {
Expand Down Expand Up @@ -177,7 +180,7 @@ impl Listening {
};
log_mdc::extend(mdc.clone());

let sync_mode = determine_sync_mode(
let mut sync_mode = determine_sync_mode(
shared.config.blocks_behind_before_considered_lagging,
&local_metadata,
peer_metadata,
Expand All @@ -203,11 +206,11 @@ impl Listening {
.map(|t| t.elapsed() > shared.config.time_before_considered_lagging)
.unwrap()
{
return StateEvent::FallenBehind(SyncStatus::Lagging {
sync_mode = SyncStatus::Lagging {
local: local.clone(),
network: network.clone(),
sync_peers: sync_peers.clone(),
});
};
}
} else {
// We might have gotten up to date via propagation outside of this state, so reset the timer
Expand All @@ -216,15 +219,56 @@ impl Listening {
}
}

if sync_mode.is_lagging() {
return StateEvent::FallenBehind(sync_mode);
}

if !self.is_synced && sync_mode.is_up_to_date() {
self.is_synced = true;
shared.set_state_info(StateInfo::Listening(ListeningInfo::new(true)));
debug!(target: LOG_TARGET, "Initial sync achieved");
}

// If we have already reached initial sync before, as indicated by the `is_synced` flagged we can
// immediately return fallen behind with the peer that has a higher pow than us
if sync_mode.is_lagging() && self.is_synced {
return StateEvent::FallenBehind(sync_mode);
}
// if we are lagging and not yet reached initial sync, we delay a bit till we get
// INITIAL_SYNC_PEER_COUNT metadata updates from peers to ensure we make a better choice of which
// peer to sync from in the next stages
if let SyncStatus::Lagging {
local,
network,
sync_peers,
} = sync_mode
{
initial_sync_counter += 1;
for peer in sync_peers {
let mut found = false;
// lets search the list list to ensure we only have unique peers in the list with the latest
// up-to-date information
for initial_peer in &mut initial_sync_peer_list {
// we compare the two peers via the comparison operator on syncpeer
if *initial_peer == peer {
found = true;
// if the peer is already in the list, we replace all the information about the peer
// with the newest up-to-date information
*initial_peer = peer.clone();
break;
}
}
if !found {
initial_sync_peer_list.push(peer.clone());
}
}
// We use a list here to ensure that we dont wait for even for INITIAL_SYNC_PEER_COUNT different
// peers
if initial_sync_counter >= INITIAL_SYNC_PEER_COUNT {
// lets return now that we have enough peers to chose from
return StateEvent::FallenBehind(SyncStatus::Lagging {
local,
network,
sync_peers: initial_sync_peer_list,
});
}
}
},
Err(broadcast::error::RecvError::Lagged(n)) => {
debug!(target: LOG_TARGET, "Metadata event subscriber lagged by {} item(s)", n);
Expand Down Expand Up @@ -267,8 +311,10 @@ impl From<BlockSync> for Listening {
}

impl From<DecideNextSync> for Listening {
fn from(_: DecideNextSync) -> Self {
Self { is_synced: false }
fn from(sync: DecideNextSync) -> Self {
Self {
is_synced: sync.is_synced(),
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::cmp::Ordering;

use log::*;

use crate::{
Expand All @@ -40,9 +38,14 @@ const LOG_TARGET: &str = "c::bn::state_machine_service::states::sync_decide";
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DecideNextSync {
sync_peers: Vec<SyncPeer>,
is_synced: bool,
}

impl DecideNextSync {
pub fn is_synced(&self) -> bool {
self.is_synced
}

pub async fn next_event<B: BlockchainBackend + 'static>(&mut self, shared: &BaseNodeStateMachine<B>) -> StateEvent {
use StateEvent::{Continue, FatalError, ProceedToBlockSync, ProceedToHorizonSync};
let local_metadata = match shared.db.get_chain_metadata().await {
Expand Down Expand Up @@ -121,55 +124,9 @@ impl DecideNextSync {

impl From<HeaderSyncState> for DecideNextSync {
fn from(sync: HeaderSyncState) -> Self {
sync.into_sync_peers().into()
}
}

impl From<Vec<SyncPeer>> for DecideNextSync {
fn from(mut sync_peers: Vec<SyncPeer>) -> Self {
sync_peers.sort_by(|a, b| match (a.latency(), b.latency()) {
(None, None) => Ordering::Equal,
// No latency goes to the end
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(Some(la), Some(lb)) => la.cmp(&lb),
});
Self { sync_peers }
}
}

#[cfg(test)]
mod test {
use std::time::Duration;

use rand::{rngs::OsRng, seq::SliceRandom};
use tari_common_types::chain_metadata::ChainMetadata;

use super::*;

mod sort_by_latency {
use super::*;
use crate::base_node::chain_metadata_service::PeerChainMetadata;

#[test]
fn it_sorts_by_latency() {
let peers = (0..10)
.map(|i| {
PeerChainMetadata::new(
Default::default(),
ChainMetadata::empty(),
Some(Duration::from_millis(i)),
)
.into()
})
.chain(Some(
PeerChainMetadata::new(Default::default(), ChainMetadata::empty(), None).into(),
))
.collect::<Vec<SyncPeer>>();
let mut shuffled = peers.clone();
shuffled.shuffle(&mut OsRng);
let decide = DecideNextSync::from(shuffled);
assert_eq!(decide.sync_peers, peers);
}
let is_synced = sync.is_synced();
let mut sync_peers = sync.into_sync_peers();
sync_peers.sort();
DecideNextSync { sync_peers, is_synced }
}
}
65 changes: 65 additions & 0 deletions base_layer/core/src/base_node/sync/sync_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
cmp::Ordering,
fmt::{Display, Formatter},
time::Duration,
};
Expand Down Expand Up @@ -97,3 +98,67 @@ impl PartialEq for SyncPeer {
}
}
impl Eq for SyncPeer {}

impl Ord for SyncPeer {
fn cmp(&self, other: &Self) -> Ordering {
let mut result = self
.peer_metadata
.claimed_chain_metadata()
.accumulated_difficulty()
.cmp(&other.peer_metadata.claimed_chain_metadata().accumulated_difficulty());
if result == Ordering::Equal {
match (self.latency(), other.latency()) {
(None, None) => result = Ordering::Equal,
// No latency goes to the end
(Some(_), None) => result = Ordering::Less,
(None, Some(_)) => result = Ordering::Greater,
(Some(la), Some(lb)) => result = la.cmp(&lb),
}
}
result
}
}

impl PartialOrd for SyncPeer {
fn partial_cmp(&self, other: &SyncPeer) -> Option<Ordering> {
Some(self.cmp(other))
}
}

#[cfg(test)]
mod test {
use std::time::Duration;

use rand::{rngs::OsRng, seq::SliceRandom};
use tari_common_types::chain_metadata::ChainMetadata;

use super::*;

mod sort_by_latency {
use tari_comms::types::{CommsPublicKey, CommsSecretKey};
use tari_crypto::keys::{PublicKey, SecretKey};

use super::*;
use crate::base_node::chain_metadata_service::PeerChainMetadata;

#[test]
fn it_sorts_by_latency() {
let peers = (0..10)
.map(|i| {
let sk = CommsSecretKey::random(&mut OsRng);
let pk = CommsPublicKey::from_secret_key(&sk);
let node_id = NodeId::from_key(&pk);
PeerChainMetadata::new(node_id, ChainMetadata::empty(), Some(Duration::from_millis(i))).into()
})
.chain(Some(
PeerChainMetadata::new(Default::default(), ChainMetadata::empty(), None).into(),
))
.collect::<Vec<SyncPeer>>();
let mut shuffled = peers.clone();
shuffled.shuffle(&mut OsRng);
assert_ne!(shuffled, peers);
shuffled.sort();
assert_eq!(shuffled, peers);
}
}
}
Loading
Loading