Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threading-related fixes from upstream romanz/electrs #74

Merged
merged 1 commit into from
May 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 31 additions & 27 deletions src/electrum/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::mpsc::{self, Receiver, Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;
Expand All @@ -25,9 +25,7 @@ use crate::errors::*;
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use crate::new_index::{Query, Utxo};
use crate::util::electrum_merkle::{get_header_merkle_proof, get_id_from_pos, get_tx_merkle_proof};
use crate::util::{
create_socket, spawn_thread, BlockId, BoolThen, Channel, FullHash, HeaderEntry, SyncChannel,
};
use crate::util::{create_socket, spawn_thread, BlockId, BoolThen, Channel, FullHash, HeaderEntry};

const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION");
const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::new(1, 4);
Expand Down Expand Up @@ -105,7 +103,7 @@ struct Connection {
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
stream: TcpStream,
addr: SocketAddr,
chan: SyncChannel<Message>,
sender: SyncSender<Message>,
stats: Arc<Stats>,
txs_limit: usize,
#[cfg(feature = "electrum-discovery")]
Expand All @@ -118,6 +116,7 @@ impl Connection {
query: Arc<Query>,
stream: TcpStream,
addr: SocketAddr,
sender: SyncSender<Message>,
stats: Arc<Stats>,
txs_limit: usize,
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
Expand All @@ -129,7 +128,7 @@ impl Connection {
status_hashes: HashMap::new(),
stream,
addr,
chan: SyncChannel::new(10),
sender,
stats,
txs_limit,
#[cfg(feature = "electrum-discovery")]
Expand Down Expand Up @@ -352,7 +351,7 @@ impl Connection {
let tx = params.get(0).chain_err(|| "missing tx")?;
let tx = tx.as_str().chain_err(|| "non-string tx")?.to_string();
let txid = self.query.broadcast_raw(&tx)?;
if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) {
if let Err(e) = self.sender.try_send(Message::PeriodicUpdate) {
warn!("failed to issue PeriodicUpdate after broadcast: {}", e);
}
Ok(json!(txid))
Expand Down Expand Up @@ -391,9 +390,10 @@ impl Connection {
let (merkle, pos) = get_tx_merkle_proof(self.query.chain(), &txid, &blockid.hash)
.chain_err(|| "cannot create merkle proof")?;
Ok(json!({
"block_height": blockid.height,
"merkle": merkle,
"pos": pos}))
"block_height": blockid.height,
"merkle": merkle,
"pos": pos
}))
}

fn blockchain_transaction_id_from_pos(&self, params: &[Value]) -> Result<Value> {
Expand All @@ -409,7 +409,8 @@ impl Connection {

Ok(json!({
"tx_hash": txid,
"merkle" : merkle}))
"merkle" : merkle
}))
}

fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result<Value> {
Expand Down Expand Up @@ -523,10 +524,10 @@ impl Connection {
Ok(())
}

fn handle_replies(&mut self) -> Result<()> {
fn handle_replies(&mut self, receiver: Receiver<Message>) -> Result<()> {
let empty_params = json!([]);
loop {
let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
let msg = receiver.recv().chain_err(|| "channel closed")?;
let start_time = Instant::now();
trace!("RPC {:?}", msg);
match msg {
Expand Down Expand Up @@ -587,7 +588,7 @@ impl Connection {
}
}

fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
fn parse_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
loop {
let mut line = Vec::<u8>::new();
reader
Expand Down Expand Up @@ -615,14 +616,14 @@ impl Connection {
}
}

pub fn run(mut self) {
pub fn run(mut self, receiver: Receiver<Message>) {
self.stats.clients.inc();
conditionally_log_rpc_event!(self, json!({ "event": "connection established" }));

let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();
let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
if let Err(e) = self.handle_replies() {
let sender = self.sender.clone();
let child = spawn_thread("reader", || Connection::parse_requests(reader, sender));
if let Err(e) = self.handle_replies(receiver) {
error!(
"[{}] connection handling failed: {}",
self.addr,
Expand Down Expand Up @@ -698,14 +699,15 @@ impl RPC {
let mut senders = senders.lock().unwrap();
match msg {
Notification::Periodic => {
for sender in senders.split_off(0) {
senders.retain(|sender| {
if let Err(TrySendError::Disconnected(_)) =
sender.try_send(Message::PeriodicUpdate)
{
continue;
false // drop disconnected clients
} else {
true
}
senders.push(sender);
}
})
}
Notification::Exit => acceptor.send(None).unwrap(), // mark acceptor as done
}
Expand Down Expand Up @@ -792,29 +794,31 @@ impl RPC {
let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded();

while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
// explicitely scope the shadowed variables for the new thread
// explicitly scope the shadowed variables for the new thread
let query = Arc::clone(&query);
let senders = Arc::clone(&senders);
let stats = Arc::clone(&stats);
let garbage_sender = garbage_sender.clone();
let rpc_logging = config.electrum_rpc_logging.clone();
#[cfg(feature = "electrum-discovery")]
let discovery = discovery.clone();
let rpc_logging = config.electrum_rpc_logging.clone();

let (sender, receiver) = mpsc::sync_channel(10);
senders.lock().unwrap().push(sender.clone());

let spawned = spawn_thread("peer", move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(
query,
stream,
addr,
sender,
stats,
txs_limit,
#[cfg(feature = "electrum-discovery")]
discovery,
rpc_logging,
);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
conn.run(receiver);
info!("[{}] disconnected peer", addr);
let _ = garbage_sender.send(std::thread::current().id());
});
Expand Down
Loading