Fix: mempool.update() frequent crashes in the main loop
junderw committed Jul 26, 2023
1 parent 3c66022 commit 96e4324
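The change in a nutshell, as a minimal standalone sketch (simplified stand-in types rather than the crate's real `Mempool`/`Transaction`, with confirmed and in-mempool parents conflated into one map): a transaction whose parent outputs cannot be found is skipped with a warning instead of panicking or aborting the whole batch, and `add` now reports how many transactions it actually processed.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real Transaction/Mempool types.
#[derive(Clone)]
struct Tx {
    id: u32,
    parents: Vec<u32>, // txids this tx spends from
}

/// Insert what we can; skip anything whose parents are unknown.
/// Returns the number of transactions actually processed.
fn add(store: &mut HashMap<u32, Tx>, txs: Vec<Tx>) -> usize {
    let mut processed = 0;
    for tx in txs {
        // A parent that is not visible in the store means prevouts/fees
        // cannot be computed for this tx: warn and move on instead of
        // failing the whole update.
        if tx.parents.iter().any(|p| !store.contains_key(p)) {
            eprintln!("skipping tx {}: missing parent", tx.id);
            continue;
        }
        store.insert(tx.id, tx);
        processed += 1;
    }
    processed
}

fn main() {
    let mut store = HashMap::new();
    let n = add(
        &mut store,
        vec![
            Tx { id: 1, parents: vec![] },
            Tx { id: 2, parents: vec![99] }, // unknown parent -> skipped
            Tx { id: 3, parents: vec![1] },
        ],
    );
    assert_eq!(n, 2);
}
```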
Showing 4 changed files with 113 additions and 74 deletions.
96 changes: 62 additions & 34 deletions src/new_index/mempool.rs
@@ -206,9 +206,7 @@ impl Mempool {
TxHistoryInfo::Funding(info) => {
// Liquid requires some additional information from the txo that's not available in the TxHistoryInfo index.
#[cfg(feature = "liquid")]
let txo = self
.lookup_txo(&entry.get_funded_outpoint())
.expect("missing txo");
let txo = self.lookup_txo(&entry.get_funded_outpoint());

Some(Utxo {
txid: deserialize(&info.txid).expect("invalid txid"),
@@ -345,7 +343,7 @@ impl Mempool {
}
};
// Add new transactions
self.add(to_add)?;
self.add(to_add);
// Remove missing transactions
self.remove(to_remove);

@@ -370,39 +368,64 @@ impl Mempool {
pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> {
if self.txstore.get(txid).is_none() {
if let Ok(tx) = daemon.getmempooltx(txid) {
self.add(vec![tx])?;
if self.add(vec![tx]) == 0 {
return Err(format!(
"Unable to add {txid} to mempool likely due to missing parents."
)
.into());
}
}
}

Ok(())
}

fn add(&mut self, txs: Vec<Transaction>) -> Result<()> {
/// Add transactions to the mempool.
///
/// The return value is the number of transactions processed.
fn add(&mut self, txs: Vec<Transaction>) -> usize {
self.delta
.with_label_values(&["add"])
.observe(txs.len() as f64);
let _timer = self.latency.with_label_values(&["add"]).start_timer();

let mut txids = vec![];
// Phase 1: add to txstore
for tx in txs {
let txid = tx.txid();
txids.push(txid);
self.txstore.insert(txid, tx);
let txlen = txs.len();
if txlen == 0 {
return 0;
}
// Phase 2: index history and spend edges (can fail if some txos cannot be found)
let txos = match self.lookup_txos(&self.get_prevouts(&txids)) {
Ok(txos) => txos,
Err(err) => {
warn!("lookup txouts failed: {}", err);
// TODO: should we remove txids from txstore?
return Ok(());
}
};
for txid in txids {
let tx = self.txstore.get(&txid).expect("missing mempool tx");
debug!("Adding {} transactions to Mempool", txlen);

// Phase 1: index history and spend edges (some txos can be missing)
let txids: Vec<_> = txs.iter().map(Transaction::txid).collect();
let txos = self.lookup_txos(&self.get_prevouts(&txids));

// Count how many transactions were actually processed.
let mut processed_count = 0;

// Phase 2: Iterate over the transactions and do the following:
// 1. Find all of the TxOuts of each input parent using `txos`
// 2. If any parent wasn't found, skip parsing this transaction
// 3. Insert TxFeeInfo into info.
// 4. Push TxOverview into recent tx queue.
// 5. Create the Spend and Fund TxHistory structs for inputs + outputs
// 6. Insert all TxHistory into history.
// 7. Insert the tx edges into edges (HashMap of (Outpoint, (Txid, vin)))
// 8. (Liquid only) Parse assets of tx.
for owned_tx in txs {
let txid = owned_tx.txid();

let entry = self.txstore.entry(txid);
// Note: This fn doesn't overwrite existing transactions,
// But that's ok, we didn't insert unrelated txes
// into any given txid anyways, so this will always insert.
let tx = &*entry.or_insert_with(|| owned_tx);

let prevouts = match extract_tx_prevouts(tx, &txos, false) {
Ok(v) => v,
Err(e) => {
warn!("Skipping tx {txid} missing parent error: {e}");
continue;
}
};
let txid_bytes = full_hash(&txid[..]);
let prevouts = extract_tx_prevouts(tx, &txos, false)?;

// Get feeinfo for caching and recent tx overview
let feeinfo = TxFeeInfo::new(tx, &prevouts, self.config.network_type);
Expand Down Expand Up @@ -472,18 +495,20 @@ impl Mempool {
&mut self.asset_history,
&mut self.asset_issuance,
);

processed_count += 1;
}

Ok(())
processed_count
}

pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result<TxOut> {
pub fn lookup_txo(&self, outpoint: &OutPoint) -> TxOut {
let mut outpoints = BTreeSet::new();
outpoints.insert(*outpoint);
Ok(self.lookup_txos(&outpoints)?.remove(outpoint).unwrap())
self.lookup_txos(&outpoints).remove(outpoint).unwrap()
}

pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
let _timer = self
.latency
.with_label_values(&["lookup_txos"])
Expand All @@ -494,18 +519,21 @@ impl Mempool {
let mempool_txos = outpoints
.iter()
.filter(|outpoint| !confirmed_txos.contains_key(outpoint))
.map(|outpoint| {
.flat_map(|outpoint| {
self.txstore
.get(&outpoint.txid)
.and_then(|tx| tx.output.get(outpoint.vout as usize).cloned())
.map(|txout| (*outpoint, txout))
.chain_err(|| format!("missing outpoint {:?}", outpoint))
.or_else(|| {
warn!("missing outpoint {:?}", outpoint);
None
})
})
.collect::<Result<HashMap<OutPoint, TxOut>>>()?;
.collect::<HashMap<OutPoint, TxOut>>();

let mut txos = confirmed_txos;
txos.extend(mempool_txos);
Ok(txos)
txos
}

fn get_prevouts(&self, txids: &[Txid]) -> BTreeSet<OutPoint> {
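A self-contained sketch of the new lookup behaviour above, using simplified stand-in types (an outpoint as a `(txid, vout)` pair, a txout as a bare value, not the real `bitcoin` types): outpoints that cannot be resolved are logged and simply omitted from the returned map instead of turning the whole call into an `Err`.

```rust
use std::collections::{BTreeSet, HashMap};

// Simplified stand-ins: an outpoint is (txid, vout), a txout is just a value.
type Outpoint = (u32, u32);
type TxOut = u64;

/// Return every outpoint we can resolve from `store`; anything missing is
/// warned about and left out of the map instead of failing the lookup.
fn lookup_txos(
    store: &HashMap<u32, Vec<TxOut>>,
    outpoints: &BTreeSet<Outpoint>,
) -> HashMap<Outpoint, TxOut> {
    outpoints
        .iter()
        .flat_map(|op| {
            store
                .get(&op.0)
                .and_then(|outputs| outputs.get(op.1 as usize).copied())
                .map(|txout| (*op, txout))
                .or_else(|| {
                    eprintln!("missing outpoint {:?}", op);
                    None
                })
        })
        .collect()
}

fn main() {
    let mut store = HashMap::new();
    store.insert(7, vec![1_000, 2_000]);
    let outpoints: BTreeSet<Outpoint> = [(7, 1), (8, 0)].into_iter().collect();
    let found = lookup_txos(&store, &outpoints);
    assert_eq!(found.len(), 1); // (8, 0) is missing and silently dropped
    assert_eq!(found[&(7, 1)], 2_000);
}
```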
15 changes: 12 additions & 3 deletions src/new_index/query.rs
@@ -71,10 +71,19 @@ impl Query {

pub fn broadcast_raw(&self, txhex: &str) -> Result<Txid> {
let txid = self.daemon.broadcast_raw(txhex)?;
self.mempool
// The important part is whether we succeeded in broadcasting.
// Ignore errors in adding to the cache and show an internal warning.
if let Err(e) = self
.mempool
.write()
.unwrap()
.add_by_txid(&self.daemon, &txid)?;
.add_by_txid(&self.daemon, &txid)
{
warn!(
"broadcast_raw of {txid} succeeded to broadcast \
but failed to add to mempool-electrs Mempool cache: {e}"
);
}
Ok(txid)
}

@@ -118,7 +127,7 @@ impl Query {
.or_else(|| self.mempool().lookup_raw_txn(txid))
}

pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
// the mempool lookup_txos() internally looks up confirmed txos as well
self.mempool().lookup_txos(outpoints)
}
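A sketch of the `broadcast_raw` change in isolation, with hypothetical closures standing in for the daemon RPC and the mempool cache: the broadcast itself stays fallible, while a failure to mirror the transaction into the local cache is downgraded to a warning.

```rust
/// Broadcast first; if mirroring the tx into the local mempool cache fails
/// (for example because its parents are not visible yet), only warn:
/// the broadcast itself already succeeded.
fn broadcast_raw(
    daemon_broadcast: impl Fn(&str) -> Result<String, String>,
    cache_add: impl Fn(&str) -> Result<(), String>,
    txhex: &str,
) -> Result<String, String> {
    let txid = daemon_broadcast(txhex)?; // a failed broadcast is still an error
    if let Err(e) = cache_add(&txid) {
        eprintln!("broadcast of {txid} succeeded, but adding it to the mempool cache failed: {e}");
    }
    Ok(txid)
}

fn main() {
    // Stub daemon/cache: the broadcast works, the cache add does not.
    let daemon = |_hex: &str| -> Result<String, String> { Ok("abcd1234".to_string()) };
    let cache = |_txid: &str| -> Result<(), String> { Err("missing parents".to_string()) };
    let txid = broadcast_raw(daemon, cache, "0200...").unwrap();
    assert_eq!(txid, "abcd1234");
}
```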
2 changes: 2 additions & 0 deletions src/new_index/schema.rs
@@ -292,6 +292,7 @@ impl Indexer {
}

fn add(&self, blocks: &[BlockEntry]) {
debug!("Adding {} blocks to Indexer", blocks.len());
// TODO: skip orphaned blocks?
let rows = {
let _timer = self.start_timer("add_process");
@@ -310,6 +311,7 @@ impl Indexer {
}

fn index(&self, blocks: &[BlockEntry]) {
debug!("Indexing {} blocks with Indexer", blocks.len());
let previous_txos_map = {
let _timer = self.start_timer("index_lookup");
lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false)
74 changes: 37 additions & 37 deletions src/rest.rs
@@ -502,7 +502,7 @@ fn prepare_txs(
txs: Vec<(Transaction, Option<BlockId>)>,
query: &Query,
config: &Config,
) -> Result<Vec<TransactionValue>, errors::Error> {
) -> Vec<TransactionValue> {
let outpoints = txs
.iter()
.flat_map(|(tx, _)| {
Expand All @@ -513,12 +513,11 @@ fn prepare_txs(
})
.collect();

let prevouts = query.lookup_txos(&outpoints)?;
let prevouts = query.lookup_txos(&outpoints);

Ok(txs
.into_iter()
txs.into_iter()
.map(|(tx, blockid)| TransactionValue::new(tx, blockid, &prevouts, config))
.collect())
.collect()
}

#[tokio::main]
@@ -709,7 +708,7 @@ fn handle_request(
.map(|tx| (tx, None))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}
(&Method::GET, Some(&"block"), Some(hash), Some(&"header"), None, None) => {
let hash = BlockHash::from_hex(hash)?;
@@ -786,7 +785,7 @@ fn handle_request(
// XXX orphaned blocks always get TTL_SHORT
let ttl = ttl_by_depth(confirmed_blockid.map(|b| b.height), query);

json_maybe_error_response(prepare_txs(txs, query, config), ttl)
json_response(prepare_txs(txs, query, config), ttl)
}
(&Method::GET, Some(script_type @ &"address"), Some(script_str), None, None, None)
| (&Method::GET, Some(script_type @ &"scripthash"), Some(script_str), None, None, None) => {
@@ -874,7 +873,7 @@ fn handle_request(
);
}

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}

(
Expand Down Expand Up @@ -907,7 +906,7 @@ fn handle_request(
.map(|(tx, blockid)| (tx, Some(blockid)))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}
(
&Method::GET,
Expand Down Expand Up @@ -938,7 +937,7 @@ fn handle_request(
.map(|tx| (tx, None))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}

(
@@ -981,9 +980,10 @@ fn handle_request(
let blockid = query.chain().tx_confirming_block(&hash);
let ttl = ttl_by_depth(blockid.as_ref().map(|b| b.height), query);

let tx = prepare_txs(vec![(tx, blockid)], query, config).map(|mut v| v.remove(0));
let mut tx = prepare_txs(vec![(tx, blockid)], query, config);
tx.remove(0);

json_maybe_error_response(tx, ttl)
json_response(tx, ttl)
}
(&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"hex"), None, None)
| (&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"raw"), None, None) => {
@@ -1106,7 +1106,7 @@ fn handle_request(
.map(|tx| (tx, None))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}
(&Method::GET, Some(&"mempool"), Some(&"txs"), last_seen_txid, None, None) => {
let last_seen_txid = last_seen_txid.and_then(|txid| Txid::from_hex(txid).ok());
@@ -1117,7 +1117,7 @@ fn handle_request(
.map(|tx| (tx, None))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}
(&Method::GET, Some(&"mempool"), Some(&"recent"), None, None, None) => {
let mempool = query.mempool();
@@ -1188,7 +1188,7 @@ fn handle_request(
.map(|(tx, blockid)| (tx, Some(blockid))),
);

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}

#[cfg(feature = "liquid")]
@@ -1214,7 +1214,7 @@ fn handle_request(
.map(|(tx, blockid)| (tx, Some(blockid)))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}

#[cfg(feature = "liquid")]
@@ -1228,7 +1228,7 @@ fn handle_request(
.map(|tx| (tx, None))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}

#[cfg(feature = "liquid")]
@@ -1281,26 +1281,26 @@ fn json_response<T: Serialize>(value: T, ttl: u32) -> Result<Response<Body>, Htt
.unwrap())
}

fn json_maybe_error_response<T: Serialize>(
value: Result<T, errors::Error>,
ttl: u32,
) -> Result<Response<Body>, HttpError> {
let response = Response::builder()
.header("Content-Type", "application/json")
.header("Cache-Control", format!("public, max-age={:}", ttl))
.header("X-Powered-By", &**VERSION_STRING);
Ok(match value {
Ok(v) => response
.body(Body::from(serde_json::to_string(&v)?))
.expect("Valid http response"),
Err(e) => response
.status(500)
.body(Body::from(serde_json::to_string(
&json!({ "error": e.to_string() }),
)?))
.expect("Valid http response"),
})
}
// fn json_maybe_error_response<T: Serialize>(
// value: Result<T, errors::Error>,
// ttl: u32,
// ) -> Result<Response<Body>, HttpError> {
// let response = Response::builder()
// .header("Content-Type", "application/json")
// .header("Cache-Control", format!("public, max-age={:}", ttl))
// .header("X-Powered-By", &**VERSION_STRING);
// Ok(match value {
// Ok(v) => response
// .body(Body::from(serde_json::to_string(&v)?))
// .expect("Valid http response"),
// Err(e) => response
// .status(500)
// .body(Body::from(serde_json::to_string(
// &json!({ "error": e.to_string() }),
// )?))
// .expect("Valid http response"),
// })
// }

fn blocks(
query: &Query,
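The same idea seen from the HTTP layer in src/rest.rs, as a deliberately tiny sketch with hand-rolled JSON and a one-field stand-in for the real `TransactionValue`: once `prepare_txs` cannot fail, every handler can call the plain `json_response` directly, which is why the 500-wrapping `json_maybe_error_response` could be retired.

```rust
// Hypothetical, trimmed-down stand-ins for the REST plumbing (not the real
// TransactionValue/Query types).
struct TransactionValue {
    txid: String,
}

/// Now infallible: missing prevouts were already dropped (with a warning)
/// in the mempool layer, so there is no error left to surface here.
fn prepare_txs(txids: Vec<String>) -> Vec<TransactionValue> {
    txids
        .into_iter()
        .map(|txid| TransactionValue { txid })
        .collect()
}

/// With an infallible `prepare_txs`, the plain success responder is enough.
fn json_response(values: &[TransactionValue], ttl: u32) -> String {
    let body: Vec<String> = values
        .iter()
        .map(|v| format!("{{\"txid\":\"{}\"}}", v.txid))
        .collect();
    format!("Cache-Control: public, max-age={ttl}\r\n\r\n[{}]", body.join(","))
}

fn main() {
    println!("{}", json_response(&prepare_txs(vec!["abcd".into()]), 10));
}
```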
