diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 5651c8500f86a8..483210b1624e30 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -81,12 +81,7 @@ fn main() -> Result<()> { let (s_reader, r_reader) = channel(); read_channels.push(r_reader); - read_threads.push(receiver( - Arc::new(read), - exit.clone(), - s_reader, - "bench-streamer", - )); + read_threads.push(receiver(Arc::new(read), &exit, s_reader, "bench-streamer")); } let t_producer1 = producer(&addr, exit.clone()); diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index e42f2d9104375a..0bbd570c1e3cb1 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -9,6 +9,7 @@ use solana::cluster_info::ClusterInfo; use solana::cluster_info::Node; use solana::packet::to_packets_chunked; use solana::poh_recorder::WorkingBankEntries; +use solana::service::Service; use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::hash; @@ -17,6 +18,7 @@ use solana_sdk::signature::{KeypairUtil, Signature}; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}; use std::iter; +use std::sync::atomic::Ordering; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -102,7 +104,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { (x, iter::repeat(1).take(len).collect()) }) .collect(); - let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank); + let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank); let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); @@ -127,7 +129,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { start += half_len; start %= verified.len(); }); - poh_service.close().unwrap(); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); } #[bench] @@ -208,7 +211,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { (x, iter::repeat(1).take(len).collect()) }) .collect(); - let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank); + let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank); let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); @@ -233,5 +236,6 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { start += half_len; start %= verified.len(); }); - poh_service.close().unwrap(); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index e5e39f657d1202..187ff1b13725cf 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -347,6 +347,7 @@ impl Service for BankingStage { pub fn create_test_recorder( bank: &Arc, ) -> ( + Arc, Arc>, PohService, Receiver, @@ -356,7 +357,7 @@ pub fn create_test_recorder( PohRecorder::new(bank.tick_height(), bank.last_blockhash()); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new(poh_recorder.clone(), &PohServiceConfig::default(), &exit); - (poh_recorder, poh_service, entry_receiver) + (exit, poh_recorder, poh_service, entry_receiver) } #[cfg(test)] @@ -378,13 +379,14 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); - let (poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank); + let (exit, poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank); let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); drop(verified_sender); + exit.store(true, Ordering::Relaxed); banking_stage.join().unwrap(); - poh_service.close().unwrap(); + poh_service.join().unwrap(); } #[test] @@ -395,7 +397,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); - let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); + let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); poh_recorder.lock().unwrap().set_bank(&bank); @@ -403,7 +405,8 @@ mod tests { trace!("sending bank"); sleep(Duration::from_millis(600)); drop(verified_sender); - poh_service.close().unwrap(); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); drop(poh_recorder); trace!("getting entries"); @@ -424,7 +427,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); - let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); + let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); poh_recorder.lock().unwrap().set_bank(&bank); @@ -452,7 +455,8 @@ mod tests { .unwrap(); drop(verified_sender); - poh_service.close().expect("close"); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); drop(poh_recorder); //receive entries + ticks @@ -481,7 +485,7 @@ mod tests { let (genesis_block, mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); - let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); + let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); let cluster_info = ClusterInfo::new(Node::new_localhost().info); let cluster_info = Arc::new(RwLock::new(cluster_info)); poh_recorder.lock().unwrap().set_bank(&bank); @@ -516,7 +520,8 @@ mod tests { .unwrap(); drop(verified_sender); - poh_service.close().expect("close");; + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); drop(poh_recorder); // Collect the ledger and feed it to a new bank. diff --git a/core/src/blob_fetch_stage.rs b/core/src/blob_fetch_stage.rs index a0cebc32ad4944..10a6053472cea2 100644 --- a/core/src/blob_fetch_stage.rs +++ b/core/src/blob_fetch_stage.rs @@ -3,34 +3,29 @@ use crate::service::Service; use crate::streamer::{self, BlobSender}; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::thread::{self, JoinHandle}; pub struct BlobFetchStage { - exit: Arc, thread_hdls: Vec>, } impl BlobFetchStage { - pub fn new(socket: Arc, sender: &BlobSender, exit: Arc) -> Self { + pub fn new(socket: Arc, sender: &BlobSender, exit: &Arc) -> Self { Self::new_multi_socket(vec![socket], sender, exit) } pub fn new_multi_socket( sockets: Vec>, sender: &BlobSender, - exit: Arc, + exit: &Arc, ) -> Self { let thread_hdls: Vec<_> = sockets .into_iter() - .map(|socket| streamer::blob_receiver(socket, exit.clone(), sender.clone())) + .map(|socket| streamer::blob_receiver(socket, &exit, sender.clone())) .collect(); - Self { exit, thread_hdls } - } - - pub fn close(&self) { - self.exit.store(true, Ordering::Relaxed); + Self { thread_hdls } } } diff --git a/core/src/blockstream_service.rs b/core/src/blockstream_service.rs index f9f9e74b9b68c0..2295b753540667 100644 --- a/core/src/blockstream_service.rs +++ b/core/src/blockstream_service.rs @@ -27,9 +27,10 @@ impl BlockstreamService { slot_full_receiver: Receiver<(u64, Pubkey)>, blocktree: Arc, blockstream_socket: String, - exit: Arc, + exit: &Arc, ) -> Self { let mut blockstream = Blockstream::new(blockstream_socket); + let exit = exit.clone(); let t_blockstream = Builder::new() .name("solana-blockstream".to_string()) .spawn(move || loop { diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 50331c88fbf0d8..80f1a4206d784d 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -224,14 +224,15 @@ impl BroadcastStage { sock: UdpSocket, cluster_info: Arc>, receiver: Receiver, - exit_sender: Arc, + exit_sender: &Arc, blocktree: &Arc, ) -> Self { let blocktree = blocktree.clone(); + let exit_sender = exit_sender.clone(); let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { - let _exit = Finalizer::new(exit_sender); + let _finalizer = Finalizer::new(exit_sender); Self::run(&sock, &cluster_info, &receiver, &blocktree) }) .unwrap(); @@ -299,7 +300,7 @@ mod test { leader_info.sockets.broadcast, cluster_info, entry_receiver, - exit_sender, + &exit_sender, &blocktree, ); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 3c2eb68ce777e4..356c401994ecc7 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -868,8 +868,9 @@ impl ClusterInfo { obj: Arc>, bank_forks: Option>>, blob_sender: BlobSender, - exit: Arc, + exit: &Arc, ) -> JoinHandle<()> { + let exit = exit.clone(); Builder::new() .name("solana-gossip".to_string()) .spawn(move || { @@ -1243,8 +1244,9 @@ impl ClusterInfo { blocktree: Option>, requests_receiver: BlobReceiver, response_sender: BlobSender, - exit: Arc, + exit: &Arc, ) -> JoinHandle<()> { + let exit = exit.clone(); Builder::new() .name("solana-listen".to_string()) .spawn(move || loop { diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index bc057be096896c..3f4f255630c8f8 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -10,30 +10,28 @@ use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; pub struct ClusterInfoVoteListener { - exit: Arc, thread_hdls: Vec>, } impl ClusterInfoVoteListener { pub fn new( - exit: Arc, + exit: &Arc, cluster_info: Arc>, sender: PacketSender, ) -> Self { - let exit1 = exit.clone(); + let exit = exit.clone(); let thread = Builder::new() .name("solana-cluster_info_vote_listener".to_string()) .spawn(move || { - let _ = Self::recv_loop(&exit1, &cluster_info, &sender); + let _ = Self::recv_loop(exit, &cluster_info, &sender); }) .unwrap(); Self { - exit, thread_hdls: vec![thread], } } fn recv_loop( - exit: &Arc, + exit: Arc, cluster_info: &Arc>, sender: &PacketSender, ) -> Result<()> { @@ -52,9 +50,6 @@ impl ClusterInfoVoteListener { sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); } } - pub fn close(&self) { - self.exit.store(true, Ordering::Relaxed); - } } impl Service for ClusterInfoVoteListener { diff --git a/core/src/db_window.rs b/core/src/db_window.rs index fa55372d35e724..0e67de31b94472 100644 --- a/core/src/db_window.rs +++ b/core/src/db_window.rs @@ -134,12 +134,7 @@ mod test { let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let (s_reader, r_reader) = channel(); - let t_receiver = receiver( - Arc::new(read), - exit.clone(), - s_reader, - "window-streamer-test", - ); + let t_receiver = receiver(Arc::new(read), &exit, s_reader, "window-streamer-test"); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 51b0fa1341d211..d5b064627d3588 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -3,25 +3,24 @@ use crate::service::Service; use crate::streamer::{self, PacketReceiver, PacketSender}; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::Arc; use std::thread::{self, JoinHandle}; pub struct FetchStage { - exit: Arc, thread_hdls: Vec>, } impl FetchStage { #[allow(clippy::new_ret_no_self)] - pub fn new(sockets: Vec, exit: Arc) -> (Self, PacketReceiver) { + pub fn new(sockets: Vec, exit: &Arc) -> (Self, PacketReceiver) { let (sender, receiver) = channel(); (Self::new_with_sender(sockets, exit, &sender), receiver) } pub fn new_with_sender( sockets: Vec, - exit: Arc, + exit: &Arc, sender: &PacketSender, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); @@ -29,19 +28,15 @@ impl FetchStage { } fn new_multi_socket( sockets: Vec>, - exit: Arc, + exit: &Arc, sender: &PacketSender, ) -> Self { let thread_hdls: Vec<_> = sockets .into_iter() - .map(|socket| streamer::receiver(socket, exit.clone(), sender.clone(), "fetch-stage")) + .map(|socket| streamer::receiver(socket, &exit, sender.clone(), "fetch-stage")) .collect(); - Self { exit, thread_hdls } - } - - pub fn close(&self) { - self.exit.store(true, Ordering::Relaxed); + Self { thread_hdls } } } diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index f67a614cd16556..dfc861ab6466de 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -36,28 +36,6 @@ use std::thread::JoinHandle; use std::thread::{spawn, Result}; use std::time::Duration; -struct NodeServices { - tpu: Tpu, - tvu: Tvu, -} - -impl NodeServices { - fn new(tpu: Tpu, tvu: Tvu) -> Self { - NodeServices { tpu, tvu } - } - - fn join(self) -> Result<()> { - self.tpu.join()?; - self.tvu.join()?; - Ok(()) - } - - fn exit(&self) { - self.tpu.exit(); - self.tvu.exit(); - } -} - pub struct FullnodeConfig { pub sigverify_disabled: bool, pub voting_disabled: bool, @@ -92,9 +70,10 @@ pub struct Fullnode { rpc_pubsub_service: Option, rpc_working_bank_handle: JoinHandle<()>, gossip_service: GossipService, - node_services: NodeServices, - poh_service: PohService, poh_recorder: Arc>, + poh_service: PohService, + tpu: Tpu, + tvu: Tvu, } impl Fullnode { @@ -168,7 +147,7 @@ impl Fullnode { drone_addr, storage_state.clone(), config.rpc_config.clone(), - exit.clone(), + &exit, ); let subscriptions = Arc::new(RpcSubscriptions::default()); @@ -274,7 +253,8 @@ impl Fullnode { rpc_service: Some(rpc_service), rpc_pubsub_service: Some(rpc_pubsub_service), rpc_working_bank_handle, - node_services: NodeServices::new(tpu, tvu), + tpu, + tvu, exit, poh_service, poh_recorder, @@ -284,6 +264,7 @@ impl Fullnode { // Used for notifying many nodes in parallel to exit pub fn exit(&self) { self.exit.store(true, Ordering::Relaxed); + // Need to force the poh_recorder to drop the WorkingBank, // which contains the channel to BroadcastStage. This should be // sufficient as long as no other rotations are happening that @@ -292,11 +273,6 @@ impl Fullnode { // in motion because exit()/close() are only called by the run() loop // which is the sole initiator of rotations. self.poh_recorder.lock().unwrap().clear_bank(); - self.poh_service.exit(); - if let Some(ref rpc_service) = self.rpc_service { - rpc_service.exit(); - } - self.node_services.exit(); } pub fn close(self) -> Result<()> { @@ -343,7 +319,9 @@ impl Service for Fullnode { self.rpc_working_bank_handle.join()?; self.gossip_service.join()?; - self.node_services.join()?; + self.tpu.join()?; + self.tvu.join()?; + Ok(()) } } diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 06fac2f53da2d4..5ea1bcf583fd0b 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -16,7 +16,6 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; pub struct GossipService { - exit: Arc, thread_hdls: Vec>, } @@ -35,8 +34,7 @@ impl GossipService { &cluster_info.read().unwrap().my_data().id, gossip_socket.local_addr().unwrap() ); - let t_receiver = - streamer::blob_receiver(gossip_socket.clone(), exit.clone(), request_sender); + let t_receiver = streamer::blob_receiver(gossip_socket.clone(), &exit, request_sender); let (response_sender, response_receiver) = channel(); let t_responder = streamer::responder("gossip", gossip_socket, response_receiver); let t_listen = ClusterInfo::listen( @@ -44,24 +42,11 @@ impl GossipService { blocktree, request_receiver, response_sender.clone(), - exit.clone(), - ); - let t_gossip = ClusterInfo::gossip( - cluster_info.clone(), - bank_forks, - response_sender, - exit.clone(), + exit, ); + let t_gossip = ClusterInfo::gossip(cluster_info.clone(), bank_forks, response_sender, exit); let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; - Self { - exit: exit.clone(), - thread_hdls, - } - } - - pub fn close(self) -> thread::Result<()> { - self.exit.store(true, Ordering::Relaxed); - self.join() + Self { thread_hdls } } } @@ -99,8 +84,10 @@ pub fn discover(entry_point_info: &NodeInfo, num_nodes: usize) -> Vec //TODO: deprecate this in favor of discover pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec { info!("Wait for convergence with {} nodes", num_nodes); + + let exit = Arc::new(AtomicBool::new(false)); // Let's spy on the network - let (gossip_service, spy_ref, id) = make_spy_node(node); + let (gossip_service, spy_ref, id) = make_spy_node(node, &exit); trace!( "converge spy_node {} looking for at least {} nodes", id, @@ -117,7 +104,8 @@ pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec { num_nodes, rpc_peers ); - gossip_service.close().unwrap(); + exit.store(true, Ordering::Relaxed); + gossip_service.join().unwrap(); return rpc_peers; } debug!( @@ -132,9 +120,11 @@ pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec { panic!("Failed to converge"); } -pub fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc>, Pubkey) { +fn make_spy_node( + leader: &NodeInfo, + exit: &Arc, +) -> (GossipService, Arc>, Pubkey) { let keypair = Keypair::new(); - let exit = Arc::new(AtomicBool::new(false)); let mut spy = Node::new_localhost_with_pubkey(keypair.pubkey()); let id = spy.info.id; let daddr = "0.0.0.0:0".parse().unwrap(); @@ -145,7 +135,7 @@ pub fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc, - poh_exit: Arc, } impl PohService { - pub fn exit(&self) { - self.poh_exit.store(true, Ordering::Relaxed); - } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } - pub fn new( poh_recorder: Arc>, config: &PohServiceConfig, @@ -64,10 +54,7 @@ impl PohService { }) .unwrap(); - Self { - tick_producer, - poh_exit: poh_exit.clone(), - } + Self { tick_producer } } fn tick_producer( @@ -157,7 +144,7 @@ mod tests { let poh_service = PohService::new( poh_recorder.clone(), &PohServiceConfig::Tick(HASHES_PER_TICK as usize), - &Arc::new(AtomicBool::new(false)), + &exit, ); poh_recorder.lock().unwrap().set_working_bank(working_bank); @@ -192,7 +179,6 @@ mod tests { } } exit.store(true, Ordering::Relaxed); - poh_service.exit(); let _ = poh_service.join().unwrap(); let _ = entry_producer.join().unwrap(); } diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 43988348e1396d..fbf8813ca27dcf 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -45,7 +45,7 @@ pub struct RepairService { impl RepairService { fn run( blocktree: &Arc, - exit: &Arc, + exit: Arc, repair_socket: &Arc, cluster_info: &Arc>, ) { @@ -112,13 +112,14 @@ impl RepairService { pub fn new( blocktree: Arc, - exit: Arc, + exit: &Arc, repair_socket: Arc, cluster_info: Arc>, ) -> Self { + let exit = exit.clone(); let t_repair = Builder::new() .name("solana-repair-service".to_string()) - .spawn(move || Self::run(&blocktree, &exit, &repair_socket, &cluster_info)) + .spawn(move || Self::run(&blocktree, exit, &repair_socket, &cluster_info)) .unwrap(); RepairService { t_repair } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 777161d01a2437..a0a610d849ccf0 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -48,7 +48,6 @@ impl Drop for Finalizer { pub struct ReplayStage { t_replay: JoinHandle>, - exit: Arc, } impl ReplayStage { @@ -59,7 +58,7 @@ impl ReplayStage { blocktree: Arc, bank_forks: &Arc>, cluster_info: Arc>, - exit: Arc, + exit: &Arc, ledger_signal_receiver: Receiver, subscriptions: &Arc, poh_recorder: &Arc>, @@ -199,7 +198,7 @@ impl ReplayStage { }) .unwrap(); ( - Self { t_replay, exit }, + Self { t_replay }, slot_full_receiver, forward_entry_receiver, ) @@ -259,15 +258,6 @@ impl ReplayStage { result } - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } - - pub fn exit(&self) { - self.exit.store(true, Ordering::Relaxed); - } - pub fn verify_and_process_entries( bank: &Bank, entries: &[Entry], @@ -345,7 +335,6 @@ mod test { use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; - use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -368,22 +357,21 @@ mod test { let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); // Set up the replay stage - let exit = Arc::new(AtomicBool::new(false)); - let voting_keypair = Arc::new(Keypair::new()); { + let voting_keypair = Arc::new(Keypair::new()); let (bank_forks, _bank_forks_info, blocktree, l_receiver) = new_banks_from_blocktree(&my_ledger_path, None); let bank = bank_forks.working_bank(); let blocktree = Arc::new(blocktree); - let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); + let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new( my_keypair.pubkey(), Some(voting_keypair.clone()), blocktree.clone(), &Arc::new(RwLock::new(bank_forks)), cluster_info_me.clone(), - exit.clone(), + &exit, l_receiver, &Arc::new(RpcSubscriptions::default()), &poh_recorder, @@ -403,10 +391,9 @@ mod test { assert_eq!(next_tick[0], received_tick[0]); - replay_stage - .close() - .expect("Expect successful ReplayStage exit"); - poh_service.close().unwrap(); + exit.store(true, Ordering::Relaxed); + replay_stage.join().unwrap(); + poh_service.join().unwrap(); } let _ignored = remove_dir_all(&my_ledger_path); } diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 340c4d5b93861e..c84614a1234427 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -171,8 +171,7 @@ impl Replicator { node.sockets.tvu.into_iter().map(Arc::new).collect(); blob_sockets.push(repair_socket.clone()); let (blob_fetch_sender, blob_fetch_receiver) = channel(); - let fetch_stage = - BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, exit.clone()); + let fetch_stage = BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, &exit); // todo: pull blobs off the retransmit_receiver and recycle them? let (retransmit_sender, retransmit_receiver) = channel(); @@ -183,7 +182,7 @@ impl Replicator { blob_fetch_receiver, retransmit_sender, repair_socket, - exit.clone(), + &exit, ); info!("window created, waiting for ledger download done"); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 309b137aaae710..4832daffa33062 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -113,7 +113,7 @@ impl RetransmitStage { retransmit_socket: Arc, repair_socket: Arc, fetch_stage_receiver: BlobReceiver, - exit: Arc, + exit: &Arc, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 35147db44f7605..bec2580dfa64cd 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -57,13 +57,13 @@ impl JsonRpcRequestProcessor { pub fn new( storage_state: StorageState, config: JsonRpcConfig, - fullnode_exit: Arc, + fullnode_exit: &Arc, ) -> Self { JsonRpcRequestProcessor { bank: None, storage_state, config, - fullnode_exit, + fullnode_exit: fullnode_exit.clone(), } } @@ -428,7 +428,7 @@ mod tests { let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( StorageState::default(), JsonRpcConfig::default(), - exit, + &exit, ))); request_processor.write().unwrap().set_bank(&bank); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); @@ -458,7 +458,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let exit = Arc::new(AtomicBool::new(false)); let mut request_processor = - JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), exit); + JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), &exit); request_processor.set_bank(&bank); thread::spawn(move || { let blockhash = bank.last_blockhash(); @@ -631,7 +631,7 @@ mod tests { let mut request_processor = JsonRpcRequestProcessor::new( StorageState::default(), JsonRpcConfig::default(), - exit, + &exit, ); request_processor.set_bank(&bank); Arc::new(RwLock::new(request_processor)) @@ -707,11 +707,8 @@ mod tests { #[test] fn test_rpc_request_processor_config_default_trait_fullnode_exit_fails() { let exit = Arc::new(AtomicBool::new(false)); - let request_processor = JsonRpcRequestProcessor::new( - StorageState::default(), - JsonRpcConfig::default(), - exit.clone(), - ); + let request_processor = + JsonRpcRequestProcessor::new(StorageState::default(), JsonRpcConfig::default(), &exit); assert_eq!(request_processor.fullnode_exit(), Ok(false)); assert_eq!(exit.load(Ordering::Relaxed), false); } @@ -721,7 +718,7 @@ mod tests { let request_processor = JsonRpcRequestProcessor::new( StorageState::default(), JsonRpcConfig::DefaultConfig, - exit.clone(), + &exit, ); assert_eq!(request_processor.fullnode_exit(), Ok(false)); assert_eq!(exit.load(Ordering::Relaxed), false); @@ -733,7 +730,7 @@ mod tests { let request_processor = JsonRpcRequestProcessor::new( StorageState::default(), JsonRpcConfig::TestOnlyAllowRpcFullnodeExit, - exit.clone(), + &exit, ); assert_eq!(request_processor.fullnode_exit(), Ok(true)); assert_eq!(exit.load(Ordering::Relaxed), true); diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 3d4e4db0257651..1f3d34eda45f60 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -17,7 +17,6 @@ pub const RPC_PORT: u16 = 8899; pub struct JsonRpcService { thread_hdl: JoinHandle<()>, - exit: Arc, pub request_processor: Arc>, // Used only by tests... } @@ -28,13 +27,13 @@ impl JsonRpcService { drone_addr: SocketAddr, storage_state: StorageState, config: JsonRpcConfig, - exit: Arc, + exit: &Arc, ) -> Self { info!("rpc bound to {:?}", rpc_addr); let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( storage_state, config, - exit.clone(), + exit, ))); let request_processor_ = request_processor.clone(); @@ -71,7 +70,6 @@ impl JsonRpcService { .unwrap(); Self { thread_hdl, - exit, request_processor, } } @@ -79,15 +77,6 @@ impl JsonRpcService { pub fn set_bank(&mut self, bank: &Arc) { self.request_processor.write().unwrap().set_bank(bank); } - - pub fn exit(&self) { - self.exit.store(true, Ordering::Relaxed); - } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } } impl Service for JsonRpcService { @@ -127,7 +116,7 @@ mod tests { drone_addr, StorageState::default(), JsonRpcConfig::default(), - exit, + &exit, ); rpc_service.set_bank(&Arc::new(bank)); let thread = rpc_service.thread_hdl.thread(); @@ -142,7 +131,7 @@ mod tests { .get_balance(alice.pubkey()) .unwrap() ); - - rpc_service.close().unwrap(); + exit.store(true, Ordering::Relaxed); + rpc_service.join().unwrap(); } } diff --git a/core/src/streamer.rs b/core/src/streamer.rs index 9d3ce3c82c4a71..95de7a8805a755 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -19,7 +19,7 @@ pub type BlobReceiver = Receiver; fn recv_loop( sock: &UdpSocket, - exit: &Arc, + exit: Arc, channel: &PacketSender, channel_tag: &'static str, ) -> Result<()> { @@ -47,7 +47,7 @@ fn recv_loop( pub fn receiver( sock: Arc, - exit: Arc, + exit: &Arc, packet_sender: PacketSender, sender_tag: &'static str, ) -> JoinHandle<()> { @@ -55,10 +55,11 @@ pub fn receiver( if res.is_err() { panic!("streamer::receiver set_read_timeout error"); } + let exit = exit.clone(); Builder::new() .name("solana-receiver".to_string()) .spawn(move || { - let _ = recv_loop(&sock, &exit, &packet_sender, sender_tag); + let _ = recv_loop(&sock, exit, &packet_sender, sender_tag); }) .unwrap() } @@ -116,12 +117,17 @@ fn recv_blobs(sock: &UdpSocket, s: &BlobSender) -> Result<()> { Ok(()) } -pub fn blob_receiver(sock: Arc, exit: Arc, s: BlobSender) -> JoinHandle<()> { +pub fn blob_receiver( + sock: Arc, + exit: &Arc, + s: BlobSender, +) -> JoinHandle<()> { //DOCUMENTED SIDE-EFFECT //1 second timeout on socket read let timer = Duration::new(1, 0); sock.set_read_timeout(Some(timer)) .expect("set socket timeout"); + let exit = exit.clone(); Builder::new() .name("solana-blob_receiver".to_string()) .spawn(move || loop { @@ -173,7 +179,7 @@ mod test { let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader, "streamer-test"); + let t_receiver = receiver(Arc::new(read), &exit, s_reader, "streamer-test"); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); diff --git a/core/src/thin_client.rs b/core/src/thin_client.rs index 5b3ae5b2fff69a..672e118b920225 100644 --- a/core/src/thin_client.rs +++ b/core/src/thin_client.rs @@ -9,6 +9,7 @@ use crate::gossip_service::GossipService; use crate::packet::PACKET_DATA_SIZE; use crate::result::{Error, Result}; use crate::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler}; +use crate::service::Service; use bincode::serialize_into; use bs58; use serde_json; @@ -24,7 +25,7 @@ use solana_sdk::transaction::Transaction; use std; use std::io; use std::net::{SocketAddr, UdpSocket}; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; @@ -432,7 +433,8 @@ pub fn poll_gossip_for_leader(leader_gossip: SocketAddr, timeout: Option) - sleep(Duration::from_millis(100)); } - gossip_service.close()?; + exit.store(true, Ordering::Relaxed); + gossip_service.join()?; if log_enabled!(log::Level::Trace) { trace!("{}", cluster_info.read().unwrap().node_info_trace()); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 3405695d947cdf..3eec8787060a5e 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -12,12 +12,13 @@ use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; use solana_sdk::pubkey::Pubkey; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex, RwLock}; use std::thread; -pub struct LeaderServices { +pub struct Tpu { + pub id: Pubkey, fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, banking_stage: BankingStage, @@ -25,53 +26,6 @@ pub struct LeaderServices { broadcast_stage: BroadcastStage, } -impl LeaderServices { - fn new( - fetch_stage: FetchStage, - sigverify_stage: SigVerifyStage, - banking_stage: BankingStage, - cluster_info_vote_listener: ClusterInfoVoteListener, - broadcast_stage: BroadcastStage, - ) -> Self { - LeaderServices { - fetch_stage, - sigverify_stage, - banking_stage, - cluster_info_vote_listener, - broadcast_stage, - } - } - - pub fn exit(&self) { - self.fetch_stage.close(); - } - - fn join(self) -> thread::Result<()> { - let mut results = vec![]; - results.push(self.fetch_stage.join()); - results.push(self.sigverify_stage.join()); - results.push(self.cluster_info_vote_listener.join()); - results.push(self.banking_stage.join()); - let broadcast_result = self.broadcast_stage.join(); - for result in results { - result?; - } - let _ = broadcast_result?; - Ok(()) - } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } -} - -pub struct Tpu { - leader_services: LeaderServices, - exit: Arc, - pub id: Pubkey, -} - impl Tpu { pub fn new( id: Pubkey, @@ -88,9 +42,9 @@ impl Tpu { let (packet_sender, packet_receiver) = channel(); let fetch_stage = - FetchStage::new_with_sender(transactions_sockets, exit.clone(), &packet_sender.clone()); + FetchStage::new_with_sender(transactions_sockets, &exit, &packet_sender.clone()); let cluster_info_vote_listener = - ClusterInfoVoteListener::new(exit.clone(), cluster_info.clone(), packet_sender); + ClusterInfoVoteListener::new(&exit, cluster_info.clone(), packet_sender); let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); @@ -101,42 +55,35 @@ impl Tpu { broadcast_socket, cluster_info.clone(), entry_receiver, - exit.clone(), + &exit, blocktree, ); - let leader_services = LeaderServices::new( + Self { + id, fetch_stage, sigverify_stage, banking_stage, cluster_info_vote_listener, broadcast_stage, - ); - Self { - leader_services, - exit: exit.clone(), - id, } } - - pub fn exit(&self) { - self.exit.store(true, Ordering::Relaxed); - } - - pub fn is_exited(&self) -> bool { - self.exit.load(Ordering::Relaxed) - } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } } impl Service for Tpu { type JoinReturnType = (); fn join(self) -> thread::Result<()> { - self.leader_services.join() + let mut results = vec![]; + results.push(self.fetch_stage.join()); + results.push(self.sigverify_stage.join()); + results.push(self.cluster_info_vote_listener.join()); + results.push(self.banking_stage.join()); + let broadcast_result = self.broadcast_stage.join(); + for result in results { + result?; + } + let _ = broadcast_result?; + Ok(()) } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 214e427dafe99a..244305ab1fa825 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -26,7 +26,7 @@ use crate::service::Service; use crate::storage_stage::{StorageStage, StorageState}; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex, RwLock}; use std::thread; @@ -37,7 +37,6 @@ pub struct Tvu { replay_stage: ReplayStage, blockstream_service: Option, storage_stage: StorageStage, - exit: Arc, } pub struct Sockets { @@ -90,8 +89,7 @@ impl Tvu { let mut blob_sockets: Vec> = fetch_sockets.into_iter().map(Arc::new).collect(); blob_sockets.push(repair_socket.clone()); - let fetch_stage = - BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, exit.clone()); + let fetch_stage = BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, &exit); //TODO //the packets coming out of blob_receiver need to be sent to the GPU and verified @@ -103,7 +101,7 @@ impl Tvu { Arc::new(retransmit_socket), repair_socket, blob_fetch_receiver, - exit.clone(), + &exit, ); let (replay_stage, slot_full_receiver, forward_entry_receiver) = ReplayStage::new( @@ -112,7 +110,7 @@ impl Tvu { blocktree.clone(), &bank_forks, cluster_info.clone(), - exit.clone(), + &exit, ledger_signal_receiver, subscriptions, poh_recorder, @@ -123,7 +121,7 @@ impl Tvu { slot_full_receiver, blocktree.clone(), blockstream.unwrap().to_string(), - exit.clone(), + &exit, ); Some(blockstream_service) } else { @@ -135,7 +133,7 @@ impl Tvu { forward_entry_receiver, Some(blocktree), &keypair, - &exit.clone(), + &exit, bank_forks_info[0].entry_height, // TODO: StorageStage needs to deal with BankForks somehow still storage_rotate_count, &cluster_info, @@ -147,25 +145,8 @@ impl Tvu { replay_stage, blockstream_service, storage_stage, - exit: exit.clone(), } } - - pub fn is_exited(&self) -> bool { - self.exit.load(Ordering::Relaxed) - } - - pub fn exit(&self) { - // Call exit to make sure replay stage is unblocked from a channel it may be blocked on. - // Then replay stage will set the self.exit variable and cause the rest of the - // pipeline to exit - self.replay_stage.exit(); - } - - pub fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } } impl Service for Tvu { @@ -192,6 +173,7 @@ pub mod tests { use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; + use std::sync::atomic::Ordering; #[test] fn test_tvu_exit() { @@ -219,8 +201,7 @@ pub mod tests { let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path) .expect("Expected to successfully open ledger"); let bank = bank_forks.working_bank(); - let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); - let exit = Arc::new(AtomicBool::new(false)); + let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let tvu = Tvu::new( Some(Arc::new(Keypair::new())), &Arc::new(RwLock::new(bank_forks)), @@ -242,7 +223,8 @@ pub mod tests { &poh_recorder, &exit, ); - tvu.close().expect("close"); - poh_service.close().expect("close"); + exit.store(true, Ordering::Relaxed); + tvu.join().unwrap(); + poh_service.join().unwrap(); } } diff --git a/core/src/window.rs b/core/src/window.rs index 671e6249adced3..aae637847c5c4b 100644 --- a/core/src/window.rs +++ b/core/src/window.rs @@ -214,12 +214,7 @@ mod test { let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let (s_reader, r_reader) = channel(); - let t_receiver = receiver( - Arc::new(read), - exit.clone(), - s_reader, - "window-streamer-test", - ); + let t_receiver = receiver(Arc::new(read), &exit, s_reader, "window-streamer-test"); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index e38825ec186c7b..742f8d3d17fa61 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -102,19 +102,15 @@ impl WindowService { r: BlobReceiver, retransmit: BlobSender, repair_socket: Arc, - exit: Arc, + exit: &Arc, ) -> WindowService { - let exit_ = exit.clone(); - let repair_service = RepairService::new( - blocktree.clone(), - exit.clone(), - repair_socket, - cluster_info.clone(), - ); + let repair_service = + RepairService::new(blocktree.clone(), exit, repair_socket, cluster_info.clone()); + let exit = exit.clone(); let t_window = Builder::new() .name("solana-window".to_string()) .spawn(move || { - let _exit = Finalizer::new(exit_); + let _exit = Finalizer::new(exit.clone()); let id = cluster_info.read().unwrap().id(); trace!("{}: RECV_WINDOW started", id); loop { @@ -182,8 +178,7 @@ mod test { let subs = Arc::new(RwLock::new(cluster_info_me)); let (s_reader, r_reader) = channel(); - let t_receiver = - blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader); + let t_receiver = blob_receiver(Arc::new(leader_node.sockets.gossip), &exit, s_reader); let (s_retransmit, r_retransmit) = channel(); let blocktree_path = get_tmp_ledger_path!(); let blocktree = Arc::new( @@ -195,7 +190,7 @@ mod test { r_reader, s_retransmit, Arc::new(leader_node.sockets.repair), - exit.clone(), + &exit, ); let t_responder = { let (s_responder, r_responder) = channel(); @@ -254,8 +249,7 @@ mod test { let subs = Arc::new(RwLock::new(cluster_info_me)); let (s_reader, r_reader) = channel(); - let t_receiver = - blob_receiver(Arc::new(leader_node.sockets.gossip), exit.clone(), s_reader); + let t_receiver = blob_receiver(Arc::new(leader_node.sockets.gossip), &exit, s_reader); let (s_retransmit, r_retransmit) = channel(); let blocktree_path = get_tmp_ledger_path!(); let blocktree = Arc::new( @@ -267,7 +261,7 @@ mod test { r_reader, s_retransmit, Arc::new(leader_node.sockets.repair), - exit.clone(), + &exit, ); let t_responder = { let (s_responder, r_responder) = channel(); diff --git a/tests/replicator.rs b/tests/replicator.rs index e01f59c2a446f7..d4b49de21da5c5 100644 --- a/tests/replicator.rs +++ b/tests/replicator.rs @@ -156,7 +156,7 @@ fn test_replicator_startup_basic() { let exit = Arc::new(AtomicBool::new(false)); let (s_reader, r_reader) = channel(); let repair_socket = Arc::new(tn.sockets.repair); - let t_receiver = blob_receiver(repair_socket.clone(), exit.clone(), s_reader); + let t_receiver = blob_receiver(repair_socket.clone(), &exit, s_reader); info!( "Sending repair requests from: {} to: {}", diff --git a/tests/tvu.rs b/tests/tvu.rs index 25600533307add..450777245daa07 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -67,7 +67,7 @@ fn test_replay() { let (s_reader, r_reader) = channel(); let blob_sockets: Vec> = target2.sockets.tvu.into_iter().map(Arc::new).collect(); - let t_receiver = streamer::blob_receiver(blob_sockets[0].clone(), exit.clone(), s_reader); + let t_receiver = streamer::blob_receiver(blob_sockets[0].clone(), &exit, s_reader); // simulate leader sending messages let (s_responder, r_responder) = channel(); @@ -111,7 +111,8 @@ fn test_replay() { .expect("Expected to successfully open ledger"); let vote_account_keypair = Arc::new(Keypair::new()); let voting_keypair = VotingKeypair::new_local(&vote_account_keypair); - let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); + let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank); let tvu = Tvu::new( Some(Arc::new(voting_keypair)), &Arc::new(RwLock::new(bank_forks)), @@ -184,14 +185,15 @@ fn test_replay() { let bob_balance = bank.get_balance(&bob_keypair.pubkey()); assert_eq!(bob_balance, starting_balance - alice_ref_balance); - poh_service.close().expect("close"); - tvu.close().expect("close"); exit.store(true, Ordering::Relaxed); - dr_l.join().expect("join"); - dr_2.join().expect("join"); - dr_1.join().expect("join"); - t_receiver.join().expect("join"); - t_responder.join().expect("join"); + poh_service_exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + tvu.join().unwrap(); + dr_l.join().unwrap(); + dr_2.join().unwrap(); + dr_1.join().unwrap(); + t_receiver.join().unwrap(); + t_responder.join().unwrap(); Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); let _ignored = remove_dir_all(&blocktree_path); }