diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 91a51eef..ec4b387a 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -74,7 +74,18 @@ fn run_server(config: Arc) -> Result<()> { &metrics, Arc::clone(&config), ))); - mempool.write().unwrap().update(&daemon)?; + loop { + match Mempool::update(&mempool, &daemon) { + Ok(_) => break, + Err(e) => { + warn!( + "Error performing initial mempool update, trying again in 5 seconds: {}", + e.display_chain() + ); + signal.wait(Duration::from_secs(5), false)?; + } + } + } #[cfg(feature = "liquid")] let asset_db = config.asset_db_path.as_ref().map(|db_dir| { @@ -136,7 +147,13 @@ fn run_server(config: Arc) -> Result<()> { }; // Update mempool - mempool.write().unwrap().update(&daemon)?; + if let Err(e) = Mempool::update(&mempool, &daemon) { + // Log the error if the result is an Err + warn!( + "Error updating mempool, skipping mempool update: {}", + e.display_chain() + ); + } // Update subscribed clients electrum_server.notify(); diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index c3841d52..8a94962f 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -9,7 +9,7 @@ use elements::{encode::serialize, AssetId}; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::iter::FromIterator; use std::ops::Bound::{Excluded, Unbounded}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid}; @@ -343,46 +343,67 @@ impl Mempool { &self.backlog_stats.0 } - pub fn update(&mut self, daemon: &Daemon) -> Result<()> { - let _timer = self.latency.with_label_values(&["update"]).start_timer(); - let new_txids = daemon + pub fn unique_txids(&self) -> HashSet { + return HashSet::from_iter(self.txstore.keys().cloned()); + } + + pub fn update(mempool: &RwLock, daemon: &Daemon) -> Result<()> { + // 1. Start the metrics timer and get the current mempool txids + // [LOCK] Takes read lock for whole scope. + let (_timer, old_txids) = { + let mempool = mempool.read().unwrap(); + ( + mempool.latency.with_label_values(&["update"]).start_timer(), + mempool.unique_txids(), + ) + }; + + // 2. Get all the mempool txids from the RPC. + // [LOCK] No lock taken. Wait for RPC request. Get lists of remove/add txes. + let all_txids = daemon .getmempooltxids() .chain_err(|| "failed to update mempool from daemon")?; - let old_txids = HashSet::from_iter(self.txstore.keys().cloned()); - let to_remove: HashSet<&Txid> = old_txids.difference(&new_txids).collect(); - - // Download and add new transactions from bitcoind's mempool - let txids: Vec<&Txid> = new_txids.difference(&old_txids).collect(); - let to_add = match daemon.gettransactions(&txids) { - Ok(txs) => txs, - Err(err) => { - warn!("failed to get {} transactions: {}", txids.len(), err); // e.g. new block or RBF - return Ok(()); // keep the mempool until next update() + let txids_to_remove: HashSet<&Txid> = old_txids.difference(&all_txids).collect(); + let txids_to_add: Vec<&Txid> = all_txids.difference(&old_txids).collect(); + + // 3. Remove missing transactions. Even if we are unable to download new transactions from + // the daemon, we still want to remove the transactions that are no longer in the mempool. + // [LOCK] Write lock is released at the end of the call to remove(). + mempool.write().unwrap().remove(txids_to_remove); + + // 4. Download the new transactions from the daemon's mempool + // [LOCK] No lock taken, waiting for RPC response. + let txs_to_add = daemon + .gettransactions(&txids_to_add) + .chain_err(|| format!("failed to get {} transactions", txids_to_add.len()))?; + + // 4. Update local mempool to match daemon's state + // [LOCK] Takes Write lock for whole scope. + { + let mut mempool = mempool.write().unwrap(); + // Add new transactions + if txs_to_add.len() > mempool.add(txs_to_add) { + debug!("Mempool update added less transactions than expected"); } - }; - // Add new transactions - if to_add.len() > self.add(to_add) { - debug!("Mempool update added less transactions than expected"); - } - // Remove missing transactions - self.remove(to_remove); - self.count - .with_label_values(&["txs"]) - .set(self.txstore.len() as f64); + mempool + .count + .with_label_values(&["txs"]) + .set(mempool.txstore.len() as f64); + + // Update cached backlog stats (if expired) + if mempool.backlog_stats.1.elapsed() + > Duration::from_secs(mempool.config.mempool_backlog_stats_ttl) + { + let _timer = mempool + .latency + .with_label_values(&["update_backlog_stats"]) + .start_timer(); + mempool.backlog_stats = (BacklogStats::new(&mempool.feeinfo), Instant::now()); + } - // Update cached backlog stats (if expired) - if self.backlog_stats.1.elapsed() - > Duration::from_secs(self.config.mempool_backlog_stats_ttl) - { - let _timer = self - .latency - .with_label_values(&["update_backlog_stats"]) - .start_timer(); - self.backlog_stats = (BacklogStats::new(&self.feeinfo), Instant::now()); + Ok(()) } - - Ok(()) } pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> { @@ -418,8 +439,12 @@ impl Mempool { // Phase 1: add to txstore for tx in txs { let txid = tx.txid(); - txids.push(txid); - self.txstore.insert(txid, tx); + // Only push if it doesn't already exist. + // This is important now that update doesn't lock during + // the entire function body. + if self.txstore.insert(txid, tx).is_none() { + txids.push(txid); + } } // Phase 2: index history and spend edges (some txos can be missing)