Skip to content

Commit

Permalink
Update entrypoint contact info even when shred version adoption is no…
Browse files Browse the repository at this point in the history
…t requested
  • Loading branch information
mvines committed Dec 22, 2020
1 parent 1b30155 commit 0447ccf
Showing 1 changed file with 87 additions and 19 deletions.
106 changes: 87 additions & 19 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1731,19 +1731,21 @@ impl ClusterInfo {
Ok(())
}

fn handle_adopt_shred_version(self: &Arc<Self>, adopt_shred_version: &mut bool) {
// Adopt the entrypoint's `shred_version` if ours is unset
if *adopt_shred_version {
// If gossip was given an entrypoint, look up the ContactInfo by the given
// entrypoint gossip adddress
let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip);

if let Some(gossip_addr) = gossip_addr {
// If a pull from the entrypoint was successful, it should exist in the crds table
let entrypoint = self.lookup_contact_info_by_gossip_addr(&gossip_addr);
if let Some(entrypoint) = entrypoint {
fn process_entrypoint(self: &Arc<Self>, entrypoint_processed: &mut bool) {
if *entrypoint_processed {
return;
}

let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip);
if let Some(gossip_addr) = gossip_addr {
// If a pull from the entrypoint was successful it should exist in the CRDS table
let entrypoint = self.lookup_contact_info_by_gossip_addr(&gossip_addr);

if let Some(entrypoint) = entrypoint {
// Adopt the entrypoint's `shred_version` if ours is unset
if self.my_shred_version() == 0 {
if entrypoint.shred_version == 0 {
info!("Unable to adopt entrypoint's shred version");
warn!("Unable to adopt entrypoint shred version of 0");
} else {
info!(
"Setting shred version to {:?} from entrypoint {:?}",
Expand All @@ -1756,11 +1758,18 @@ impl ClusterInfo {
.unwrap()
.set_shred_version(entrypoint.shred_version);
self.insert_self();
*self.entrypoint.write().unwrap() = Some(entrypoint);
*adopt_shred_version = false;
*entrypoint_processed = true;
}
} else {
*entrypoint_processed = true;
}

// Update the entrypoint's id so future entrypoint pulls correctly reference it
*self.entrypoint.write().unwrap() = Some(entrypoint);
}
} else {
// No entrypoint specified. Nothing more to process
*entrypoint_processed = true;
}
}

Expand Down Expand Up @@ -1807,7 +1816,7 @@ impl ClusterInfo {
.spawn(move || {
let mut last_push = timestamp();
let mut last_contact_info_trace = timestamp();
let mut adopt_shred_version = self.my_shred_version() == 0;
let mut entrypoint_processed = false;
let recycler = PacketsRecycler::default();
let crds_data = vec![
CrdsData::Version(Version::new(self.id())),
Expand Down Expand Up @@ -1854,7 +1863,7 @@ impl ClusterInfo {

self.handle_purge(&thread_pool, &bank_forks, &stakes);

self.handle_adopt_shred_version(&mut adopt_shred_version);
self.process_entrypoint(&mut entrypoint_processed);

//TODO: possibly tune this parameter
//we saw a deadlock passing an self.read().unwrap().timeout into sleep
Expand Down Expand Up @@ -4225,12 +4234,69 @@ mod tests {
}

#[test]
fn test_handle_adopt_shred_version() {
fn test_process_entrypoint_adopt_shred_version() {
let node_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
));
assert_eq!(cluster_info.my_shred_version(), 0);

// Simulating starting up with default entrypoint, no known id, only a gossip
// address
let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234");
let mut entrypoint = ContactInfo::new_localhost(&Pubkey::default(), timestamp());
entrypoint.gossip = entrypoint_gossip_addr;
assert_eq!(entrypoint.shred_version, 0);
cluster_info.set_entrypoint(entrypoint);

// Simulate getting entrypoint ContactInfo from gossip with an entrypoint shred version of
// 0
let mut gossiped_entrypoint_info =
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
gossiped_entrypoint_info.gossip = entrypoint_gossip_addr;
gossiped_entrypoint_info.shred_version = 0;
cluster_info.insert_info(gossiped_entrypoint_info.clone());

// Adopt the entrypoint's gossiped contact info and verify
let mut entrypoint_processed = false;
ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert!(!entrypoint_processed); // <--- entrypoint processing incomplete because shred adoption still pending
assert_eq!(cluster_info.my_shred_version(), 0); // <-- shred version still 0

// Simulate getting entrypoint ContactInfo from gossip with an entrypoint shred version of
// !0
gossiped_entrypoint_info.shred_version = 1;
cluster_info.insert_info(gossiped_entrypoint_info.clone());

// Adopt the entrypoint's gossiped contact info and verify
let mut entrypoint_processed = false;
ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert!(entrypoint_processed);
assert_eq!(cluster_info.my_shred_version(), 1); // <-- shred version now adopted from entrypoint
}

#[test]
fn test_process_entrypoint_without_adopt_shred_version() {
let node_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
{
let mut contact_info =
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp());
contact_info.shred_version = 2;
contact_info
},
node_keypair,
));
assert_eq!(cluster_info.my_shred_version(), 2);

// Simulating starting up with default entrypoint, no known id, only a gossip
// address
Expand All @@ -4248,11 +4314,13 @@ mod tests {
cluster_info.insert_info(gossiped_entrypoint_info.clone());

// Adopt the entrypoint's gossiped contact info and verify
ClusterInfo::handle_adopt_shred_version(&cluster_info, &mut true);
let mut entrypoint_processed = false;
ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert_eq!(cluster_info.my_shred_version(), 1);
assert!(entrypoint_processed);
assert_eq!(cluster_info.my_shred_version(), 2); // <--- No change to shred version
}
}

0 comments on commit 0447ccf

Please sign in to comment.