diff --git a/electrs-start-liquid b/electrs-start-liquid index d0573f04..9d022ae5 100755 --- a/electrs-start-liquid +++ b/electrs-start-liquid @@ -20,6 +20,7 @@ do -- \ --network liquid \ --http-socket-file "${HOME}/socket/esplora-liquid-mainnet" \ + --rpc-socket-file "${HOME}/socket/electrum-liquid-mainnet" \ --precache-scripts "${HOME}/electrs/contrib/popular-scripts.txt" \ --asset-db-path "${HOME}/asset_registry_db" \ --daemon-dir "${HOME}" \ diff --git a/electrs-start-liquidtestnet b/electrs-start-liquidtestnet index beef7c44..bae2555f 100755 --- a/electrs-start-liquidtestnet +++ b/electrs-start-liquidtestnet @@ -20,6 +20,7 @@ do -- \ --network liquidtestnet \ --http-socket-file "${HOME}/socket/esplora-liquid-testnet" \ + --rpc-socket-file "${HOME}/socket/electrum-liquid-testnet" \ --precache-scripts "${HOME}/electrs/contrib/popular-scripts.txt" \ --asset-db-path "${HOME}/asset_registry_testnet_db" \ --daemon-dir "${HOME}" \ diff --git a/electrs-start-mainnet b/electrs-start-mainnet index 7bb10e59..5d380983 100755 --- a/electrs-start-mainnet +++ b/electrs-start-mainnet @@ -30,6 +30,7 @@ do --bin electrs \ -- \ --http-socket-file "${HOME}/socket/esplora-bitcoin-mainnet" \ + --rpc-socket-file "${HOME}/socket/electrum-bitcoin-mainnet" \ --precache-scripts "${HOME}/electrs/contrib/popular-scripts.txt" \ --daemon-dir "${HOME}" \ --db-dir "/electrs" \ diff --git a/electrs-start-signet b/electrs-start-signet index b2f695b1..f87ee506 100755 --- a/electrs-start-signet +++ b/electrs-start-signet @@ -31,6 +31,7 @@ do -- \ --network signet \ --http-socket-file "${HOME}/socket/esplora-bitcoin-signet" \ + --rpc-socket-file "${HOME}/socket/electrum-bitcoin-signet" \ --precache-scripts "${HOME}/electrs/contrib/popular-scripts.txt" \ --daemon-dir "${HOME}" \ --db-dir "/electrs" \ diff --git a/electrs-start-testnet b/electrs-start-testnet index 95ced073..74299dbb 100755 --- a/electrs-start-testnet +++ b/electrs-start-testnet @@ -31,6 +31,7 @@ do -- \ --network testnet \ --http-socket-file "${HOME}/socket/esplora-bitcoin-testnet" \ + --rpc-socket-file "${HOME}/socket/electrum-bitcoin-testnet" \ --precache-scripts "${HOME}/electrs/contrib/popular-scripts.txt" \ --daemon-dir "${HOME}" \ --db-dir "/electrs" \ diff --git a/src/config.rs b/src/config.rs index 3d9eec81..e493c10a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -41,6 +41,7 @@ pub struct Config { pub electrum_rpc_addr: SocketAddr, pub http_addr: SocketAddr, pub http_socket_file: Option, + pub rpc_socket_file: Option, pub monitoring_addr: SocketAddr, pub jsonrpc_import: bool, pub light_mode: bool, @@ -243,6 +244,14 @@ impl Config { .takes_value(true), ); + #[cfg(unix)] + let args = args.arg( + Arg::with_name("rpc_socket_file") + .long("rpc-socket-file") + .help("Electrum RPC 'unix socket file' to listen on (default disabled, enabling this ignores the electrum_rpc_addr arg)") + .takes_value(true), + ); + #[cfg(feature = "liquid")] let args = args .arg( @@ -384,6 +393,7 @@ impl Config { ); let http_socket_file: Option = m.value_of("http_socket_file").map(PathBuf::from); + let rpc_socket_file: Option = m.value_of("rpc_socket_file").map(PathBuf::from); let monitoring_addr: SocketAddr = str_to_socketaddr( m.value_of("monitoring_addr") .unwrap_or(&format!("127.0.0.1:{}", default_monitoring_port)), @@ -452,6 +462,7 @@ impl Config { electrum_banner, http_addr, http_socket_file, + rpc_socket_file, monitoring_addr, mempool_backlog_stats_ttl: value_t_or_exit!(m, "mempool_backlog_stats_ttl", u64), mempool_recent_txs_size: value_t_or_exit!(m, "mempool_recent_txs_size", usize), diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 167532d5..1b44ee99 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -1,7 +1,13 @@ use std::collections::HashMap; use std::convert::TryInto; -use std::io::{BufRead, BufReader, Write}; +use std::fs; +use std::io::{BufRead, BufReader, Read, Write}; +#[cfg(feature = "electrum-discovery")] +use std::net::IpAddr; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; +use std::os::unix::fs::FileTypeExt; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::path::Path; use std::sync::atomic::AtomicBool; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, Mutex}; @@ -97,8 +103,7 @@ struct Connection { query: Arc, last_header_entry: Option, status_hashes: HashMap, // ScriptHash -> StatusHash - stream: TcpStream, - addr: SocketAddr, + stream: ConnectionStream, chan: SyncChannel, stats: Arc, txs_limit: usize, @@ -110,8 +115,7 @@ struct Connection { impl Connection { pub fn new( query: Arc, - stream: TcpStream, - addr: SocketAddr, + stream: ConnectionStream, stats: Arc, txs_limit: usize, die_please: Receiver<()>, @@ -122,7 +126,6 @@ impl Connection { last_header_entry: None, // disable header subscription for now status_hashes: HashMap::new(), stream, - addr, chan: SyncChannel::new(10), stats, txs_limit, @@ -176,6 +179,10 @@ impl Connection { #[cfg(feature = "electrum-discovery")] fn server_add_peer(&self, params: &[Value]) -> Result { + let ip = self + .stream + .ip() + .ok_or(Error::from("Can't add peer with Unix sockets enabled"))?; let discovery = self .discovery .as_ref() @@ -187,7 +194,7 @@ impl Connection { .clone(); let features = serde_json::from_value(features).chain_err(|| "invalid features")?; - discovery.add_server_request(self.addr.ip(), features)?; + discovery.add_server_request(ip, features)?; Ok(json!(true)) } @@ -533,16 +540,22 @@ impl Connection { .chain_err(|| "failed to update subscriptions")?; self.send_values(&values)? } - Message::Done => return Ok(()), + Message::Done => { + self.chan.close(); + return Ok(()); + } } } - recv(shutdown) -> _ => return Ok(()), + recv(shutdown) -> _ => { + self.chan.close(); + return Ok(()); + } } } } fn handle_requests( - mut reader: BufReader, + mut reader: BufReader, tx: crossbeam_channel::Sender, ) -> Result<()> { loop { @@ -577,12 +590,18 @@ impl Connection { let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); let tx = self.chan.sender(); - let stream = self.stream.try_clone().expect("failed to clone TcpStream"); let die_please = self.die_please.take().unwrap(); let (reply_killer, reply_receiver) = crossbeam_channel::unbounded(); + + // We create a clone of the stream and put it in an Arc + // This will drop at the end of the function. + let arc_stream = Arc::new(self.stream.try_clone().expect("failed to clone TcpStream")); + // We don't want to keep the stream alive until SIGINT + // It should drop (close) no matter what. + let maybe_stream = Arc::downgrade(&arc_stream); spawn_thread("properly-die", move || { let _ = die_please.recv(); - let _ = stream.shutdown(Shutdown::Both); + let _ = maybe_stream.upgrade().map(|s| s.shutdown(Shutdown::Both)); let _ = reply_killer.send(()); }); @@ -590,7 +609,7 @@ impl Connection { if let Err(e) = self.handle_replies(reply_receiver) { error!( "[{}] connection handling failed: {}", - self.addr, + self.stream.addr_string(), e.display_chain().to_string() ); } @@ -599,10 +618,13 @@ impl Connection { .subscriptions .sub(self.status_hashes.len() as i64); - debug!("[{}] shutting down connection", self.addr); + let addr = self.stream.addr_string(); + debug!("[{}] shutting down connection", addr); + // Drop the Arc so that the stream properly closes. + drop(arc_stream); let _ = self.stream.shutdown(Shutdown::Both); if let Err(err) = child.join().expect("receiver panicked") { - error!("[{}] receiver failed: {}", self.addr, err); + error!("[{}] receiver failed: {}", addr, err); } } } @@ -654,7 +676,7 @@ impl RPC { fn start_notifier( notification: Channel, senders: Arc>>>, - acceptor: Sender>, + acceptor: Sender>, acceptor_shutdown: Sender<()>, ) { spawn_thread("notification", move || { @@ -682,46 +704,23 @@ impl RPC { } fn start_acceptor( - addr: SocketAddr, + config: Arc, shutdown_channel: Channel<()>, - ) -> Channel> { + ) -> Channel> { let chan = Channel::unbounded(); let acceptor = chan.sender(); spawn_thread("acceptor", move || { - let socket = create_socket(&addr); - socket.listen(511).expect("setting backlog failed"); - socket - .set_nonblocking(false) - .expect("cannot set nonblocking to false"); - let listener = TcpListener::from(socket); - let local_addr = listener.local_addr().unwrap(); - let shutdown_bool = Arc::new(AtomicBool::new(false)); - - { - let shutdown_bool = Arc::clone(&shutdown_bool); - crate::util::spawn_thread("shutdown-acceptor", move || { - // Block until shutdown is sent. - let _ = shutdown_channel.receiver().recv(); - // Store the bool so after the next accept it will break the loop - shutdown_bool.store(true, std::sync::atomic::Ordering::Release); - // Connect to the socket to cause it to unblock - let _ = TcpStream::connect(local_addr); - }); - } + let addr = config.electrum_rpc_addr; + let listener = if let Some(path) = config.rpc_socket_file.as_ref() { + // We can leak this Path because we know that this function is only + // called once on startup. + let path: &'static Path = Box::leak(path.clone().into_boxed_path()); - info!("Electrum RPC server running on {}", addr); - loop { - let (stream, addr) = listener.accept().expect("accept failed"); - - if shutdown_bool.load(std::sync::atomic::Ordering::Acquire) { - break; - } - - stream - .set_nonblocking(false) - .expect("failed to set connection as blocking"); - acceptor.send(Some((stream, addr))).expect("send failed"); - } + ConnectionListener::new_unix(path) + } else { + ConnectionListener::new_tcp(&addr) + }; + listener.run(acceptor, shutdown_channel); }); chan } @@ -767,7 +766,6 @@ impl RPC { discovery }); - let rpc_addr = config.electrum_rpc_addr; let txs_limit = config.electrum_txs_limit; RPC { @@ -778,7 +776,7 @@ impl RPC { let acceptor_shutdown = Channel::unbounded(); let acceptor_shutdown_sender = acceptor_shutdown.sender(); - let acceptor = RPC::start_acceptor(rpc_addr, acceptor_shutdown); + let acceptor = RPC::start_acceptor(config, acceptor_shutdown); RPC::start_notifier( notification, senders.clone(), @@ -789,7 +787,8 @@ impl RPC { let mut threads = HashMap::new(); let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded(); - while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() { + while let Some(stream) = acceptor.receiver().recv().unwrap() { + let addr = stream.addr_string(); // explicitely scope the shadowed variables for the new thread let query = Arc::clone(&query); let senders = Arc::clone(&senders); @@ -798,16 +797,17 @@ impl RPC { // Kill the peers properly let (killer, peace_receiver) = std::sync::mpsc::channel(); + let killer_clone = killer.clone(); #[cfg(feature = "electrum-discovery")] let discovery = discovery.clone(); let spawned = spawn_thread("peer", move || { + let addr = stream.addr_string(); info!("[{}] connected peer", addr); let conn = Connection::new( query, stream, - addr, stats, txs_limit, peace_receiver, @@ -817,6 +817,7 @@ impl RPC { senders.lock().unwrap().push(conn.chan.sender()); conn.run(); info!("[{}] disconnected peer", addr); + let _ = killer_clone.send(()); let _ = garbage_sender.send(std::thread::current().id()); }); @@ -872,3 +873,177 @@ impl Drop for RPC { }); } } + +enum ConnectionListener { + Tcp(TcpListener), + Unix(UnixListener, &'static Path), +} + +impl ConnectionListener { + fn new_tcp(addr: &SocketAddr) -> Self { + let socket = create_socket(addr); + socket.listen(511).expect("setting backlog failed"); + socket + .set_nonblocking(false) + .expect("cannot set nonblocking to false"); + info!("Electrum RPC server running on {}", addr); + Self::Tcp(TcpListener::from(socket)) + } + + /// This takes a static reference to a Path in order to + /// make shallow clones of UnixStreams much cheaper. + /// Since this type will only usually be instanciated 1 time + /// it should be acceptable to just leak the PathBuf. + /// Do not leak values if you call this an unknown number of + /// times throughout the program. + fn new_unix(path: &'static Path) -> Self { + if let Ok(meta) = fs::metadata(path) { + // Cleanup socket file left by previous execution + if meta.file_type().is_socket() { + fs::remove_file(path).ok(); + } + } + + let socket = std::os::unix::net::UnixListener::bind(path) + .expect("cannnot bind to unix socket for RPC"); + socket + .set_nonblocking(false) + .expect("cannot set nonblocking to false"); + info!( + "Electrum RPC server running on unix socket {}", + path.display() + ); + Self::Unix(socket, path) + } + + fn run(&self, acceptor: Sender>, shutdown_channel: Channel<()>) { + let shutdown_bool = Arc::new(AtomicBool::new(false)); + + { + let shutdown_bool = Arc::clone(&shutdown_bool); + crate::util::spawn_thread( + "shutdown-acceptor", + self.create_shutdown_job(shutdown_channel, shutdown_bool), + ); + } + + loop { + let stream = self.accept().expect("accept failed"); + + if shutdown_bool.load(std::sync::atomic::Ordering::Acquire) { + break; + } + + stream + .set_nonblocking(false) + .expect("failed to set connection as blocking"); + acceptor.send(Some(stream)).expect("send failed"); + } + } + + fn accept(&self) -> std::result::Result { + match self { + Self::Tcp(c) => c.accept().map(|(l, r)| ConnectionStream::Tcp(l, r)), + Self::Unix(c, p) => c.accept().map(|(l, r)| ConnectionStream::Unix(l, r, p)), + } + } + + fn create_shutdown_job( + &self, + shutdown_channel: Channel<()>, + shutdown_bool: Arc, + ) -> Box { + match self { + ConnectionListener::Tcp(c) => { + let local_addr = c.local_addr().unwrap(); + Box::new(move || { + // Block until shutdown is sent. + let _ = shutdown_channel.receiver().recv(); + // Store the bool so after the next accept it will break the loop + shutdown_bool.store(true, std::sync::atomic::Ordering::Release); + // Connect to the socket to cause it to unblock + let _ = TcpStream::connect(local_addr); + }) + } + ConnectionListener::Unix(_, p) => { + let path = *p; + Box::new(move || { + // Block until shutdown is sent. + let _ = shutdown_channel.receiver().recv(); + // Store the bool so after the next accept it will break the loop + shutdown_bool.store(true, std::sync::atomic::Ordering::Release); + // Connect to the socket to cause it to unblock + let _ = UnixStream::connect(path); + }) + } + } + } +} + +enum ConnectionStream { + Tcp(TcpStream, std::net::SocketAddr), + Unix(UnixStream, std::os::unix::net::SocketAddr, &'static Path), +} + +impl ConnectionStream { + fn addr_string(&self) -> String { + match self { + ConnectionStream::Tcp(_, a) => format!("{a}"), + ConnectionStream::Unix(_, a, _) => format!("{a:?}"), + } + } + + fn try_clone(&self) -> std::io::Result { + Ok(match self { + ConnectionStream::Tcp(s, a) => ConnectionStream::Tcp(s.try_clone()?, *a), + ConnectionStream::Unix(s, a, p) => ConnectionStream::Unix(s.try_clone()?, a.clone(), p), + }) + } + + fn shutdown(&self, how: Shutdown) -> std::io::Result<()> { + match self { + ConnectionStream::Tcp(s, _) => s.shutdown(how), + ConnectionStream::Unix(s, _, _) => s.shutdown(how), + } + } + + fn set_nonblocking(&self, nonblocking: bool) -> std::io::Result<()> { + match self { + ConnectionStream::Tcp(s, _) => s.set_nonblocking(nonblocking), + ConnectionStream::Unix(s, _, _) => s.set_nonblocking(nonblocking), + } + } + + #[cfg(feature = "electrum-discovery")] + fn ip(&self) -> Option { + match self { + ConnectionStream::Tcp(_, a) => Some(a.ip()), + ConnectionStream::Unix(_, _, _) => None, + } + } +} + +impl Write for ConnectionStream { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + match self { + ConnectionStream::Tcp(s, _) => s.write(buf), + ConnectionStream::Unix(s, _, _) => s.write(buf), + } + } + + fn flush(&mut self) -> std::io::Result<()> { + match self { + ConnectionStream::Tcp(s, _) => s.flush(), + ConnectionStream::Unix(s, _, _) => s.flush(), + } + } +} + +impl Read for ConnectionStream { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + match self { + ConnectionStream::Tcp(s, _) => s.read(buf), + ConnectionStream::Unix(s, _, _) => s.read(buf), + } + } +} diff --git a/src/util/mod.rs b/src/util/mod.rs index b29400d9..22775911 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -38,26 +38,38 @@ pub fn full_hash(hash: &[u8]) -> FullHash { } pub struct SyncChannel { - tx: crossbeam_channel::Sender, - rx: crossbeam_channel::Receiver, + tx: Option>, + rx: Option>, } impl SyncChannel { pub fn new(size: usize) -> SyncChannel { let (tx, rx) = crossbeam_channel::bounded(size); - SyncChannel { tx, rx } + SyncChannel { + tx: Some(tx), + rx: Some(rx), + } } pub fn sender(&self) -> crossbeam_channel::Sender { - self.tx.clone() + self.tx.as_ref().expect("No Sender").clone() } pub fn receiver(&self) -> &crossbeam_channel::Receiver { - &self.rx + self.rx.as_ref().expect("No Receiver") } pub fn into_receiver(self) -> crossbeam_channel::Receiver { - self.rx + self.rx.expect("No Receiver") + } + + /// This drops the sender and receiver, causing all other methods to panic. + /// + /// Use only when you know that the channel will no longer be used. + /// ie. shutdown. + pub fn close(&mut self) -> Option> { + self.tx.take(); + self.rx.take() } }