Skip to content

Commit

Permalink
Update entrypoint contact info even when shred version adoption is no…
Browse files Browse the repository at this point in the history
…t requested
  • Loading branch information
mvines committed Dec 22, 2020
1 parent 3316e71 commit 6d9d5c8
Showing 1 changed file with 75 additions and 19 deletions.
94 changes: 75 additions & 19 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1629,13 +1629,19 @@ impl ClusterInfo {
.flatten()
.collect()
};

debug_assert!(pulls.len() < 2);
if pulls.len() > 1 {
error!("BUG: pulls.len() should be 0 or 1, not {}", pulls.len());
}

self.append_entrypoint_to_pulls(thread_pool, &mut pulls);
self.stats
.new_pull_requests_count
.add_relaxed(pulls.len() as u64);
// There are at most 2 unique peers here: The randomly
// selected pull peer, and possibly also the entrypoint.
let peers: Vec<Pubkey> = pulls.iter().map(|(peer, _, _, _)| *peer).dedup().collect();
let peers: Vec<Pubkey> = pulls.iter().map(|(peer, _, _, _)| *peer).collect();
{
let mut gossip =
self.time_gossip_write_lock("mark_pull", &self.stats.mark_pull_request);
Expand Down Expand Up @@ -1731,19 +1737,21 @@ impl ClusterInfo {
Ok(())
}

fn handle_adopt_shred_version(self: &Arc<Self>, adopt_shred_version: &mut bool) {
// Adopt the entrypoint's `shred_version` if ours is unset
if *adopt_shred_version {
// If gossip was given an entrypoint, look up the ContactInfo by the given
// entrypoint gossip adddress
let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip);
fn process_entrypoint(self: &Arc<Self>, entrypoint_processed: &mut bool) {
if *entrypoint_processed {
return;
}

let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip);
if let Some(gossip_addr) = gossip_addr {
// If a pull from the entrypoint was successful it should exist in the CRDS table
let entrypoint = self.lookup_contact_info_by_gossip_addr(&gossip_addr);

if let Some(gossip_addr) = gossip_addr {
// If a pull from the entrypoint was successful, it should exist in the crds table
let entrypoint = self.lookup_contact_info_by_gossip_addr(&gossip_addr);
if let Some(entrypoint) = entrypoint {
if let Some(entrypoint) = entrypoint {
// Adopt the entrypoint's `shred_version` if ours is unset
if self.my_shred_version() == 0 {
if entrypoint.shred_version == 0 {
info!("Unable to adopt entrypoint's shred version");
warn!("Unable to adopt entrypoint shred version of 0");
} else {
info!(
"Setting shred version to {:?} from entrypoint {:?}",
Expand All @@ -1756,11 +1764,16 @@ impl ClusterInfo {
.unwrap()
.set_shred_version(entrypoint.shred_version);
self.insert_self();
*self.entrypoint.write().unwrap() = Some(entrypoint);
*adopt_shred_version = false;
}
}

// Update the entrypoint's id so future entrypoint pulls correctly reference it
*self.entrypoint.write().unwrap() = Some(entrypoint);
*entrypoint_processed = true;
}
} else {
// No entrypoint specified. Nothing more to process
*entrypoint_processed = true;
}
}

Expand Down Expand Up @@ -1807,7 +1820,7 @@ impl ClusterInfo {
.spawn(move || {
let mut last_push = timestamp();
let mut last_contact_info_trace = timestamp();
let mut adopt_shred_version = self.my_shred_version() == 0;
let mut entrypoint_processed = false;
let recycler = PacketsRecycler::default();
let crds_data = vec![
CrdsData::Version(Version::new(self.id())),
Expand Down Expand Up @@ -1854,7 +1867,7 @@ impl ClusterInfo {

self.handle_purge(&thread_pool, &bank_forks, &stakes);

self.handle_adopt_shred_version(&mut adopt_shred_version);
self.process_entrypoint(&mut entrypoint_processed);

//TODO: possibly tune this parameter
//we saw a deadlock passing an self.read().unwrap().timeout into sleep
Expand Down Expand Up @@ -4225,12 +4238,53 @@ mod tests {
}

#[test]
fn test_handle_adopt_shred_version() {
fn test_process_entrypoint_adopt_shred_version() {
let node_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
));
assert_eq!(cluster_info.my_shred_version(), 0);

// Simulating starting up with default entrypoint, no known id, only a gossip
// address
let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234");
let mut entrypoint = ContactInfo::new_localhost(&Pubkey::default(), timestamp());
entrypoint.gossip = entrypoint_gossip_addr;
assert_eq!(entrypoint.shred_version, 0);
cluster_info.set_entrypoint(entrypoint);

// Simulate getting entrypoint ContactInfo from gossip
let mut gossiped_entrypoint_info =
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
gossiped_entrypoint_info.gossip = entrypoint_gossip_addr;
gossiped_entrypoint_info.shred_version = 1;
cluster_info.insert_info(gossiped_entrypoint_info.clone());

// Adopt the entrypoint's gossiped contact info and verify
let mut entrypoint_processed = false;
ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert!(entrypoint_processed);
assert_eq!(cluster_info.my_shred_version(), 1); // <-- shred version adopted from entrypoint
}

#[test]
fn test_process_entrypoint_without_adopt_shred_version() {
let node_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
{
let mut contact_info =
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp());
contact_info.shred_version = 2;
contact_info
},
node_keypair,
));
assert_eq!(cluster_info.my_shred_version(), 2);

// Simulating starting up with default entrypoint, no known id, only a gossip
// address
Expand All @@ -4248,11 +4302,13 @@ mod tests {
cluster_info.insert_info(gossiped_entrypoint_info.clone());

// Adopt the entrypoint's gossiped contact info and verify
ClusterInfo::handle_adopt_shred_version(&cluster_info, &mut true);
let mut entrypoint_processed = false;
ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert_eq!(cluster_info.my_shred_version(), 1);
assert!(entrypoint_processed);
assert_eq!(cluster_info.my_shred_version(), 2); // <--- No change to shred version
}
}

0 comments on commit 6d9d5c8

Please sign in to comment.