Skip to content

Commit

Permalink
simplifies pull-responses handling (#33743)
Browse files Browse the repository at this point in the history
Following:
#33722
from pubkey in PullResponse is no longer used in processing
pull-responses and so the code can be simplified.
  • Loading branch information
behzadnouri authored Oct 18, 2023
1 parent 84c2f9d commit 2465abc
Showing 1 changed file with 11 additions and 76 deletions.
87 changes: 11 additions & 76 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ use {
solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY,
std::{
borrow::Cow,
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
collections::{HashMap, HashSet, VecDeque},
fmt::Debug,
fs::{self, File},
io::BufReader,
Expand Down Expand Up @@ -2101,77 +2101,27 @@ impl ClusterInfo {

fn handle_batch_pull_responses(
&self,
responses: Vec<(Pubkey, Vec<CrdsValue>)>,
thread_pool: &ThreadPool,
responses: Vec<CrdsValue>,
stakes: &HashMap<Pubkey, u64>,
epoch_duration: Duration,
) {
let _st = ScopedTimer::from(&self.stats.handle_batch_pull_responses_time);
if responses.is_empty() {
return;
}
fn extend<K, V>(hash_map: &mut HashMap<K, Vec<V>>, (key, mut value): (K, Vec<V>))
where
K: Eq + std::hash::Hash,
{
match hash_map.entry(key) {
Entry::Occupied(mut entry) => {
let entry_value = entry.get_mut();
if entry_value.len() < value.len() {
std::mem::swap(entry_value, &mut value);
}
entry_value.extend(value);
}
Entry::Vacant(entry) => {
entry.insert(value);
}
}
}
fn merge<K, V>(
mut hash_map: HashMap<K, Vec<V>>,
other: HashMap<K, Vec<V>>,
) -> HashMap<K, Vec<V>>
where
K: Eq + std::hash::Hash,
{
if hash_map.len() < other.len() {
return merge(other, hash_map);
}
for kv in other {
extend(&mut hash_map, kv);
}
hash_map
}
let responses = thread_pool.install(|| {
responses
.into_par_iter()
.with_min_len(1024)
.fold(HashMap::new, |mut hash_map, kv| {
extend(&mut hash_map, kv);
hash_map
})
.reduce(HashMap::new, merge)
});
if !responses.is_empty() {
let self_pubkey = self.id();
let timeouts = self
.gossip
.make_timeouts(self_pubkey, stakes, epoch_duration);
for (from, data) in responses {
self.handle_pull_response(&from, data, &timeouts);
}
self.handle_pull_response(responses, &timeouts);
}
}

// Returns (failed, timeout, success)
fn handle_pull_response(
&self,
from: &Pubkey,
crds_values: Vec<CrdsValue>,
timeouts: &CrdsTimeouts,
) -> (usize, usize, usize) {
let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", self.id(), from, len);
let mut pull_stats = ProcessPullStats::default();
let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = {
let _st = ScopedTimer::from(&self.stats.filter_pull_response);
Expand Down Expand Up @@ -2446,9 +2396,9 @@ impl ClusterInfo {
Protocol::PullRequest(filter, caller) => {
pull_requests.push((from_addr, filter, caller))
}
Protocol::PullResponse(from, data) => {
Protocol::PullResponse(_, mut data) => {
check_duplicate_instance(&data)?;
pull_responses.push((from, data));
pull_responses.append(&mut data);
}
Protocol::PushMessage(from, data) => {
check_duplicate_instance(&data)?;
Expand All @@ -2460,13 +2410,10 @@ impl ClusterInfo {
}
}
if self.require_stake_for_gossip(stakes) {
for (_, data) in &mut pull_responses {
retain_staked(data, stakes);
}
retain_staked(&mut pull_responses, stakes);
for (_, data) in &mut push_messages {
retain_staked(data, stakes);
}
pull_responses.retain(|(_, data)| !data.is_empty());
push_messages.retain(|(_, data)| !data.is_empty());
}
self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
Expand All @@ -2478,7 +2425,7 @@ impl ClusterInfo {
stakes,
response_sender,
);
self.handle_batch_pull_responses(pull_responses, thread_pool, stakes, epoch_duration);
self.handle_batch_pull_responses(pull_responses, stakes, epoch_duration);
self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes);
self.handle_batch_pong_messages(pong_messages, Instant::now());
self.handle_batch_pull_requests(
Expand Down Expand Up @@ -3212,18 +3159,11 @@ mod tests {
);
assert_eq!(
(0, 0, 1),
ClusterInfo::handle_pull_response(
&cluster_info,
&entrypoint_pubkey,
data.clone(),
&timeouts
)
cluster_info.handle_pull_response(data.clone(), &timeouts)
);

let entrypoint_pubkey2 = solana_sdk::pubkey::new_rand();
assert_eq!(
(1, 0, 0),
ClusterInfo::handle_pull_response(&cluster_info, &entrypoint_pubkey2, data, &timeouts)
cluster_info.handle_pull_response(data, &timeouts)
);
}

Expand Down Expand Up @@ -3981,12 +3921,7 @@ mod tests {
&stakes,
Duration::from_millis(cluster_info.gossip.pull.crds_timeout),
);
ClusterInfo::handle_pull_response(
&cluster_info,
&entrypoint_pubkey,
vec![entrypoint_crdsvalue],
&timeouts,
);
cluster_info.handle_pull_response(vec![entrypoint_crdsvalue], &timeouts);
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
assert_eq!(pings.len(), 1);
assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
Expand Down Expand Up @@ -4495,7 +4430,7 @@ mod tests {
);
assert_eq!(
(0, 0, NO_ENTRIES),
cluster_info.handle_pull_response(&entrypoint_pubkey, data, &timeouts)
cluster_info.handle_pull_response(data, &timeouts)
);
}

Expand Down

0 comments on commit 2465abc

Please sign in to comment.