Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

removes Select in favor of recv_timeout/try_iter #21981

Merged
merged 1 commit into from
Dec 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions core/src/commitment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ impl AggregateCommitmentService {
return Ok(());
}

let mut aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?;

while let Ok(new_data) = receiver.try_recv() {
aggregation_data = new_data;
}
let aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?;
let aggregation_data = receiver.try_iter().last().unwrap_or(aggregation_data);

let ancestors = aggregation_data.bank.status_cache_ancestors();
if ancestors.is_empty() {
Expand Down
7 changes: 2 additions & 5 deletions core/src/ledger_cleanup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,9 @@ impl LedgerCleanupService {
}

fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
let mut root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
// Get the newest root
while let Ok(new_root) = new_root_receiver.try_recv() {
root = new_root;
}
Ok(root)
Ok(new_root_receiver.try_iter().last().unwrap_or(root))
}

pub fn cleanup_ledger(
Expand Down
11 changes: 5 additions & 6 deletions core/src/verified_vote_packets.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use {
crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result},
crossbeam_channel::Select,
solana_perf::packet::PacketBatch,
solana_runtime::bank::Bank,
solana_sdk::{
Expand Down Expand Up @@ -141,10 +140,10 @@ impl VerifiedVotePackets {
vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
would_be_leader: bool,
) -> Result<()> {
let mut sel = Select::new();
sel.recv(vote_packets_receiver);
let _ = sel.ready_timeout(Duration::from_millis(200))?;
for gossip_votes in vote_packets_receiver.try_iter() {
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?;
let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter());
for gossip_votes in vote_packets {
if would_be_leader {
for verfied_vote_metadata in gossip_votes {
let VerifiedVoteMetadata {
Expand Down Expand Up @@ -284,7 +283,7 @@ mod tests {
// No new messages, should time out
assert_matches!(
verified_vote_packets.receive_and_process_vote_packets(&r, true),
Err(Error::ReadyTimeout)
Err(Error::CrossbeamRecvTimeout(_))
);
}

Expand Down
12 changes: 4 additions & 8 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,10 @@ fn run_check_duplicate(

Ok(())
};
let timer = Duration::from_millis(200);
let shred = shred_receiver.recv_timeout(timer)?;
check_duplicate(shred)?;
while let Ok(shred) = shred_receiver.try_recv() {
check_duplicate(shred)?;
}

Ok(())
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
std::iter::once(shred_receiver.recv_timeout(RECV_TIMEOUT)?)
.chain(shred_receiver.try_iter())
.try_for_each(check_duplicate)
}

fn verify_repair(
Expand Down