Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling cleanup #557

Merged
merged 13 commits into from
Jul 5, 2018
21 changes: 10 additions & 11 deletions src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use counter::Counter;
use packet::{PacketRecycler, Packets, SharedPackets};
use rayon::prelude::*;
use record_stage::Signal;
use result::Result;
use result::{Error, Result};
use service::Service;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
Expand All @@ -27,29 +27,28 @@ pub struct BankingStage {
}

impl BankingStage {
/// Create the stage using `bank`. Exit when either `exit` is set or
/// when `verified_receiver` or the stage's output receiver is dropped.
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
/// Discard input packets using `packet_recycler` to minimize memory
/// allocations in a previous stage such as the `fetch_stage`.
pub fn new(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
packet_recycler: PacketRecycler,
) -> (Self, Receiver<Signal>) {
let (signal_sender, signal_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-banking-stage".to_string())
.spawn(move || loop {
let e = Self::process_packets(
if let Err(e) = Self::process_packets(
bank.clone(),
&verified_receiver,
&signal_sender,
&packet_recycler,
);
if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
}
})
Expand Down
8 changes: 4 additions & 4 deletions src/choose_gossip_peer_strategy.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crdt::ReplicatedData;
use crdt::{CrdtError, ReplicatedData};
use rand::distributions::{Distribution, Weighted, WeightedChoice};
use rand::thread_rng;
use result::{Error, Result};
use result::Result;
use signature::PublicKey;
use std;
use std::collections::HashMap;
Expand Down Expand Up @@ -29,7 +29,7 @@ impl<'a, 'b> ChooseRandomPeerStrategy<'a> {
impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> {
if options.is_empty() {
return Err(Error::CrdtTooSmall);
Err(CrdtError::TooSmall)?;
}

let n = ((self.random)() as usize) % options.len();
Expand Down Expand Up @@ -174,7 +174,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> {
if options.len() < 1 {
return Err(Error::CrdtTooSmall);
Err(CrdtError::TooSmall)?;
}

let mut weighted_peers = vec![];
Expand Down
55 changes: 26 additions & 29 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// minimum membership table size before we start purging dead nodes
const MIN_TABLE_SIZE: usize = 2;

#[derive(Debug, PartialEq, Eq)]
pub enum CrdtError {
TooSmall,
}

pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
if let Some(addrstr) = optstr {
Expand Down Expand Up @@ -393,7 +398,7 @@ impl Crdt {
.collect();
if nodes.len() < 1 {
warn!("crdt too small");
return Err(Error::CrdtTooSmall);
Err(CrdtError::TooSmall)?;
}
trace!("nodes table {}", nodes.len());

Expand Down Expand Up @@ -433,13 +438,10 @@ impl Crdt {
.collect();
trace!("broadcast results {}", errs.len());
for e in errs {
match e {
Err(e) => {
error!("broadcast result {:?}", e);
return Err(Error::IO(e));
}
_ => (),
if let Err(e) = &e {
error!("broadcast result {:?}", e);
}
e?;
*transmit_index += 1;
}
Ok(())
Expand Down Expand Up @@ -491,13 +493,10 @@ impl Crdt {
})
.collect();
for e in errs {
match e {
Err(e) => {
info!("retransmit error {:?}", e);
return Err(Error::IO(e));
}
_ => (),
if let Err(e) = &e {
error!("broadcast result {:?}", e);
}
e?;
}
Ok(())
}
Expand Down Expand Up @@ -541,7 +540,7 @@ impl Crdt {
.filter(|r| r.id != self.me && r.repair_addr != daddr)
.collect();
if valid.is_empty() {
return Err(Error::CrdtTooSmall);
Err(CrdtError::TooSmall)?;
}
let n = (Self::random() as usize) % valid.len();
let addr = valid[n].gossip_addr.clone();
Expand All @@ -566,18 +565,14 @@ impl Crdt {

let choose_peer_result = choose_peer_strategy.choose_peer(options);

let v = match choose_peer_result {
Ok(peer) => peer,
Err(Error::CrdtTooSmall) => {
trace!(
"crdt too small for gossip {:?} {}",
&self.me[..4],
self.table.len()
);
return Err(Error::CrdtTooSmall);
}
Err(e) => return Err(e),
if let Err(Error::CrdtError(CrdtError::TooSmall)) = &choose_peer_result {
trace!(
"crdt too small for gossip {:?} {}",
&self.me[..4],
self.table.len()
);
};
let v = choose_peer_result?;

let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
Expand Down Expand Up @@ -1014,7 +1009,9 @@ impl TestNode {

#[cfg(test)]
mod tests {
use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE};
use crdt::{
parse_port_or_addr, Crdt, CrdtError, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
};
use logger;
use packet::BlobRecycler;
use result::Error;
Expand Down Expand Up @@ -1140,7 +1137,7 @@ mod tests {
);
let mut crdt = Crdt::new(me.clone());
let rv = crdt.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtTooSmall));
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
let nxt = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
Expand All @@ -1151,7 +1148,7 @@ mod tests {
);
crdt.insert(&nxt);
let rv = crdt.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtTooSmall));
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
let nxt = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.2:1234".parse().unwrap(),
Expand Down Expand Up @@ -1202,7 +1199,7 @@ mod tests {
);
let mut crdt = Crdt::new(me.clone());
let rv = crdt.gossip_request();
assert_matches!(rv, Err(Error::CrdtTooSmall));
assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
let nxt1 = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.2:1234".parse().unwrap(),
Expand Down
1 change: 0 additions & 1 deletion src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ impl FullNode {

let t_broadcast = streamer::broadcaster(
node.sockets.broadcast,
exit.clone(),
crdt,
window,
entry_height,
Expand Down
Empty file modified src/nat.rs
100755 → 100644
Empty file.
8 changes: 2 additions & 6 deletions src/ncp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,8 @@ impl Ncp {
request_sender,
)?;
let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder(
gossip_send_socket,
exit.clone(),
blob_recycler.clone(),
response_receiver,
);
let t_responder =
streamer::responder(gossip_send_socket, blob_recycler.clone(), response_receiver);
let t_listen = Crdt::listen(
crdt.clone(),
window,
Expand Down
15 changes: 9 additions & 6 deletions src/replicate_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

use bank::Bank;
use ledger;
use result::Result;
use result::{Error, Result};
use service::Service;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
Expand All @@ -29,13 +29,16 @@ impl ReplicateStage {
Ok(())
}

pub fn new(bank: Arc<Bank>, exit: Arc<AtomicBool>, window_receiver: BlobReceiver) -> Self {
pub fn new(bank: Arc<Bank>, window_receiver: BlobReceiver) -> Self {
let thread_hdl = Builder::new()
.name("solana-replicate-stage".to_string())
.spawn(move || loop {
let e = Self::replicate_requests(&bank, &window_receiver);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
if let Err(e) = Self::replicate_requests(&bank, &window_receiver) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
}
})
.unwrap();
Expand Down
17 changes: 8 additions & 9 deletions src/request_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use packet::{to_blobs, BlobRecycler, PacketRecycler, Packets, SharedPackets};
use rayon::prelude::*;
use request::Request;
use request_processor::RequestProcessor;
use result::Result;
use result::{Error, Result};
use service::Service;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
Expand Down Expand Up @@ -81,7 +80,6 @@ impl RequestStage {
}
pub fn new(
request_processor: RequestProcessor,
exit: Arc<AtomicBool>,
packet_receiver: Receiver<SharedPackets>,
packet_recycler: PacketRecycler,
blob_recycler: BlobRecycler,
Expand All @@ -92,16 +90,17 @@ impl RequestStage {
let thread_hdl = Builder::new()
.name("solana-request-stage".to_string())
.spawn(move || loop {
let e = Self::process_request_packets(
if let Err(e) = Self::process_request_packets(
&request_processor_,
&packet_receiver,
&blob_sender,
&packet_recycler,
&blob_recycler,
);
if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
}
})
Expand Down
27 changes: 24 additions & 3 deletions src/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

use bank;
use bincode;
use crdt;
#[cfg(feature = "erasure")]
use erasure;
use serde_json;
use std;
use std::any::Any;
use streamer;

#[derive(Debug)]
pub enum Error {
Expand All @@ -16,10 +20,11 @@ pub enum Error {
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
Serialize(std::boxed::Box<bincode::ErrorKind>),
BankError(bank::BankError),
CrdtError(crdt::CrdtError),
WindowError(streamer::WindowError),
#[cfg(feature = "erasure")]
ErasureError(erasure::ErasureError),
SendError,
Services,
CrdtTooSmall,
GenericError,
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -39,6 +44,22 @@ impl std::convert::From<bank::BankError> for Error {
Error::BankError(e)
}
}
impl std::convert::From<crdt::CrdtError> for Error {
fn from(e: crdt::CrdtError) -> Error {
Error::CrdtError(e)
}
}
impl std::convert::From<streamer::WindowError> for Error {
fn from(e: streamer::WindowError) -> Error {
Error::WindowError(e)
}
}
#[cfg(feature = "erasure")]
impl std::convert::From<erasure::ErasureError> for Error {
fn from(e: erasure::ErasureError) -> Error {
Error::ErasureError(e)
}
}
impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error {
fn from(_e: std::sync::mpsc::SendError<T>) -> Error {
Error::SendError
Expand Down
10 changes: 2 additions & 8 deletions src/rpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Rpu {
let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver(
requests_socket,
exit.clone(),
exit,
packet_recycler.clone(),
packet_sender,
);
Expand All @@ -59,18 +59,12 @@ impl Rpu {
let request_processor = RequestProcessor::new(bank.clone());
let (request_stage, blob_receiver) = RequestStage::new(
request_processor,
exit.clone(),
packet_receiver,
packet_recycler.clone(),
blob_recycler.clone(),
);

let t_responder = streamer::responder(
respond_socket,
exit.clone(),
blob_recycler.clone(),
blob_receiver,
);
let t_responder = streamer::responder(respond_socket, blob_recycler.clone(), blob_receiver);

let mut thread_hdls = vec![t_receiver, t_responder];
thread_hdls.extend(request_stage.thread_hdls().into_iter());
Expand Down
Loading