diff --git a/src/request_stage.rs b/src/request_stage.rs index de8304b0133f00..fb380eba4f1ece 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -5,11 +5,10 @@ use packet::{to_blobs, BlobRecycler, PacketRecycler, Packets, SharedPackets}; use rayon::prelude::*; use request::Request; use request_processor::RequestProcessor; -use result::Result; +use result::{Error, Result}; use service::Service; use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; use std::time::Instant; @@ -81,7 +80,6 @@ impl RequestStage { } pub fn new( request_processor: RequestProcessor, - exit: Arc, packet_receiver: Receiver, packet_recycler: PacketRecycler, blob_recycler: BlobRecycler, @@ -92,16 +90,17 @@ impl RequestStage { let thread_hdl = Builder::new() .name("solana-request-stage".to_string()) .spawn(move || loop { - let e = Self::process_request_packets( + if let Err(e) = Self::process_request_packets( &request_processor_, &packet_receiver, &blob_sender, &packet_recycler, &blob_recycler, - ); - if e.is_err() { - if exit.load(Ordering::Relaxed) { - break; + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), } } }) diff --git a/src/rpu.rs b/src/rpu.rs index e2189342996127..b5e7f45dc5c6b3 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -50,7 +50,7 @@ impl Rpu { let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( requests_socket, - exit.clone(), + exit, packet_recycler.clone(), packet_sender, ); @@ -59,7 +59,6 @@ impl Rpu { let request_processor = RequestProcessor::new(bank.clone()); let (request_stage, blob_receiver) = RequestStage::new( request_processor, - exit.clone(), packet_receiver, packet_recycler.clone(), blob_recycler.clone(),