Shorten mempool lock holding for update #87

Merged 2 commits on May 17, 2024
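
The call site used to be `mempool.write().unwrap().update(&daemon)?`, so the mempool write lock stayed held across the daemon RPC round-trips and blocked readers for the whole update. `Mempool::update` is now an associated function taking `&RwLock<Mempool>`, so it can hold short read/write locks only around the in-memory steps and leave the lock released while waiting on the daemon. A minimal standalone sketch of that pattern (the `State` / `fetch_txids` names are illustrative stand-ins, not electrs's API):

```rust
use std::collections::HashSet;
use std::sync::RwLock;

struct State {
    txids: HashSet<u64>,
}

// Stand-in for the slow daemon RPC (getmempooltxids / gettransactions).
fn fetch_txids() -> HashSet<u64> {
    HashSet::from([1, 2, 3])
}

fn update(state: &RwLock<State>) {
    // 1. Snapshot the current txids under a short read lock.
    let old: HashSet<u64> = state.read().unwrap().txids.clone();

    // 2. Do the slow RPC work with no lock held; readers stay unblocked.
    let new = fetch_txids();
    let to_remove: Vec<u64> = old.difference(&new).copied().collect();
    let to_add: Vec<u64> = new.difference(&old).copied().collect();

    // 3. Apply the delta under a short write lock.
    let mut guard = state.write().unwrap();
    for txid in &to_remove {
        guard.txids.remove(txid);
    }
    guard.txids.extend(to_add);
}

fn main() {
    let state = RwLock::new(State {
        txids: HashSet::from([0, 1]),
    });
    update(&state);
    assert_eq!(state.read().unwrap().txids, HashSet::from([1, 2, 3]));
}
```
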
21 changes: 19 additions & 2 deletions src/bin/electrs.rs
@@ -74,7 +74,18 @@ fn run_server(config: Arc<Config>) -> Result<()> {
&metrics,
Arc::clone(&config),
)));
mempool.write().unwrap().update(&daemon)?;
loop {
match Mempool::update(&mempool, &daemon) {
Ok(_) => break,
Err(e) => {
warn!(
"Error performing initial mempool update, trying again in 5 seconds: {}",
e.display_chain()
);
signal.wait(Duration::from_secs(5), false)?;
}
}
}

#[cfg(feature = "liquid")]
let asset_db = config.asset_db_path.as_ref().map(|db_dir| {
@@ -136,7 +147,13 @@ fn run_server(config: Arc<Config>) -> Result<()> {
};

// Update mempool
mempool.write().unwrap().update(&daemon)?;
if let Err(e) = Mempool::update(&mempool, &daemon) {
// Log the error if the result is an Err
warn!(
"Error updating mempool, skipping mempool update: {}",
e.display_chain()
);
}

// Update subscribed clients
electrum_server.notify();
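With the change above, the initial mempool sync at startup is retried instead of aborting `run_server` on the first error. Roughly this shape, sketched with a plain sleep standing in for electrs's shutdown-aware `signal.wait(...)`, and a made-up `try_update` in place of `Mempool::update`:

```rust
use std::{thread, time::Duration};

// Stand-in for Mempool::update(&mempool, &daemon): fails on the first two attempts.
fn try_update(attempt: u32) -> Result<(), String> {
    if attempt < 3 {
        Err(format!("daemon not ready (attempt {})", attempt))
    } else {
        Ok(())
    }
}

fn main() {
    let mut attempt = 0;
    loop {
        attempt += 1;
        match try_update(attempt) {
            Ok(()) => break,
            Err(e) => {
                eprintln!("Error performing initial mempool update, trying again: {}", e);
                // The real code waits up to 5 seconds on the shutdown signal here,
                // so a Ctrl-C during the retry loop still exits promptly.
                thread::sleep(Duration::from_millis(100));
            }
        }
    }
    println!("initial mempool update succeeded on attempt {}", attempt);
}
```
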
99 changes: 62 additions & 37 deletions src/new_index/mempool.rs
@@ -9,7 +9,7 @@ use elements::{encode::serialize, AssetId};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::iter::FromIterator;
use std::ops::Bound::{Excluded, Unbounded};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid};
@@ -343,46 +343,67 @@ impl Mempool {
&self.backlog_stats.0
}

pub fn update(&mut self, daemon: &Daemon) -> Result<()> {
let _timer = self.latency.with_label_values(&["update"]).start_timer();
let new_txids = daemon
pub fn unique_txids(&self) -> HashSet<Txid> {
return HashSet::from_iter(self.txstore.keys().cloned());
}

pub fn update(mempool: &RwLock<Mempool>, daemon: &Daemon) -> Result<()> {
// 1. Start the metrics timer and get the current mempool txids
// [LOCK] Takes read lock for whole scope.
let (_timer, old_txids) = {
let mempool = mempool.read().unwrap();
(
mempool.latency.with_label_values(&["update"]).start_timer(),
mempool.unique_txids(),
)
};

// 2. Get all the mempool txids from the RPC.
// [LOCK] No lock taken. Wait for RPC request. Get lists of remove/add txes.
let all_txids = daemon
.getmempooltxids()
.chain_err(|| "failed to update mempool from daemon")?;
let old_txids = HashSet::from_iter(self.txstore.keys().cloned());
let to_remove: HashSet<&Txid> = old_txids.difference(&new_txids).collect();

// Download and add new transactions from bitcoind's mempool
let txids: Vec<&Txid> = new_txids.difference(&old_txids).collect();
let to_add = match daemon.gettransactions(&txids) {
Ok(txs) => txs,
Err(err) => {
warn!("failed to get {} transactions: {}", txids.len(), err); // e.g. new block or RBF
return Ok(()); // keep the mempool until next update()
let txids_to_remove: HashSet<&Txid> = old_txids.difference(&all_txids).collect();
let txids_to_add: Vec<&Txid> = all_txids.difference(&old_txids).collect();

// 3. Remove missing transactions. Even if we are unable to download new transactions from
// the daemon, we still want to remove the transactions that are no longer in the mempool.
// [LOCK] Write lock is released at the end of the call to remove().
mempool.write().unwrap().remove(txids_to_remove);

// 4. Download the new transactions from the daemon's mempool
// [LOCK] No lock taken, waiting for RPC response.
let txs_to_add = daemon
.gettransactions(&txids_to_add)
.chain_err(|| format!("failed to get {} transactions", txids_to_add.len()))?;

// 4. Update local mempool to match daemon's state
// [LOCK] Takes Write lock for whole scope.
{
let mut mempool = mempool.write().unwrap();
// Add new transactions
if txs_to_add.len() > mempool.add(txs_to_add) {
debug!("Mempool update added less transactions than expected");
}
};
// Add new transactions
if to_add.len() > self.add(to_add) {
debug!("Mempool update added less transactions than expected");
}
// Remove missing transactions
self.remove(to_remove);

self.count
.with_label_values(&["txs"])
.set(self.txstore.len() as f64);
mempool
.count
.with_label_values(&["txs"])
.set(mempool.txstore.len() as f64);

// Update cached backlog stats (if expired)
if mempool.backlog_stats.1.elapsed()
> Duration::from_secs(mempool.config.mempool_backlog_stats_ttl)
{
let _timer = mempool
.latency
.with_label_values(&["update_backlog_stats"])
.start_timer();
mempool.backlog_stats = (BacklogStats::new(&mempool.feeinfo), Instant::now());
}

// Update cached backlog stats (if expired)
if self.backlog_stats.1.elapsed()
> Duration::from_secs(self.config.mempool_backlog_stats_ttl)
{
let _timer = self
.latency
.with_label_values(&["update_backlog_stats"])
.start_timer();
self.backlog_stats = (BacklogStats::new(&self.feeinfo), Instant::now());
Ok(())
}

Ok(())
}

pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> {
@@ -418,8 +439,12 @@ impl Mempool {
// Phase 1: add to txstore
for tx in txs {
let txid = tx.txid();
txids.push(txid);
self.txstore.insert(txid, tx);
// Only push if it doesn't already exist.
// This is important now that update doesn't lock during
// the entire function body.
if self.txstore.insert(txid, tx).is_none() {
txids.push(txid);
}
}

// Phase 2: index history and spend edges (some txos can be missing)
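Because the write lock is now dropped between the remove phase and the add phase, another writer (for example `add_by_txid`) can insert a transaction into `txstore` while `update` is between locks, so `add` only records a txid for indexing when the insert actually added a new entry. A toy illustration of that `insert(..).is_none()` guard (the types here are stand-ins, not the crate's):

```rust
use std::collections::HashMap;

fn main() {
    let mut txstore: HashMap<&str, &str> = HashMap::new();
    let mut to_index: Vec<&str> = Vec::new();

    // "a" shows up twice, as if another code path had already inserted it.
    for (txid, tx) in [("a", "raw_a"), ("a", "raw_a"), ("b", "raw_b")] {
        // HashMap::insert returns None only when the key was not already present,
        // so a transaction that raced in elsewhere is not indexed twice.
        if txstore.insert(txid, tx).is_none() {
            to_index.push(txid);
        }
    }

    assert_eq!(to_index, ["a", "b"]);
}
```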