Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up stage exit handling #3122

Merged
merged 6 commits into from
Mar 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ fn main() -> Result<()> {

let (s_reader, r_reader) = channel();
read_channels.push(r_reader);
read_threads.push(receiver(
Arc::new(read),
exit.clone(),
s_reader,
"bench-streamer",
));
read_threads.push(receiver(Arc::new(read), &exit, s_reader, "bench-streamer"));
}

let t_producer1 = producer(&addr, exit.clone());
Expand Down
12 changes: 8 additions & 4 deletions benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use solana::cluster_info::ClusterInfo;
use solana::cluster_info::Node;
use solana::packet::to_packets_chunked;
use solana::poh_recorder::WorkingBankEntries;
use solana::service::Service;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::hash;
Expand All @@ -17,6 +18,7 @@ use solana_sdk::signature::{KeypairUtil, Signature};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES};
use std::iter;
use std::sync::atomic::Ordering;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::time::Duration;
Expand Down Expand Up @@ -102,7 +104,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect())
})
.collect();
let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
Expand All @@ -127,7 +129,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
start += half_len;
start %= verified.len();
});
poh_service.close().unwrap();
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}

#[bench]
Expand Down Expand Up @@ -208,7 +211,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect())
})
.collect();
let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
Expand All @@ -233,5 +236,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
start += half_len;
start %= verified.len();
});
poh_service.close().unwrap();
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
23 changes: 14 additions & 9 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ impl Service for BankingStage {
pub fn create_test_recorder(
bank: &Arc<Bank>,
) -> (
Arc<AtomicBool>,
Arc<Mutex<PohRecorder>>,
PohService,
Receiver<WorkingBankEntries>,
Expand All @@ -356,7 +357,7 @@ pub fn create_test_recorder(
PohRecorder::new(bank.tick_height(), bank.last_blockhash());
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit);
(poh_recorder, poh_service, entry_receiver)
(exit, poh_recorder, poh_service, entry_receiver)
}

#[cfg(test)]
Expand All @@ -378,13 +379,14 @@ mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
drop(verified_sender);
exit.store(true, Ordering::Relaxed);
banking_stage.join().unwrap();
poh_service.close().unwrap();
poh_service.join().unwrap();
}

#[test]
Expand All @@ -395,15 +397,16 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver);
trace!("sending bank");
sleep(Duration::from_millis(600));
drop(verified_sender);
poh_service.close().unwrap();
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder);

trace!("getting entries");
Expand All @@ -424,7 +427,7 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
Expand Down Expand Up @@ -452,7 +455,8 @@ mod tests {
.unwrap();

drop(verified_sender);
poh_service.close().expect("close");
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder);

