Skip to content

Commit

Permalink
Threading-related fixes from upstream romanz/electrs
Browse files Browse the repository at this point in the history
Changes were taken from the latest romanz/electrs rpc.rs
implementation prior to the major refactoring in v0.9.0 that
significantly diverged the codebases (https://github.com/romanz/electrs/blob/af6ff09a275ec12b6fd0d6a101637f4710902a3c/src/rpc.rs).

The relevant changes include (not a complete list):
romanz#284
romanz#233
romanz@a3bfdda
romanz#195
romanz#523 (only post-v0.9 change, a very minor one)

This fixes a memory leak that could be reproduced using the following
script, which opens and closes 500k connections with a concurrency of 20:
$ seq 1 500000 | xargs -I {} -n 1 -P 20 sh -c 'echo '\''{"id":{},"method":"server.version","params":[]}'\''| nc 127.0.0.1 50001 -v -N'

Before the fixes, memory usage would continue to grow the more
connections are made, to around 35MB for 500k connections. After the
fixes,  memory usage is steady at around 25MB and doesn't grow with
more connections.
  • Loading branch information
shesek committed Mar 18, 2024
1 parent 49a7180 commit 16636d1
Showing 1 changed file with 31 additions and 27 deletions.
58 changes: 31 additions & 27 deletions src/electrum/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::mpsc::{self, Receiver, Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;
Expand All @@ -25,9 +25,7 @@ use crate::errors::*;
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use crate::new_index::{Query, Utxo};
use crate::util::electrum_merkle::{get_header_merkle_proof, get_id_from_pos, get_tx_merkle_proof};
use crate::util::{
create_socket, spawn_thread, BlockId, BoolThen, Channel, FullHash, HeaderEntry, SyncChannel,
};
use crate::util::{create_socket, spawn_thread, BlockId, BoolThen, Channel, FullHash, HeaderEntry};

const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION");
const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::new(1, 4);
Expand Down Expand Up @@ -105,7 +103,7 @@ struct Connection {
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
stream: TcpStream,
addr: SocketAddr,
chan: SyncChannel<Message>,
sender: SyncSender<Message>,
stats: Arc<Stats>,
txs_limit: usize,
#[cfg(feature = "electrum-discovery")]
Expand All @@ -118,6 +116,7 @@ impl Connection {
query: Arc<Query>,
stream: TcpStream,
addr: SocketAddr,
sender: SyncSender<Message>,
stats: Arc<Stats>,
txs_limit: usize,
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
Expand All @@ -129,7 +128,7 @@ impl Connection {
status_hashes: HashMap::new(),
stream,
addr,
chan: SyncChannel::new(10),
sender,
stats,
txs_limit,
#[cfg(feature = "electrum-discovery")]
Expand Down Expand Up @@ -352,7 +351,7 @@ impl Connection {
let tx = params.get(0).chain_err(|| "missing tx")?;
let tx = tx.as_str().chain_err(|| "non-string tx")?.to_string();
let txid = self.query.broadcast_raw(&tx)?;
if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) {
if let Err(e) = self.sender.try_send(Message::PeriodicUpdate) {
warn!("failed to issue PeriodicUpdate after broadcast: {}", e);
}
Ok(json!(txid))
Expand Down Expand Up @@ -391,9 +390,10 @@ impl Connection {
let (merkle, pos) = get_tx_merkle_proof(self.query.chain(), &txid, &blockid.hash)
.chain_err(|| "cannot create merkle proof")?;
Ok(json!({
"block_height": blockid.height,
"merkle": merkle,
"pos": pos}))
"block_height": blockid.height,
"merkle": merkle,
"pos": pos
}))
}

fn blockchain_transaction_id_from_pos(&self, params: &[Value]) -> Result<Value> {
Expand All @@ -409,7 +409,8 @@ impl Connection {

Ok(json!({
"tx_hash": txid,
"merkle" : merkle}))
"merkle" : merkle
}))
}

fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result<Value> {
Expand Down Expand Up @@ -523,10 +524,10 @@ impl Connection {
Ok(())
}

fn handle_replies(&mut self) -> Result<()> {
fn handle_replies(&mut self, receiver: Receiver<Message>) -> Result<()> {
let empty_params = json!([]);
loop {
let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
let msg = receiver.recv().chain_err(|| "channel closed")?;
trace!("RPC {:?}", msg);
match msg {
Message::Request(line) => {
Expand Down Expand Up @@ -587,7 +588,7 @@ impl Connection {
}
}

fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
fn parse_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
loop {
let mut line = Vec::<u8>::new();
reader
Expand Down Expand Up @@ -615,14 +616,14 @@ impl Connection {
}
}

pub fn run(mut self) {
pub fn run(mut self, receiver: Receiver<Message>) {
self.stats.clients.inc();
conditionally_log_rpc_event!(self, json!({ "event": "connection established" }));

let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();
let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
if let Err(e) = self.handle_replies() {
let sender = self.sender.clone();
let child = spawn_thread("reader", || Connection::parse_requests(reader, sender));
if let Err(e) = self.handle_replies(receiver) {
error!(
"[{}] connection handling failed: {}",
self.addr,
Expand Down Expand Up @@ -698,14 +699,15 @@ impl RPC {
let mut senders = senders.lock().unwrap();
match msg {
Notification::Periodic => {
for sender in senders.split_off(0) {
senders.retain(|sender| {
if let Err(TrySendError::Disconnected(_)) =
sender.try_send(Message::PeriodicUpdate)
{
continue;
false // drop disconnected clients
} else {
true
}
senders.push(sender);
}
})
}
Notification::Exit => acceptor.send(None).unwrap(), // mark acceptor as done
}
Expand Down Expand Up @@ -792,29 +794,31 @@ impl RPC {
let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded();

while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
// explicitely scope the shadowed variables for the new thread
// explicitly scope the shadowed variables for the new thread
let query = Arc::clone(&query);
let senders = Arc::clone(&senders);
let stats = Arc::clone(&stats);
let garbage_sender = garbage_sender.clone();
let rpc_logging = config.electrum_rpc_logging.clone();
#[cfg(feature = "electrum-discovery")]
let discovery = discovery.clone();
let rpc_logging = config.electrum_rpc_logging.clone();

let (sender, receiver) = mpsc::sync_channel(10);
senders.lock().unwrap().push(sender.clone());

let spawned = spawn_thread("peer", move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(
query,
stream,
addr,
sender,
stats,
txs_limit,
#[cfg(feature = "electrum-discovery")]
discovery,
rpc_logging,
);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
conn.run(receiver);
info!("[{}] disconnected peer", addr);
let _ = garbage_sender.send(std::thread::current().id());
});
Expand Down

0 comments on commit 16636d1

Please sign in to comment.