Threading-related fixes from upstream romanz/electrs
Changes were taken from the latest romanz/electrs rpc.rs
implementation prior to the major refactoring in v0.9.0 that
significantly diverged the codebases (https://github.com/romanz/electrs/blob/af6ff09a275ec12b6fd0d6a101637f4710902a3c/src/rpc.rs).

The relevant changes include (not a complete list):
romanz#284
romanz#233
romanz@a3bfdda
romanz#195
romanz#523 (only post-v0.9 change, a very minor one)

This fixes a memory leak that could be reproduced using the following
script, which opens and closes 500k connections with a concurrency of 20:
$ seq 1 500000 | xargs -I {} -n 1 -P 20 sh -c 'echo '\''{"id":{},"method":"server.version","params":[]}'\''| nc 127.0.0.1 50001 -v -N'

Before the fixes, memory usage kept growing as more connections
were made, reaching around 35MB after 500k connections. After the
fixes, memory usage stays steady at around 25MB and does not grow
with more connections.
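
As an illustration of the pruning step in the diff below, here is a minimal, self-contained sketch of dropping disconnected clients from a list of senders with `Vec::retain`. The `notify_all` helper and the stripped-down `Message` enum are hypothetical names used only for this example; the real code works on a `Mutex`-guarded list inside `RPC`.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};

#[derive(Debug)]
enum Message {
    PeriodicUpdate,
}

/// Hypothetical helper: notify every registered client and forget the ones
/// whose receiving end is gone. Returns how many senders are still kept.
fn notify_all(senders: &mut Vec<SyncSender<Message>>) -> usize {
    senders.retain(|sender| {
        // Only `Disconnected` removes the sender. `Ok` and `Full` both mean
        // the receiver still exists, so the sender is kept.
        !matches!(
            sender.try_send(Message::PeriodicUpdate),
            Err(TrySendError::Disconnected(_))
        )
    });
    senders.len()
}

fn main() {
    let mut senders = Vec::new();
    let mut receivers: Vec<Receiver<Message>> = Vec::new();
    for _ in 0..3 {
        let (tx, rx) = mpsc::sync_channel(10);
        senders.push(tx);
        receivers.push(rx);
    }

    // Simulate one client going away by dropping its receiver.
    drop(receivers.pop());

    println!("{} live senders", notify_all(&mut senders)); // prints "2 live senders"
}
```

Because `retain` edits the vector in place, no temporary vector is built on each periodic notification, unlike the `split_off(0)` and `push` loop it replaces.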
shesek committed Mar 25, 2024
1 parent 72d88dd commit d257ca2
Showing 1 changed file with 31 additions and 27 deletions.
58 changes: 31 additions & 27 deletions src/electrum/server.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::mpsc::{self, Receiver, Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;
@@ -25,9 +25,7 @@ use crate::errors::*;
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use crate::new_index::{Query, Utxo};
use crate::util::electrum_merkle::{get_header_merkle_proof, get_id_from_pos, get_tx_merkle_proof};
use crate::util::{
create_socket, spawn_thread, BlockId, BoolThen, Channel, FullHash, HeaderEntry, SyncChannel,
};
use crate::util::{create_socket, spawn_thread, BlockId, BoolThen, Channel, FullHash, HeaderEntry};

const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION");
const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::new(1, 4);
@@ -105,7 +103,7 @@ struct Connection {
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
stream: TcpStream,
addr: SocketAddr,
chan: SyncChannel<Message>,
sender: SyncSender<Message>,
stats: Arc<Stats>,
txs_limit: usize,
#[cfg(feature = "electrum-discovery")]
@@ -118,6 +116,7 @@ impl Connection {
query: Arc<Query>,
stream: TcpStream,
addr: SocketAddr,
sender: SyncSender<Message>,
stats: Arc<Stats>,
txs_limit: usize,
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
@@ -129,7 +128,7 @@ impl Connection {
status_hashes: HashMap::new(),
stream,
addr,
chan: SyncChannel::new(10),
sender,
stats,
txs_limit,
#[cfg(feature = "electrum-discovery")]
@@ -352,7 +351,7 @@ impl Connection {
let tx = params.get(0).chain_err(|| "missing tx")?;
let tx = tx.as_str().chain_err(|| "non-string tx")?.to_string();
let txid = self.query.broadcast_raw(&tx)?;
if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) {
if let Err(e) = self.sender.try_send(Message::PeriodicUpdate) {
warn!("failed to issue PeriodicUpdate after broadcast: {}", e);
}
Ok(json!(txid))
@@ -391,9 +390,10 @@ impl Connection {
let (merkle, pos) = get_tx_merkle_proof(self.query.chain(), &txid, &blockid.hash)
.chain_err(|| "cannot create merkle proof")?;
Ok(json!({
"block_height": blockid.height,
"merkle": merkle,
"pos": pos}))
"block_height": blockid.height,
"merkle": merkle,
"pos": pos
}))
}

fn blockchain_transaction_id_from_pos(&self, params: &[Value]) -> Result<Value> {
@@ -409,7 +409,8 @@ impl Connection {

Ok(json!({
"tx_hash": txid,
"merkle" : merkle}))
"merkle" : merkle
}))
}

fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result<Value> {
@@ -523,10 +524,10 @@ impl Connection {
Ok(())
}

fn handle_replies(&mut self) -> Result<()> {
fn handle_replies(&mut self, receiver: Receiver<Message>) -> Result<()> {
let empty_params = json!([]);
loop {
let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
let msg = receiver.recv().chain_err(|| "channel closed")?;
let start_time = Instant::now();
trace!("RPC {:?}", msg);
match msg {
@@ -587,7 +588,7 @@ impl Connection {
}
}

fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
fn parse_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
loop {
let mut line = Vec::<u8>::new();
reader
@@ -615,14 +616,14 @@ impl Connection {
}
}

pub fn run(mut self) {
pub fn run(mut self, receiver: Receiver<Message>) {
self.stats.clients.inc();
conditionally_log_rpc_event!(self, json!({ "event": "connection established" }));

let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();
let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
if let Err(e) = self.handle_replies() {
let sender = self.sender.clone();
let child = spawn_thread("reader", || Connection::parse_requests(reader, sender));
if let Err(e) = self.handle_replies(receiver) {
error!(
"[{}] connection handling failed: {}",
self.addr,
@@ -698,14 +699,15 @@ impl RPC {
let mut senders = senders.lock().unwrap();
match msg {
Notification::Periodic => {
for sender in senders.split_off(0) {
senders.retain(|sender| {
if let Err(TrySendError::Disconnected(_)) =
sender.try_send(Message::PeriodicUpdate)
{
continue;
false // drop disconnected clients
} else {
true
}
senders.push(sender);
}
})
}
Notification::Exit => acceptor.send(None).unwrap(), // mark acceptor as done
}
@@ -792,29 +794,31 @@ impl RPC {
let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded();

while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
// explicitely scope the shadowed variables for the new thread
// explicitly scope the shadowed variables for the new thread
let query = Arc::clone(&query);
let senders = Arc::clone(&senders);
let stats = Arc::clone(&stats);
let garbage_sender = garbage_sender.clone();
let rpc_logging = config.electrum_rpc_logging.clone();
#[cfg(feature = "electrum-discovery")]
let discovery = discovery.clone();
let rpc_logging = config.electrum_rpc_logging.clone();

let (sender, receiver) = mpsc::sync_channel(10);
senders.lock().unwrap().push(sender.clone());

let spawned = spawn_thread("peer", move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(
query,
stream,
addr,
sender,
stats,
txs_limit,
#[cfg(feature = "electrum-discovery")]
discovery,
rpc_logging,
);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
conn.run(receiver);
info!("[{}] disconnected peer", addr);
let _ = garbage_sender.send(std::thread::current().id());
});
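
The last hunk creates each channel in the acceptor loop, registers the sender before the peer thread starts, and moves the receiver into that thread. A simplified sketch of that layout follows, under some assumptions: `accept_peer`, `handle_replies` and the `Registry` alias are hypothetical stand-ins rather than the electrs functions, and unlike the diff the sketch keeps no second sender inside the connection, so that clearing the registry is enough to end the peer thread.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

#[derive(Debug)]
enum Message {
    PeriodicUpdate,
}

// Shared list of senders, one per connected peer.
type Registry = Arc<Mutex<Vec<SyncSender<Message>>>>;

/// Stand-in for the reply loop: count updates until every sender is dropped.
fn handle_replies(receiver: Receiver<Message>) -> usize {
    let mut handled = 0;
    while let Ok(Message::PeriodicUpdate) = receiver.recv() {
        handled += 1;
    }
    handled
}

/// Create the channel outside the peer thread, register the sender first,
/// then hand the receiver to the new thread.
fn accept_peer(senders: &Registry) -> JoinHandle<usize> {
    let (sender, receiver) = mpsc::sync_channel(10);
    senders.lock().unwrap().push(sender);
    thread::spawn(move || handle_replies(receiver))
}

fn main() {
    let senders: Registry = Arc::new(Mutex::new(Vec::new()));
    let peer = accept_peer(&senders);

    for sender in senders.lock().unwrap().iter() {
        sender.send(Message::PeriodicUpdate).unwrap();
        sender.send(Message::PeriodicUpdate).unwrap();
    }

    // Dropping the last sender closes the channel. Buffered messages are
    // still delivered before `recv` reports the disconnect.
    senders.lock().unwrap().clear();
    println!("peer handled {} updates", peer.join().unwrap()); // prints "peer handled 2 updates"
}
```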

3 comments on commit d257ca2

@sammy007 sammy007 commented on d257ca2 Apr 19, 2024

I'm running BS's electrs behind haproxy with tcp-checks every few seconds, and electrs has been getting killed every few hours:

thread '<unnamed>' panicked at library/std/src/sys/unix/stack_overflow.rs:154:13:

My healthcheck behaves much like your nc example: it just queries the version, and every healthcheck opens a new TCP connection.

ps -eo nlwp --no-headers | awk '{ sum += $1 } END{ print sum }'
14233

^^ Same as the htop thr counter, and it keeps growing.

I have applied this patch and it seems to run better; at least it survived overnight. The patch seems to improve things, but the problem still persists, as thr keeps growing. It seems to leak threads.

I have other nodes (mostly ethereum garbage) on that server that are tcp-checked too, and only electrs increases the thr counter.

@sammy007 sammy007 commented on d257ca2 Apr 19, 2024

I just hammered it with the netcat one-liner from your description: no thr increase. But it does increase with haproxy's standard tcp-checks:

option tcp-check
tcp-check send '{"jsonrpc":"2.0","method":"server.version","params":[],"id":0}'
tcp-check send \r\n
tcp-check expect string result

@sammy007

I have investigated further: by default the healthcheck service uses a single RST packet, and for some reason electrs never closes the connection after it, which results in a growing number of threads.
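
To illustrate the kind of handling this report points at, here is a hedged sketch of a reader loop that treats a read error, such as a connection reset, the same way as a clean EOF and always tells the reply side that it is done. It is not the electrs implementation, and the cause described above is not confirmed in this thread. `read_lines`, `ResetReader` and this reduced `Message` enum are hypothetical names for the example.

```rust
use std::io::{self, BufRead, BufReader, Read};
use std::sync::mpsc::{self, SyncSender};

#[derive(Debug, PartialEq)]
enum Message {
    Request(String),
    Done,
}

/// Forward each line as a request. On EOF *or* on any read error, stop and
/// send `Done` so the thread waiting on the channel can exit as well.
fn read_lines<R: BufRead>(mut reader: R, tx: SyncSender<Message>) {
    loop {
        let mut line = String::new();
        match reader.read_line(&mut line) {
            Ok(0) => break, // clean EOF: the peer closed the connection
            Ok(_) => {
                let request = Message::Request(line.trim_end().to_string());
                if tx.send(request).is_err() {
                    return; // receiver already gone, nobody left to notify
                }
            }
            Err(_) => break, // e.g. ECONNRESET after the peer sent an RST
        }
    }
    let _ = tx.send(Message::Done);
}

/// Hypothetical reader that fails the way a reset socket would.
struct ResetReader;

impl Read for ResetReader {
    fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> {
        Err(io::Error::from(io::ErrorKind::ConnectionReset))
    }
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(10);
    read_lines(BufReader::new(ResetReader), tx);
    println!("{:?}", rx.recv().unwrap()); // prints "Done"
}
```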
