Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Electrum RPC Hang #48

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ fn run_server(config: Arc<Config>) -> Result<()> {
loop {
if let Err(err) = signal.wait(Duration::from_millis(500), true) {
info!("stopping server: {}", err);

electrs::util::spawn_thread("shutdown-thread-checker", || {
let mut counter = 40;
let interval_ms = 500;

while counter > 0 {
electrs::util::with_spawned_threads(|threads| {
debug!("Threads during shutdown: {:?}", threads);
});
std::thread::sleep(std::time::Duration::from_millis(interval_ms));
counter -= 1;
}
});

rest_server.stop();
// the electrum server is stopped when dropped
break;
Expand Down Expand Up @@ -133,4 +147,7 @@ fn main() {
error!("server failed: {}", e.display_chain());
process::exit(1);
}
electrs::util::with_spawned_threads(|threads| {
debug!("Threads before closing: {:?}", threads);
});
}
143 changes: 108 additions & 35 deletions src/electrum/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

Expand Down Expand Up @@ -101,6 +102,7 @@ struct Connection {
chan: SyncChannel<Message>,
stats: Arc<Stats>,
txs_limit: usize,
die_please: Option<Receiver<()>>,
#[cfg(feature = "electrum-discovery")]
discovery: Option<Arc<DiscoveryManager>>,
}
Expand All @@ -112,6 +114,7 @@ impl Connection {
addr: SocketAddr,
stats: Arc<Stats>,
txs_limit: usize,
die_please: Receiver<()>,
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
) -> Connection {
Connection {
Expand All @@ -123,6 +126,7 @@ impl Connection {
chan: SyncChannel::new(10),
stats,
txs_limit,
die_please: Some(die_please),
#[cfg(feature = "electrum-discovery")]
discovery,
}
Expand Down Expand Up @@ -501,38 +505,46 @@ impl Connection {
Ok(())
}

fn handle_replies(&mut self) -> Result<()> {
fn handle_replies(&mut self, shutdown: crossbeam_channel::Receiver<()>) -> Result<()> {
let empty_params = json!([]);
loop {
let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
trace!("RPC {:?}", msg);
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
let reply = match (
cmd.get("method"),
cmd.get("params").unwrap_or(&empty_params),
cmd.get("id"),
) {
(Some(Value::String(method)), Value::Array(params), Some(id)) => {
self.handle_command(method, params, id)?
crossbeam_channel::select! {
recv(self.chan.receiver()) -> msg => {
let msg = msg.chain_err(|| "channel closed")?;
trace!("RPC {:?}", msg);
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
let reply = match (
cmd.get("method"),
cmd.get("params").unwrap_or(&empty_params),
cmd.get("id"),
) {
(Some(Value::String(method)), Value::Array(params), Some(id)) => {
self.handle_command(method, params, id)?
}
_ => bail!("invalid command: {}", cmd),
};
self.send_values(&[reply])?
}
_ => bail!("invalid command: {}", cmd),
};
self.send_values(&[reply])?
}
Message::PeriodicUpdate => {
let values = self
.update_subscriptions()
.chain_err(|| "failed to update subscriptions")?;
self.send_values(&values)?
Message::PeriodicUpdate => {
let values = self
.update_subscriptions()
.chain_err(|| "failed to update subscriptions")?;
self.send_values(&values)?
}
Message::Done => return Ok(()),
}
}
Message::Done => return Ok(()),
recv(shutdown) -> _ => return Ok(()),
}
}
}

fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
fn handle_requests(
mut reader: BufReader<TcpStream>,
tx: crossbeam_channel::Sender<Message>,
) -> Result<()> {
loop {
let mut line = Vec::<u8>::new();
reader
Expand Down Expand Up @@ -564,8 +576,18 @@ impl Connection {
self.stats.clients.inc();
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();

let stream = self.stream.try_clone().expect("failed to clone TcpStream");
let die_please = self.die_please.take().unwrap();
let (reply_killer, reply_receiver) = crossbeam_channel::unbounded();
spawn_thread("properly-die", move || {
let _ = die_please.recv();
let _ = stream.shutdown(Shutdown::Both);
let _ = reply_killer.send(());
});

let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
if let Err(e) = self.handle_replies() {
if let Err(e) = self.handle_replies(reply_receiver) {
error!(
"[{}] connection handling failed: {}",
self.addr,
Expand Down Expand Up @@ -631,30 +653,38 @@ struct Stats {
impl RPC {
fn start_notifier(
notification: Channel<Notification>,
senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
senders: Arc<Mutex<Vec<crossbeam_channel::Sender<Message>>>>,
acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
acceptor_shutdown: Sender<()>,
) {
spawn_thread("notification", move || {
for msg in notification.receiver().iter() {
let mut senders = senders.lock().unwrap();
match msg {
Notification::Periodic => {
for sender in senders.split_off(0) {
if let Err(TrySendError::Disconnected(_)) =
if let Err(crossbeam_channel::TrySendError::Disconnected(_)) =
sender.try_send(Message::PeriodicUpdate)
{
continue;
}
senders.push(sender);
}
}
Notification::Exit => acceptor.send(None).unwrap(), // mark acceptor as done
Notification::Exit => {
acceptor_shutdown.send(()).unwrap(); // Stop the acceptor itself
acceptor.send(None).unwrap(); // mark acceptor as done
break;
}
}
}
});
}

fn start_acceptor(addr: SocketAddr) -> Channel<Option<(TcpStream, SocketAddr)>> {
fn start_acceptor(
addr: SocketAddr,
shutdown_channel: Channel<()>,
) -> Channel<Option<(TcpStream, SocketAddr)>> {
let chan = Channel::unbounded();
let acceptor = chan.sender();
spawn_thread("acceptor", move || {
Expand All @@ -664,10 +694,29 @@ impl RPC {
.set_nonblocking(false)
.expect("cannot set nonblocking to false");
let listener = TcpListener::from(socket);
let local_addr = listener.local_addr().unwrap();
let shutdown_bool = Arc::new(AtomicBool::new(false));

{
let shutdown_bool = Arc::clone(&shutdown_bool);
crate::util::spawn_thread("shutdown-acceptor", move || {
// Block until shutdown is sent.
let _ = shutdown_channel.receiver().recv();
// Store the bool so after the next accept it will break the loop
shutdown_bool.store(true, std::sync::atomic::Ordering::Release);
// Connect to the socket to cause it to unblock
let _ = TcpStream::connect(local_addr);
});
}

info!("Electrum RPC server running on {}", addr);
loop {
let (stream, addr) = listener.accept().expect("accept failed");

if shutdown_bool.load(std::sync::atomic::Ordering::Acquire) {
break;
}

stream
.set_nonblocking(false)
.expect("failed to set connection as blocking");
Expand Down Expand Up @@ -724,10 +773,19 @@ impl RPC {
RPC {
notification: notification.sender(),
server: Some(spawn_thread("rpc", move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));

let acceptor = RPC::start_acceptor(rpc_addr);
RPC::start_notifier(notification, senders.clone(), acceptor.sender());
let senders =
Arc::new(Mutex::new(Vec::<crossbeam_channel::Sender<Message>>::new()));
let killers = Arc::new(Mutex::new(Vec::<Sender<()>>::new()));

let acceptor_shutdown = Channel::unbounded();
let acceptor_shutdown_sender = acceptor_shutdown.sender();
let acceptor = RPC::start_acceptor(rpc_addr, acceptor_shutdown);
RPC::start_notifier(
notification,
senders.clone(),
acceptor.sender(),
acceptor_shutdown_sender,
);

let mut threads = HashMap::new();
let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded();
Expand All @@ -738,6 +796,11 @@ impl RPC {
let senders = Arc::clone(&senders);
let stats = Arc::clone(&stats);
let garbage_sender = garbage_sender.clone();

// Kill the peers properly
let (killer, peace_receiver) = std::sync::mpsc::channel();
killers.lock().unwrap().push(killer);

#[cfg(feature = "electrum-discovery")]
let discovery = discovery.clone();

Expand All @@ -749,6 +812,7 @@ impl RPC {
addr,
stats,
txs_limit,
peace_receiver,
#[cfg(feature = "electrum-discovery")]
discovery,
);
Expand All @@ -769,10 +833,16 @@ impl RPC {
}
}
}
// Drop these
drop(acceptor);
drop(garbage_receiver);

trace!("closing {} RPC connections", senders.lock().unwrap().len());
for killer in killers.lock().unwrap().iter() {
let _ = killer.send(());
}
for sender in senders.lock().unwrap().iter() {
let _ = sender.send(Message::Done);
let _ = sender.try_send(Message::Done);
}

for (id, thread) in threads {
Expand Down Expand Up @@ -800,5 +870,8 @@ impl Drop for RPC {
handle.join().unwrap();
}
trace!("RPC server is stopped");
crate::util::with_spawned_threads(|threads| {
trace!("Threads after dropping RPC: {:?}", threads);
});
}
}
2 changes: 1 addition & 1 deletion src/elements/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl AssetRegistry {
}

pub fn spawn_sync(asset_db: Arc<RwLock<AssetRegistry>>) -> thread::JoinHandle<()> {
thread::spawn(move || loop {
crate::util::spawn_thread("asset-registry", move || loop {
if let Err(e) = asset_db.write().unwrap().fs_sync() {
error!("registry fs_sync failed: {:?}", e);
}
Expand Down
5 changes: 2 additions & 3 deletions src/new_index/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::collections::HashMap;
use std::fs;
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::mpsc::Receiver;
use std::thread;

use crate::chain::{Block, BlockHash};
Expand Down Expand Up @@ -44,12 +43,12 @@ pub struct BlockEntry {
type SizedBlock = (Block, u32);

pub struct Fetcher<T> {
receiver: Receiver<T>,
receiver: crossbeam_channel::Receiver<T>,
thread: thread::JoinHandle<()>,
}

impl<T> Fetcher<T> {
fn from(receiver: Receiver<T>, thread: thread::JoinHandle<()>) -> Self {
fn from(receiver: crossbeam_channel::Receiver<T>, thread: thread::JoinHandle<()>) -> Self {
Fetcher { receiver, thread }
}

Expand Down
2 changes: 1 addition & 1 deletion src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ pub fn start(config: Arc<Config>, query: Arc<Query>) -> Handle {

Handle {
tx,
thread: thread::spawn(move || {
thread: crate::util::spawn_thread("rest-server", move || {
run_server(config, query, rx);
}),
}
Expand Down
3 changes: 1 addition & 2 deletions src/signal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crossbeam_channel as channel;
use crossbeam_channel::RecvTimeoutError;
use std::thread;
use std::time::{Duration, Instant};

use signal_hook::consts::{SIGINT, SIGTERM, SIGUSR1};
Expand All @@ -16,7 +15,7 @@ fn notify(signals: &[i32]) -> channel::Receiver<i32> {
let (s, r) = channel::bounded(1);
let mut signals =
signal_hook::iterator::Signals::new(signals).expect("failed to register signal hook");
thread::spawn(move || {
crate::util::spawn_thread("signal-notifier", move || {
for signal in signals.forever() {
s.send(signal)
.unwrap_or_else(|_| panic!("failed to send signal {}", signal));
Expand Down
Loading