From d154e0fc825575c30afed427dbfe14441a79fc60 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 13:02:25 -0600 Subject: [PATCH 01/13] Fix code comments --- src/write_stage.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/write_stage.rs b/src/write_stage.rs index 04c315e1e3cfdf..afe1637ba68988 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -23,7 +23,7 @@ pub struct WriteStage { } impl WriteStage { - /// Process any Entry items that have been published by the Historian. + /// Process any Entry items that have been published by the RecordStage. /// continuosly broadcast blobs of entries out pub fn write_and_send_entries( entry_writer: &mut EntryWriter, @@ -43,7 +43,7 @@ impl WriteStage { Ok(()) } - /// Create a new Rpu that wraps the given Bank. + /// Create a new WriteStage for writing and broadcasting entries. pub fn new( bank: Arc, exit: Arc, From a5ae0f54e25b84cbc543a8b8a364d3a43f41d675 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 13:03:04 -0600 Subject: [PATCH 02/13] Remove executable bit from nat.rs --- src/nat.rs | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 src/nat.rs diff --git a/src/nat.rs b/src/nat.rs old mode 100755 new mode 100644 From cee34f7631a70bff2791f1a964b81c50558486e8 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 14:37:13 -0600 Subject: [PATCH 03/13] Handle errors consistently Error handling is still clumsy. We should switch to something like `error-chain` or `Result>`, but until then, we can at least be consistent across modules. --- src/choose_gossip_peer_strategy.rs | 8 ++--- src/crdt.rs | 55 ++++++++++++++---------------- src/result.rs | 27 +++++++++++++-- src/streamer.rs | 11 ++++-- 4 files changed, 62 insertions(+), 39 deletions(-) diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs index dec6a0aee3f00d..7a0850ca286840 100644 --- a/src/choose_gossip_peer_strategy.rs +++ b/src/choose_gossip_peer_strategy.rs @@ -1,7 +1,7 @@ -use crdt::ReplicatedData; +use crdt::{CrdtError, ReplicatedData}; use rand::distributions::{Distribution, Weighted, WeightedChoice}; use rand::thread_rng; -use result::{Error, Result}; +use result::Result; use signature::PublicKey; use std; use std::collections::HashMap; @@ -29,7 +29,7 @@ impl<'a, 'b> ChooseRandomPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> { fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { if options.is_empty() { - return Err(Error::CrdtTooSmall); + Err(CrdtError::TooSmall)?; } let n = ((self.random)() as usize) % options.len(); @@ -174,7 +174,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> { fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { if options.len() < 1 { - return Err(Error::CrdtTooSmall); + Err(CrdtError::TooSmall)?; } let mut weighted_peers = vec![]; diff --git a/src/crdt.rs b/src/crdt.rs index f9ef05a2be35a5..4beb3073b8494f 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -42,6 +42,11 @@ const GOSSIP_SLEEP_MILLIS: u64 = 100; /// minimum membership table size before we start purging dead nodes const MIN_TABLE_SIZE: usize = 2; +#[derive(Debug, PartialEq, Eq)] +pub enum CrdtError { + TooSmall, +} + pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); if let Some(addrstr) = optstr { @@ -393,7 +398,7 @@ impl Crdt { .collect(); if nodes.len() < 1 { warn!("crdt too small"); - return Err(Error::CrdtTooSmall); + Err(CrdtError::TooSmall)?; } trace!("nodes table {}", nodes.len()); @@ -433,13 +438,10 @@ impl Crdt { .collect(); trace!("broadcast results {}", errs.len()); for e in errs { - match e { - Err(e) => { - error!("broadcast result {:?}", e); - return Err(Error::IO(e)); - } - _ => (), + if let Err(e) = &e { + error!("broadcast result {:?}", e); } + e?; *transmit_index += 1; } Ok(()) @@ -491,13 +493,10 @@ impl Crdt { }) .collect(); for e in errs { - match e { - Err(e) => { - info!("retransmit error {:?}", e); - return Err(Error::IO(e)); - } - _ => (), + if let Err(e) = &e { + error!("broadcast result {:?}", e); } + e?; } Ok(()) } @@ -541,7 +540,7 @@ impl Crdt { .filter(|r| r.id != self.me && r.repair_addr != daddr) .collect(); if valid.is_empty() { - return Err(Error::CrdtTooSmall); + Err(CrdtError::TooSmall)?; } let n = (Self::random() as usize) % valid.len(); let addr = valid[n].gossip_addr.clone(); @@ -566,18 +565,14 @@ impl Crdt { let choose_peer_result = choose_peer_strategy.choose_peer(options); - let v = match choose_peer_result { - Ok(peer) => peer, - Err(Error::CrdtTooSmall) => { - trace!( - "crdt too small for gossip {:?} {}", - &self.me[..4], - self.table.len() - ); - return Err(Error::CrdtTooSmall); - } - Err(e) => return Err(e), + if let Err(Error::CrdtError(CrdtError::TooSmall)) = &choose_peer_result { + trace!( + "crdt too small for gossip {:?} {}", + &self.me[..4], + self.table.len() + ); }; + let v = choose_peer_result?; let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); @@ -1014,7 +1009,9 @@ impl TestNode { #[cfg(test)] mod tests { - use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE}; + use crdt::{ + parse_port_or_addr, Crdt, CrdtError, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, + }; use logger; use packet::BlobRecycler; use result::Error; @@ -1140,7 +1137,7 @@ mod tests { ); let mut crdt = Crdt::new(me.clone()); let rv = crdt.window_index_request(0); - assert_matches!(rv, Err(Error::CrdtTooSmall)); + assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); let nxt = ReplicatedData::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), @@ -1151,7 +1148,7 @@ mod tests { ); crdt.insert(&nxt); let rv = crdt.window_index_request(0); - assert_matches!(rv, Err(Error::CrdtTooSmall)); + assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); let nxt = ReplicatedData::new( KeyPair::new().pubkey(), "127.0.0.2:1234".parse().unwrap(), @@ -1202,7 +1199,7 @@ mod tests { ); let mut crdt = Crdt::new(me.clone()); let rv = crdt.gossip_request(); - assert_matches!(rv, Err(Error::CrdtTooSmall)); + assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); let nxt1 = ReplicatedData::new( KeyPair::new().pubkey(), "127.0.0.2:1234".parse().unwrap(), diff --git a/src/result.rs b/src/result.rs index e31e45e2d1da59..af056ba35c6b12 100644 --- a/src/result.rs +++ b/src/result.rs @@ -2,9 +2,13 @@ use bank; use bincode; +use crdt; +#[cfg(feature = "erasure")] +use erasure; use serde_json; use std; use std::any::Any; +use streamer; #[derive(Debug)] pub enum Error { @@ -16,10 +20,11 @@ pub enum Error { RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), BankError(bank::BankError), + CrdtError(crdt::CrdtError), + WindowError(streamer::WindowError), + #[cfg(feature = "erasure")] + ErasureError(erasure::ErasureError), SendError, - Services, - CrdtTooSmall, - GenericError, } pub type Result = std::result::Result; @@ -39,6 +44,22 @@ impl std::convert::From for Error { Error::BankError(e) } } +impl std::convert::From for Error { + fn from(e: crdt::CrdtError) -> Error { + Error::CrdtError(e) + } +} +impl std::convert::From for Error { + fn from(e: streamer::WindowError) -> Error { + Error::WindowError(e) + } +} +#[cfg(feature = "erasure")] +impl std::convert::From for Error { + fn from(e: erasure::ErasureError) -> Error { + Error::ErasureError(e) + } +} impl std::convert::From> for Error { fn from(_e: std::sync::mpsc::SendError) -> Error { Error::SendError diff --git a/src/streamer.rs b/src/streamer.rs index 91674657fdb5bd..9e215a77372f9b 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -6,7 +6,7 @@ use erasure; use packet::{ Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE, }; -use result::{Error, Result}; +use result::Result; use std::collections::VecDeque; use std::mem; use std::net::{SocketAddr, UdpSocket}; @@ -23,6 +23,11 @@ pub type BlobSender = Sender; pub type BlobReceiver = Receiver; pub type Window = Arc>>>; +#[derive(Debug, PartialEq, Eq)] +pub enum WindowError { + GenericError, +} + fn recv_loop( sock: &UdpSocket, exit: &Arc, @@ -152,7 +157,7 @@ fn find_next_missing( received: &mut u64, ) -> Result)>> { if *received <= *consumed { - return Err(Error::GenericError); + Err(WindowError::GenericError)?; } let window = locked_window.read().unwrap(); let reqs: Vec<_> = (*consumed..*received) @@ -575,7 +580,7 @@ fn broadcast( &mut window.write().unwrap(), *receive_index as usize, blobs_len, - ).map_err(|_| Error::GenericError)?; + )?; } *receive_index += blobs_len as u64; From 0a55fcedb95d14d53b7afe39b8a07e7e1f4c9c41 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 15:29:19 -0600 Subject: [PATCH 04/13] Exit write_stage on channel errors --- src/write_stage.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/write_stage.rs b/src/write_stage.rs index afe1637ba68988..eba6e9e5397798 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -7,12 +7,12 @@ use entry::Entry; use entry_writer::EntryWriter; use ledger::Block; use packet::BlobRecycler; -use result::Result; +use result::{Error, Result}; use service::Service; use std::collections::VecDeque; use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -57,12 +57,19 @@ impl WriteStage { .spawn(move || { let mut entry_writer = EntryWriter::new(&bank, writer); loop { - let _ = Self::write_and_send_entries( + if let Err(e) = Self::write_and_send_entries( &mut entry_writer, &blob_sender, &blob_recycler, &entry_receiver, - ); + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::SendError => break, + _ => error!("{:?}", e), + } + }; + if exit.load(Ordering::Relaxed) { info!("broadcat_service exiting"); break; From 036afc717c756c1b228f1f75d16c9f2a95a2a0d4 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 15:34:22 -0600 Subject: [PATCH 05/13] Remove exit variable from WriteStage --- src/tpu.rs | 9 ++------- src/write_stage.rs | 9 +-------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/src/tpu.rs b/src/tpu.rs index 8ce230aa9d538b..4136e2d65eec6b 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -80,13 +80,8 @@ impl Tpu { None => RecordStage::new(signal_receiver, &bank.last_id()), }; - let (write_stage, blob_receiver) = WriteStage::new( - bank.clone(), - exit.clone(), - blob_recycler.clone(), - writer, - entry_receiver, - ); + let (write_stage, blob_receiver) = + WriteStage::new(bank.clone(), blob_recycler.clone(), writer, entry_receiver); let tpu = Tpu { fetch_stage, diff --git a/src/write_stage.rs b/src/write_stage.rs index eba6e9e5397798..a892e4b0a6cfc0 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -11,7 +11,6 @@ use result::{Error, Result}; use service::Service; use std::collections::VecDeque; use std::io::Write; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; @@ -46,7 +45,6 @@ impl WriteStage { /// Create a new WriteStage for writing and broadcasting entries. pub fn new( bank: Arc, - exit: Arc, blob_recycler: BlobRecycler, writer: W, entry_receiver: Receiver>, @@ -65,15 +63,10 @@ impl WriteStage { ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::SendError => break, + Error::SendError => (), // Ignore when downstream stage exists prematurely. _ => error!("{:?}", e), } }; - - if exit.load(Ordering::Relaxed) { - info!("broadcat_service exiting"); - break; - } } }) .unwrap(); From 19e951fa4ea942ebd79dc5d07884d22c715c5f38 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 15:41:53 -0600 Subject: [PATCH 06/13] Remove exit variable from BankingStage --- src/banking_stage.rs | 22 +++++++++++----------- src/tpu.rs | 8 ++------ src/write_stage.rs | 3 ++- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 871df48432e14a..94f79d78faec0c 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -8,11 +8,11 @@ use counter::Counter; use packet::{PacketRecycler, Packets, SharedPackets}; use rayon::prelude::*; use record_stage::Signal; -use result::Result; +use result::{Error, Result}; use service::Service; use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::atomic::AtomicUsize; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -27,13 +27,11 @@ pub struct BankingStage { } impl BankingStage { - /// Create the stage using `bank`. Exit when either `exit` is set or - /// when `verified_receiver` or the stage's output receiver is dropped. + /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. /// Discard input packets using `packet_recycler` to minimize memory /// allocations in a previous stage such as the `fetch_stage`. pub fn new( bank: Arc, - exit: Arc, verified_receiver: Receiver)>>, packet_recycler: PacketRecycler, ) -> (Self, Receiver) { @@ -41,15 +39,17 @@ impl BankingStage { let thread_hdl = Builder::new() .name("solana-banking-stage".to_string()) .spawn(move || loop { - let e = Self::process_packets( + if let Err(e) = Self::process_packets( bank.clone(), &verified_receiver, &signal_sender, &packet_recycler, - ); - if e.is_err() { - if exit.load(Ordering::Relaxed) { - break; + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::SendError => (), // Ignore when downstream stage exits prematurely. + _ => error!("{:?}", e), } } }) diff --git a/src/tpu.rs b/src/tpu.rs index 4136e2d65eec6b..e3cacad2bc1e9c 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -66,12 +66,8 @@ impl Tpu { let (sigverify_stage, verified_receiver) = SigVerifyStage::new(exit.clone(), packet_receiver); - let (banking_stage, signal_receiver) = BankingStage::new( - bank.clone(), - exit.clone(), - verified_receiver, - packet_recycler.clone(), - ); + let (banking_stage, signal_receiver) = + BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone()); let (record_stage, entry_receiver) = match tick_duration { Some(tick_duration) => { diff --git a/src/write_stage.rs b/src/write_stage.rs index a892e4b0a6cfc0..ba48b3ccb2cc57 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -63,7 +63,8 @@ impl WriteStage { ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::SendError => (), // Ignore when downstream stage exists prematurely. + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::SendError => (), // Ignore when downstream stage exits prematurely. _ => error!("{:?}", e), } }; From f090f26e107925de5013358b4b8452b740ee32dd Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 15:50:42 -0600 Subject: [PATCH 07/13] Remove exit variable from broadcast [stage] --- src/fullnode.rs | 1 - src/streamer.rs | 21 ++++++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/fullnode.rs b/src/fullnode.rs index b9014737841c67..080745a6cfa11d 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -178,7 +178,6 @@ impl FullNode { let t_broadcast = streamer::broadcaster( node.sockets.broadcast, - exit.clone(), crdt, window, entry_height, diff --git a/src/streamer.rs b/src/streamer.rs index 9e215a77372f9b..27e0488697c391 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -1,17 +1,17 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! -use crdt::{Crdt, ReplicatedData}; +use crdt::{Crdt, CrdtError, ReplicatedData}; #[cfg(feature = "erasure")] use erasure; use packet::{ Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE, }; -use result::Result; +use result::{Error, Result}; use std::collections::VecDeque; use std::mem; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; use std::time::Duration; @@ -602,7 +602,6 @@ fn broadcast( /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. pub fn broadcaster( sock: UdpSocket, - exit: Arc, crdt: Arc>, window: Window, entry_height: u64, @@ -616,10 +615,7 @@ pub fn broadcaster( let mut receive_index = entry_height; let debug_id = crdt.read().unwrap().debug_id(); loop { - if exit.load(Ordering::Relaxed) { - break; - } - let _ = broadcast( + if let Err(e) = broadcast( debug_id, &crdt, &window, @@ -628,7 +624,14 @@ pub fn broadcaster( &sock, &mut transmit_index, &mut receive_index, - ); + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::CrdtError(CrdtError::TooSmall) => (), // TODO: Why are the unit-tests throwing hundreds of these? + _ => error!("{:?}", e), + } + } } }) .unwrap() From 2c71b3a5e5ee5f5fa56ea5c472c5886aa6106210 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 15:52:19 -0600 Subject: [PATCH 08/13] No longer need to ignore downstream send errors By removing the exit variables, the downstream stages wait for upstream stages to drop their senders before exiting. --- src/banking_stage.rs | 1 - src/write_stage.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 94f79d78faec0c..f7d39c31e722d2 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -48,7 +48,6 @@ impl BankingStage { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::SendError => (), // Ignore when downstream stage exits prematurely. _ => error!("{:?}", e), } } diff --git a/src/write_stage.rs b/src/write_stage.rs index ba48b3ccb2cc57..241e22dbb4c032 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -64,7 +64,6 @@ impl WriteStage { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::SendError => (), // Ignore when downstream stage exits prematurely. _ => error!("{:?}", e), } }; From 09d69003517a7ea84e5f58eca523e0ae58e7198d Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 15:58:33 -0600 Subject: [PATCH 09/13] Remove exit variable from VerifyStage --- src/sigverify_stage.rs | 21 ++++++++++----------- src/tpu.rs | 5 ++--- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/sigverify_stage.rs b/src/sigverify_stage.rs index a6eeda8a2eaf23..e90ea238f6384b 100644 --- a/src/sigverify_stage.rs +++ b/src/sigverify_stage.rs @@ -7,11 +7,10 @@ use packet::SharedPackets; use rand::{thread_rng, Rng}; -use result::Result; +use result::{Error, Result}; use service::Service; use sigverify; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex}; use std::thread::{self, spawn, JoinHandle}; use std::time::Instant; @@ -24,11 +23,10 @@ pub struct SigVerifyStage { impl SigVerifyStage { pub fn new( - exit: Arc, packet_receiver: Receiver, ) -> (Self, Receiver)>>) { let (verified_sender, verified_receiver) = channel(); - let thread_hdls = Self::verifier_services(exit, packet_receiver, verified_sender); + let thread_hdls = Self::verifier_services(packet_receiver, verified_sender); (SigVerifyStage { thread_hdls }, verified_receiver) } @@ -75,27 +73,28 @@ impl SigVerifyStage { } fn verifier_service( - exit: Arc, packet_receiver: Arc>, verified_sender: Arc)>>>>, ) -> JoinHandle<()> { spawn(move || loop { - let e = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()); - if e.is_err() && exit.load(Ordering::Relaxed) { - break; + if let Err(e) = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), + } } }) } fn verifier_services( - exit: Arc, packet_receiver: PacketReceiver, verified_sender: Sender)>>, ) -> Vec> { let sender = Arc::new(Mutex::new(verified_sender)); let receiver = Arc::new(Mutex::new(packet_receiver)); (0..4) - .map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone())) + .map(|_| Self::verifier_service(receiver.clone(), sender.clone())) .collect() } } diff --git a/src/tpu.rs b/src/tpu.rs index e3cacad2bc1e9c..ae213e888287fe 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -61,10 +61,9 @@ impl Tpu { let packet_recycler = PacketRecycler::default(); let (fetch_stage, packet_receiver) = - FetchStage::new(transactions_socket, exit.clone(), packet_recycler.clone()); + FetchStage::new(transactions_socket, exit, packet_recycler.clone()); - let (sigverify_stage, verified_receiver) = - SigVerifyStage::new(exit.clone(), packet_receiver); + let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver); let (banking_stage, signal_receiver) = BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone()); From 743a96e7739a6aeadb656c813e2afa948f5cc459 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 16:29:49 -0600 Subject: [PATCH 10/13] Remove exit variable from ReplicateStage --- src/replicate_stage.rs | 15 +++++++++------ src/tvu.rs | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 661edf60e5973a..68fa7f94df0070 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -2,9 +2,9 @@ use bank::Bank; use ledger; -use result::Result; +use result::{Error, Result}; use service::Service; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::RecvTimeoutError; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -29,13 +29,16 @@ impl ReplicateStage { Ok(()) } - pub fn new(bank: Arc, exit: Arc, window_receiver: BlobReceiver) -> Self { + pub fn new(bank: Arc, window_receiver: BlobReceiver) -> Self { let thread_hdl = Builder::new() .name("solana-replicate-stage".to_string()) .spawn(move || loop { - let e = Self::replicate_requests(&bank, &window_receiver); - if e.is_err() && exit.load(Ordering::Relaxed) { - break; + if let Err(e) = Self::replicate_requests(&bank, &window_receiver) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), + } } }) .unwrap(); diff --git a/src/tvu.rs b/src/tvu.rs index aaf87f1e391973..77330a1fefe1e5 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -94,7 +94,7 @@ impl Tvu { blob_receiver, ); - let replicate_stage = ReplicateStage::new(bank, exit, blob_receiver); + let replicate_stage = ReplicateStage::new(bank, blob_receiver); Tvu { replicate_stage, From 38a7345f753013d746bb8f33ad023db425fe0d27 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 16:37:04 -0600 Subject: [PATCH 11/13] Remove exit variable from WindowStage and retransmit [stage] --- src/streamer.rs | 26 ++++++++++++++------------ src/tvu.rs | 3 +-- src/window_stage.rs | 4 ---- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index 27e0488697c391..7bb5f764472484 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -464,7 +464,6 @@ pub fn default_window() -> Window { } pub fn window( - exit: Arc, crdt: Arc>, window: Window, entry_height: u64, @@ -482,10 +481,7 @@ pub fn window( let mut times = 0; let debug_id = crdt.read().unwrap().debug_id(); loop { - if exit.load(Ordering::Relaxed) { - break; - } - let _ = recv_window( + if let Err(e) = recv_window( debug_id, &window, &crdt, @@ -495,7 +491,13 @@ pub fn window( &r, &s, &retransmit, - ); + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), + } + } let _ = repair_window( debug_id, &window, @@ -669,7 +671,6 @@ fn retransmit( /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. pub fn retransmitter( sock: UdpSocket, - exit: Arc, crdt: Arc>, recycler: BlobRecycler, r: BlobReceiver, @@ -679,11 +680,13 @@ pub fn retransmitter( .spawn(move || { trace!("retransmitter started"); loop { - if exit.load(Ordering::Relaxed) { - break; + if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), + } } - // TODO: handle this error - let _ = retransmit(&crdt, &recycler, &r, &sock); } trace!("exiting retransmitter"); }) @@ -903,7 +906,6 @@ mod test { let (s_retransmit, r_retransmit) = channel(); let win = default_window(); let t_window = window( - exit.clone(), subs, win, 0, diff --git a/src/tvu.rs b/src/tvu.rs index 77330a1fefe1e5..feeddd685d0b4a 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -78,7 +78,7 @@ impl Tvu { let blob_recycler = BlobRecycler::default(); let (fetch_stage, blob_receiver) = BlobFetchStage::new_multi_socket( vec![replicate_socket, repair_socket], - exit.clone(), + exit, blob_recycler.clone(), ); //TODO @@ -89,7 +89,6 @@ impl Tvu { window, entry_height, retransmit_socket, - exit.clone(), blob_recycler.clone(), blob_receiver, ); diff --git a/src/window_stage.rs b/src/window_stage.rs index d5eaa6a278c7ec..74a0853de2a42f 100644 --- a/src/window_stage.rs +++ b/src/window_stage.rs @@ -4,7 +4,6 @@ use crdt::Crdt; use packet::BlobRecycler; use service::Service; use std::net::UdpSocket; -use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::{self, JoinHandle}; @@ -20,7 +19,6 @@ impl WindowStage { window: Window, entry_height: u64, retransmit_socket: UdpSocket, - exit: Arc, blob_recycler: BlobRecycler, fetch_stage_receiver: BlobReceiver, ) -> (Self, BlobReceiver) { @@ -28,14 +26,12 @@ impl WindowStage { let t_retransmit = streamer::retransmitter( retransmit_socket, - exit.clone(), crdt.clone(), blob_recycler.clone(), retransmit_receiver, ); let (blob_sender, blob_receiver) = channel(); let t_window = streamer::window( - exit.clone(), crdt.clone(), window, entry_height, From bd620d559205a502069a8f54e44fce51dbe9d061 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 16:41:03 -0600 Subject: [PATCH 12/13] Remove exit variable from respond [stage] And drop the sender that feeds input to the responder. --- src/ncp.rs | 8 ++--- src/rpu.rs | 7 +--- src/streamer.rs | 87 +++++++++++++++++++++++++------------------------ src/tvu.rs | 9 ++--- 4 files changed, 51 insertions(+), 60 deletions(-) diff --git a/src/ncp.rs b/src/ncp.rs index bbf7bd6ad99cd9..216d280fe1c97b 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -37,12 +37,8 @@ impl Ncp { request_sender, )?; let (response_sender, response_receiver) = channel(); - let t_responder = streamer::responder( - gossip_send_socket, - exit.clone(), - blob_recycler.clone(), - response_receiver, - ); + let t_responder = + streamer::responder(gossip_send_socket, blob_recycler.clone(), response_receiver); let t_listen = Crdt::listen( crdt.clone(), window, diff --git a/src/rpu.rs b/src/rpu.rs index 4447bbe8fb0398..e2189342996127 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -65,12 +65,7 @@ impl Rpu { blob_recycler.clone(), ); - let t_responder = streamer::responder( - respond_socket, - exit.clone(), - blob_recycler.clone(), - blob_receiver, - ); + let t_responder = streamer::responder(respond_socket, blob_recycler.clone(), blob_receiver); let mut thread_hdls = vec![t_receiver, t_responder]; thread_hdls.extend(request_stage.thread_hdls().into_iter()); diff --git a/src/streamer.rs b/src/streamer.rs index 7bb5f764472484..ea8eb8aef44889 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -101,17 +101,16 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize)> Ok((batch, len)) } -pub fn responder( - sock: UdpSocket, - exit: Arc, - recycler: BlobRecycler, - r: BlobReceiver, -) -> JoinHandle<()> { +pub fn responder(sock: UdpSocket, recycler: BlobRecycler, r: BlobReceiver) -> JoinHandle<()> { Builder::new() .name("solana-responder".to_string()) .spawn(move || loop { - if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { - break; + if let Err(e) = recv_send(&sock, &recycler, &r) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), + } } }) .unwrap() @@ -844,20 +843,24 @@ mod test { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); - let (s_responder, r_responder) = channel(); - let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); - let mut msgs = VecDeque::new(); - for i in 0..10 { - let b = resp_recycler.allocate(); - { - let mut w = b.write().unwrap(); - w.data[0] = i as u8; - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&addr); + let t_responder = { + let (s_responder, r_responder) = channel(); + let t_responder = responder(send, resp_recycler.clone(), r_responder); + let mut msgs = VecDeque::new(); + for i in 0..10 { + let b = resp_recycler.allocate(); + { + let mut w = b.write().unwrap(); + w.data[0] = i as u8; + w.meta.size = PACKET_DATA_SIZE; + w.meta.set_addr(&addr); + } + msgs.push_back(b); } - msgs.push_back(b); - } - s_responder.send(msgs).expect("send"); + s_responder.send(msgs).expect("send"); + t_responder + }; + let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); @@ -914,28 +917,28 @@ mod test { s_window, s_retransmit, ); - let (s_responder, r_responder) = channel(); - let t_responder = responder( - tn.sockets.replicate, - exit.clone(), - resp_recycler.clone(), - r_responder, - ); - let mut msgs = VecDeque::new(); - for v in 0..10 { - let i = 9 - v; - let b = resp_recycler.allocate(); - { - let mut w = b.write().unwrap(); - w.set_index(i).unwrap(); - w.set_id(me_id).unwrap(); - assert_eq!(i, w.get_index().unwrap()); - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.data.gossip_addr); + + let t_responder = { + let (s_responder, r_responder) = channel(); + let t_responder = responder(tn.sockets.replicate, resp_recycler.clone(), r_responder); + let mut msgs = VecDeque::new(); + for v in 0..10 { + let i = 9 - v; + let b = resp_recycler.allocate(); + { + let mut w = b.write().unwrap(); + w.set_index(i).unwrap(); + w.set_id(me_id).unwrap(); + assert_eq!(i, w.get_index().unwrap()); + w.meta.size = PACKET_DATA_SIZE; + w.meta.set_addr(&tn.data.gossip_addr); + } + msgs.push_back(b); } - msgs.push_back(b); - } - s_responder.send(msgs).expect("send"); + s_responder.send(msgs).expect("send"); + t_responder + }; + let mut num = 0; get_blobs(r_window, &mut num); assert_eq!(num, 10); diff --git a/src/tvu.rs b/src/tvu.rs index feeddd685d0b4a..17612fb0e205ac 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -193,12 +193,8 @@ pub mod tests { // simulate leader sending messages let (s_responder, r_responder) = channel(); - let t_responder = streamer::responder( - leader.sockets.requests, - exit.clone(), - resp_recycler.clone(), - r_responder, - ); + let t_responder = + streamer::responder(leader.sockets.requests, resp_recycler.clone(), r_responder); let starting_balance = 10_000; let mint = Mint::new(starting_balance); @@ -269,6 +265,7 @@ pub mod tests { // send the blobs into the socket s_responder.send(msgs).expect("send"); + drop(s_responder); // receive retransmitted messages let timer = Duration::new(1, 0); From 2df1259cc23ff647f63092520108cb66b6610e14 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 16:43:48 -0600 Subject: [PATCH 13/13] Remove exit variable from RequestStage --- src/request_stage.rs | 17 ++++++++--------- src/rpu.rs | 3 +-- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/request_stage.rs b/src/request_stage.rs index de8304b0133f00..fb380eba4f1ece 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -5,11 +5,10 @@ use packet::{to_blobs, BlobRecycler, PacketRecycler, Packets, SharedPackets}; use rayon::prelude::*; use request::Request; use request_processor::RequestProcessor; -use result::Result; +use result::{Error, Result}; use service::Service; use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; use std::time::Instant; @@ -81,7 +80,6 @@ impl RequestStage { } pub fn new( request_processor: RequestProcessor, - exit: Arc, packet_receiver: Receiver, packet_recycler: PacketRecycler, blob_recycler: BlobRecycler, @@ -92,16 +90,17 @@ impl RequestStage { let thread_hdl = Builder::new() .name("solana-request-stage".to_string()) .spawn(move || loop { - let e = Self::process_request_packets( + if let Err(e) = Self::process_request_packets( &request_processor_, &packet_receiver, &blob_sender, &packet_recycler, &blob_recycler, - ); - if e.is_err() { - if exit.load(Ordering::Relaxed) { - break; + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), } } }) diff --git a/src/rpu.rs b/src/rpu.rs index e2189342996127..b5e7f45dc5c6b3 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -50,7 +50,7 @@ impl Rpu { let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( requests_socket, - exit.clone(), + exit, packet_recycler.clone(), packet_sender, ); @@ -59,7 +59,6 @@ impl Rpu { let request_processor = RequestProcessor::new(bank.clone()); let (request_stage, blob_receiver) = RequestStage::new( request_processor, - exit.clone(), packet_receiver, packet_recycler.clone(), blob_recycler.clone(),