diff --git a/core/src/commitment_service.rs b/core/src/commitment_service.rs index f918065d024724..fe425fa2cf2cd6 100644 --- a/core/src/commitment_service.rs +++ b/core/src/commitment_service.rs @@ -97,11 +97,8 @@ impl AggregateCommitmentService { return Ok(()); } - let mut aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?; - - while let Ok(new_data) = receiver.try_recv() { - aggregation_data = new_data; - } + let aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?; + let aggregation_data = receiver.try_iter().last().unwrap_or(aggregation_data); let ancestors = aggregation_data.bank.status_cache_ancestors(); if ancestors.is_empty() { diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 3486a24ad21677..841609d3cc3685 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -164,12 +164,9 @@ impl LedgerCleanupService { } fn receive_new_roots(new_root_receiver: &Receiver) -> Result { - let mut root = new_root_receiver.recv_timeout(Duration::from_secs(1))?; + let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?; // Get the newest root - while let Ok(new_root) = new_root_receiver.try_recv() { - root = new_root; - } - Ok(root) + Ok(new_root_receiver.try_iter().last().unwrap_or(root)) } pub fn cleanup_ledger( diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index b11b1c0e7ccbbf..e1348ad93511e0 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -1,6 +1,5 @@ use { crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result}, - crossbeam_channel::Select, solana_perf::packet::PacketBatch, solana_runtime::bank::Bank, solana_sdk::{ @@ -141,10 +140,10 @@ impl VerifiedVotePackets { vote_packets_receiver: &VerifiedLabelVotePacketsReceiver, would_be_leader: bool, ) -> Result<()> { - let mut sel = Select::new(); - sel.recv(vote_packets_receiver); - let _ = sel.ready_timeout(Duration::from_millis(200))?; - for gossip_votes in vote_packets_receiver.try_iter() { + const RECV_TIMEOUT: Duration = Duration::from_millis(200); + let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?; + let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter()); + for gossip_votes in vote_packets { if would_be_leader { for verfied_vote_metadata in gossip_votes { let VerifiedVoteMetadata { @@ -284,7 +283,7 @@ mod tests { // No new messages, should time out assert_matches!( verified_vote_packets.receive_and_process_vote_packets(&r, true), - Err(Error::ReadyTimeout) + Err(Error::CrossbeamRecvTimeout(_)) ); } diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 27ed6be999b250..7cb86db29a181f 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -233,14 +233,10 @@ fn run_check_duplicate( Ok(()) }; - let timer = Duration::from_millis(200); - let shred = shred_receiver.recv_timeout(timer)?; - check_duplicate(shred)?; - while let Ok(shred) = shred_receiver.try_recv() { - check_duplicate(shred)?; - } - - Ok(()) + const RECV_TIMEOUT: Duration = Duration::from_millis(200); + std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?) + .chain(shred_receiver.try_iter()) + .try_for_each(check_duplicate) } fn verify_repair(