//receive entries + ticks
Expand Down Expand Up @@ -481,7 +485,7 @@ mod tests {
let (genesis_block, mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank);
let cluster_info = ClusterInfo::new(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
poh_recorder.lock().unwrap().set_bank(&bank);
Expand Down Expand Up @@ -516,7 +520,8 @@ mod tests {
.unwrap();

drop(verified_sender);
poh_service.close().expect("close");;
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
drop(poh_recorder);

// Collect the ledger and feed it to a new bank.
Expand Down
15 changes: 5 additions & 10 deletions core/src/blob_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,29 @@
use crate::service::Service;
use crate::streamer::{self, BlobSender};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread::{self, JoinHandle};

pub struct BlobFetchStage {
exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>,
}

impl BlobFetchStage {
pub fn new(socket: Arc<UdpSocket>, sender: &BlobSender, exit: Arc<AtomicBool>) -> Self {
pub fn new(socket: Arc<UdpSocket>, sender: &BlobSender, exit: &Arc<AtomicBool>) -> Self {
Self::new_multi_socket(vec![socket], sender, exit)
}
pub fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
sender: &BlobSender,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
) -> Self {
let thread_hdls: Vec<_> = sockets
.into_iter()
.map(|socket| streamer::blob_receiver(socket, exit.clone(), sender.clone()))
.map(|socket| streamer::blob_receiver(socket, &exit, sender.clone()))
.collect();

Self { exit, thread_hdls }
}

pub fn close(&self) {
self.exit.store(true, Ordering::Relaxed);
Self { thread_hdls }
}
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/blockstream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ impl BlockstreamService {
slot_full_receiver: Receiver<(u64, Pubkey)>,
blocktree: Arc<Blocktree>,
blockstream_socket: String,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
) -> Self {
let mut blockstream = Blockstream::new(blockstream_socket);
let exit = exit.clone();
let t_blockstream = Builder::new()
.name("solana-blockstream".to_string())
.spawn(move || loop {
Expand Down
7 changes: 4 additions & 3 deletions core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,15 @@ impl BroadcastStage {
sock: UdpSocket,
cluster_info: Arc<RwLock<ClusterInfo>>,
receiver: Receiver<WorkingBankEntries>,
exit_sender: Arc<AtomicBool>,
exit_sender: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
) -> Self {
let blocktree = blocktree.clone();
let exit_sender = exit_sender.clone();
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_sender);
let _finalizer = Finalizer::new(exit_sender);
Self::run(&sock, &cluster_info, &receiver, &blocktree)
})
.unwrap();
Expand Down Expand Up @@ -299,7 +300,7 @@ mod test {
leader_info.sockets.broadcast,
cluster_info,
entry_receiver,
exit_sender,
&exit_sender,
&blocktree,
);

Expand Down
6 changes: 4 additions & 2 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,8 +868,9 @@ impl ClusterInfo {
obj: Arc<RwLock<Self>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
blob_sender: BlobSender,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
Builder::new()
.name("solana-gossip".to_string())
.spawn(move || {
Expand Down Expand Up @@ -1243,8 +1244,9 @@ impl ClusterInfo {
blocktree: Option<Arc<Blocktree>>,
requests_receiver: BlobReceiver,
response_sender: BlobSender,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
Builder::new()
.name("solana-listen".to_string())
.spawn(move || loop {
Expand Down
13 changes: 4 additions & 9 deletions core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,28 @@ use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;

pub struct ClusterInfoVoteListener {
exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>,
}

impl ClusterInfoVoteListener {
pub fn new(
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
cluster_info: Arc<RwLock<ClusterInfo>>,
sender: PacketSender,
) -> Self {
let exit1 = exit.clone();
let exit = exit.clone();
let thread = Builder::new()
.name("solana-cluster_info_vote_listener".to_string())
.spawn(move || {
let _ = Self::recv_loop(&exit1, &cluster_info, &sender);
let _ = Self::recv_loop(exit, &cluster_info, &sender);
})
.unwrap();
Self {
exit,
thread_hdls: vec![thread],
}
}
fn recv_loop(
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sender: &PacketSender,
) -> Result<()> {
Expand All @@ -52,9 +50,6 @@ impl ClusterInfoVoteListener {
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
}
}
pub fn close(&self) {
self.exit.store(true, Ordering::Relaxed);
}
}

impl Service for ClusterInfoVoteListener {
Expand Down
7 changes: 1 addition & 6 deletions core/src/db_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,7 @@ mod test {
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(
Arc::new(read),
exit.clone(),
s_reader,
"window-streamer-test",
);
let t_receiver = receiver(Arc::new(read), &exit, s_reader, "window-streamer-test");
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);
Expand Down
17 changes: 6 additions & 11 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,40 @@
use crate::service::Service;
use crate::streamer::{self, PacketReceiver, PacketSender};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread::{self, JoinHandle};

pub struct FetchStage {
exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>,
}

impl FetchStage {
#[allow(clippy::new_ret_no_self)]
pub fn new(sockets: Vec<UdpSocket>, exit: Arc<AtomicBool>) -> (Self, PacketReceiver) {
pub fn new(sockets: Vec<UdpSocket>, exit: &Arc<AtomicBool>) -> (Self, PacketReceiver) {
let (sender, receiver) = channel();
(Self::new_with_sender(sockets, exit, &sender), receiver)
}
pub fn new_with_sender(
sockets: Vec<UdpSocket>,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
sender: &PacketSender,
) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
Self::new_multi_socket(tx_sockets, exit, &sender)
}
fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
exit: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
sender: &PacketSender,
) -> Self {
let thread_hdls: Vec<_> = sockets
.into_iter()
.map(|socket| streamer::receiver(socket, exit.clone(), sender.clone(), "fetch-stage"))
.map(|socket| streamer::receiver(socket, &exit, sender.clone(), "fetch-stage"))
.collect();

Self { exit, thread_hdls }
}

pub fn close(&self) {
self.exit.store(true, Ordering::Relaxed);
Self { thread_hdls }
}
}

Expand Down
Loading