Skip to content

Commit

Permalink
Don't deadlock when shutting down
Browse files Browse the repository at this point in the history
  • Loading branch information
romanz committed Apr 12, 2020
1 parent 7d23da5 commit a3bfdda
Showing 1 changed file with 27 additions and 47 deletions.
74 changes: 27 additions & 47 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use serde_json::{from_str, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender};
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use crate::errors::*;
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
Expand Down Expand Up @@ -488,21 +489,24 @@ struct Stats {
impl RPC {
fn start_notifier(
notification: Channel<Notification>,
senders: Arc<Mutex<HashMap<i32, SyncSender<Message>>>>,
senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
) {
spawn_thread("notification", move || {
for msg in notification.receiver().iter() {
let senders = senders.lock().unwrap();
let mut senders = senders.lock().unwrap();
match msg {
Notification::Periodic => {
for (i, sender) in senders.iter() {
if let Err(e) = sender.try_send(Message::PeriodicUpdate) {
debug!("failed to send PeriodicUpdate to peer {}: {}", i, e);
for sender in senders.split_off(0) {
if let Err(TrySendError::Disconnected(_)) =
sender.try_send(Message::PeriodicUpdate)
{
continue;
}
senders.push(sender);
}
}
Notification::Exit => acceptor.send(None).unwrap(),
Notification::Exit => acceptor.send(None).unwrap(), // mark acceptor as done
}
}
});
Expand Down Expand Up @@ -541,60 +545,36 @@ impl RPC {
)),
});
let notification = Channel::unbounded();

RPC {
notification: notification.sender(),
server: Some(spawn_thread("rpc", move || {
let senders = Arc::new(Mutex::new(HashMap::<i32, SyncSender<Message>>::new()));
let handles = Arc::new(Mutex::new(
HashMap::<i32, std::thread::JoinHandle<()>>::new(),
));
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));

let acceptor = RPC::start_acceptor(addr);
RPC::start_notifier(notification, senders.clone(), acceptor.sender());

let mut handle_count = 0;
while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
let handle_id = handle_count;
handle_count += 1;

// explicitely scope the shadowed variables for the new thread
let handle: thread::JoinHandle<()> = {
let query = Arc::clone(&query);
let senders = Arc::clone(&senders);
let stats = Arc::clone(&stats);
let handles = Arc::clone(&handles);

spawn_thread("peer", move || {
info!("[{}] connected peer #{}", addr, handle_id);
let conn = Connection::new(query, stream, addr, stats, relayfee);
senders
.lock()
.unwrap()
.insert(handle_id, conn.chan.sender());
conn.run();
info!("[{}] disconnected peer #{}", addr, handle_id);

senders.lock().unwrap().remove(&handle_id);
handles.lock().unwrap().remove(&handle_id);
})
};

handles.lock().unwrap().insert(handle_id, handle);
let query = Arc::clone(&query);
let senders = Arc::clone(&senders);
let stats = Arc::clone(&stats);

// HACK: detach peer-handling threads
spawn_thread("peer", move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(query, stream, addr, stats, relayfee);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
info!("[{}] disconnected peer", addr);
});
}
trace!("closing {} RPC connections", senders.lock().unwrap().len());
for sender in senders.lock().unwrap().values() {
for sender in senders.lock().unwrap().iter() {
let _ = sender.send(Message::Done);
}
thread::sleep(Duration::from_secs(1)); // TODO: actually wait for threads

trace!(
"waiting for {} RPC handling threads",
handles.lock().unwrap().len()
);
for (_, handle) in handles.lock().unwrap().drain() {
if let Err(e) = handle.join() {
warn!("failed to join thread: {:?}", e);
}
}
trace!("RPC connections are closed");
})),
}
Expand Down

0 comments on commit a3bfdda

Please sign in to comment.