From a3bfdda32a1c8cab248d3525786a433d1e415d1e Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sat, 11 Apr 2020 18:02:16 +0300 Subject: [PATCH] Don't deadlock when shutting down --- src/rpc.rs | 74 ++++++++++++++++++++---------------------------------- 1 file changed, 27 insertions(+), 47 deletions(-) diff --git a/src/rpc.rs b/src/rpc.rs index 211c04e80..89423c582 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -8,9 +8,10 @@ use serde_json::{from_str, Value}; use std::collections::HashMap; use std::io::{BufRead, BufReader, Write}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; -use std::sync::mpsc::{Sender, SyncSender}; +use std::sync::mpsc::{Sender, SyncSender, TrySendError}; use std::sync::{Arc, Mutex}; use std::thread; +use std::time::Duration; use crate::errors::*; use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics}; @@ -488,21 +489,24 @@ struct Stats { impl RPC { fn start_notifier( notification: Channel, - senders: Arc>>>, + senders: Arc>>>, acceptor: Sender>, ) { spawn_thread("notification", move || { for msg in notification.receiver().iter() { - let senders = senders.lock().unwrap(); + let mut senders = senders.lock().unwrap(); match msg { Notification::Periodic => { - for (i, sender) in senders.iter() { - if let Err(e) = sender.try_send(Message::PeriodicUpdate) { - debug!("failed to send PeriodicUpdate to peer {}: {}", i, e); + for sender in senders.split_off(0) { + if let Err(TrySendError::Disconnected(_)) = + sender.try_send(Message::PeriodicUpdate) + { + continue; } + senders.push(sender); } } - Notification::Exit => acceptor.send(None).unwrap(), + Notification::Exit => acceptor.send(None).unwrap(), // mark acceptor as done } } }); @@ -541,60 +545,36 @@ impl RPC { )), }); let notification = Channel::unbounded(); + RPC { notification: notification.sender(), server: Some(spawn_thread("rpc", move || { - let senders = Arc::new(Mutex::new(HashMap::>::new())); - let handles = Arc::new(Mutex::new( - HashMap::>::new(), - )); + let senders = Arc::new(Mutex::new(Vec::>::new())); let acceptor = RPC::start_acceptor(addr); RPC::start_notifier(notification, senders.clone(), acceptor.sender()); - let mut handle_count = 0; while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() { - let handle_id = handle_count; - handle_count += 1; - // explicitely scope the shadowed variables for the new thread - let handle: thread::JoinHandle<()> = { - let query = Arc::clone(&query); - let senders = Arc::clone(&senders); - let stats = Arc::clone(&stats); - let handles = Arc::clone(&handles); - - spawn_thread("peer", move || { - info!("[{}] connected peer #{}", addr, handle_id); - let conn = Connection::new(query, stream, addr, stats, relayfee); - senders - .lock() - .unwrap() - .insert(handle_id, conn.chan.sender()); - conn.run(); - info!("[{}] disconnected peer #{}", addr, handle_id); - - senders.lock().unwrap().remove(&handle_id); - handles.lock().unwrap().remove(&handle_id); - }) - }; - - handles.lock().unwrap().insert(handle_id, handle); + let query = Arc::clone(&query); + let senders = Arc::clone(&senders); + let stats = Arc::clone(&stats); + + // HACK: detach peer-handling threads + spawn_thread("peer", move || { + info!("[{}] connected peer", addr); + let conn = Connection::new(query, stream, addr, stats, relayfee); + senders.lock().unwrap().push(conn.chan.sender()); + conn.run(); + info!("[{}] disconnected peer", addr); + }); } trace!("closing {} RPC connections", senders.lock().unwrap().len()); - for sender in senders.lock().unwrap().values() { + for sender in senders.lock().unwrap().iter() { let _ = sender.send(Message::Done); } + thread::sleep(Duration::from_secs(1)); // TODO: actually wait for threads - trace!( - "waiting for {} RPC handling threads", - handles.lock().unwrap().len() - ); - for (_, handle) in handles.lock().unwrap().drain() { - if let Err(e) = handle.join() { - warn!("failed to join thread: {:?}", e); - } - } trace!("RPC connections are closed"); })), }