Skip to content

Commit

Permalink
removes Select in favor of recv_timeout/try_iter (#21981)
Browse files Browse the repository at this point in the history
crossbeam_channel::Select::ready_timeout might return with success spuriously.
  • Loading branch information
behzadnouri authored Dec 18, 2021
1 parent 3fe942a commit 7476dfe
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 24 deletions.
7 changes: 2 additions & 5 deletions core/src/commitment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ impl AggregateCommitmentService {
return Ok(());
}

let mut aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?;

while let Ok(new_data) = receiver.try_recv() {
aggregation_data = new_data;
}
let aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?;
let aggregation_data = receiver.try_iter().last().unwrap_or(aggregation_data);

let ancestors = aggregation_data.bank.status_cache_ancestors();
if ancestors.is_empty() {
Expand Down
7 changes: 2 additions & 5 deletions core/src/ledger_cleanup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,9 @@ impl LedgerCleanupService {
}

fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
let mut root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
// Get the newest root
while let Ok(new_root) = new_root_receiver.try_recv() {
root = new_root;
}
Ok(root)
Ok(new_root_receiver.try_iter().last().unwrap_or(root))
}

pub fn cleanup_ledger(
Expand Down
11 changes: 5 additions & 6 deletions core/src/verified_vote_packets.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use {
crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result},
crossbeam_channel::Select,
solana_perf::packet::PacketBatch,
solana_runtime::bank::Bank,
solana_sdk::{
Expand Down Expand Up @@ -141,10 +140,10 @@ impl VerifiedVotePackets {
vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
would_be_leader: bool,
) -> Result<()> {
let mut sel = Select::new();
sel.recv(vote_packets_receiver);
let _ = sel.ready_timeout(Duration::from_millis(200))?;
for gossip_votes in vote_packets_receiver.try_iter() {
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?;
let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter());
for gossip_votes in vote_packets {
if would_be_leader {
for verfied_vote_metadata in gossip_votes {
let VerifiedVoteMetadata {
Expand Down Expand Up @@ -284,7 +283,7 @@ mod tests {
// No new messages, should time out
assert_matches!(
verified_vote_packets.receive_and_process_vote_packets(&r, true),
Err(Error::ReadyTimeout)
Err(Error::CrossbeamRecvTimeout(_))
);
}

Expand Down
12 changes: 4 additions & 8 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,10 @@ fn run_check_duplicate(

Ok(())
};
let timer = Duration::from_millis(200);
let shred = shred_receiver.recv_timeout(timer)?;
check_duplicate(shred)?;
while let Ok(shred) = shred_receiver.try_recv() {
check_duplicate(shred)?;
}

Ok(())
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?)
.chain(shred_receiver.try_iter())
.try_for_each(check_duplicate)
}

fn verify_repair(
Expand Down

0 comments on commit 7476dfe

Please sign in to comment.