From 96e4324ada1fab2905df3b8cd40fcf725aa909af Mon Sep 17 00:00:00 2001 From: junderw Date: Tue, 25 Jul 2023 19:25:51 -0700 Subject: [PATCH 1/5] Fix: mempool.update() frequent crashes in the main loop --- src/new_index/mempool.rs | 96 ++++++++++++++++++++++++++-------------- src/new_index/query.rs | 15 +++++-- src/new_index/schema.rs | 2 + src/rest.rs | 74 +++++++++++++++---------------- 4 files changed, 113 insertions(+), 74 deletions(-) diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 863d8eec..bdaaaa49 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -206,9 +206,7 @@ impl Mempool { TxHistoryInfo::Funding(info) => { // Liquid requires some additional information from the txo that's not available in the TxHistoryInfo index. #[cfg(feature = "liquid")] - let txo = self - .lookup_txo(&entry.get_funded_outpoint()) - .expect("missing txo"); + let txo = self.lookup_txo(&entry.get_funded_outpoint()); Some(Utxo { txid: deserialize(&info.txid).expect("invalid txid"), @@ -345,7 +343,7 @@ impl Mempool { } }; // Add new transactions - self.add(to_add)?; + self.add(to_add); // Remove missing transactions self.remove(to_remove); @@ -370,39 +368,64 @@ impl Mempool { pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> { if self.txstore.get(txid).is_none() { if let Ok(tx) = daemon.getmempooltx(txid) { - self.add(vec![tx])?; + if self.add(vec![tx]) == 0 { + return Err(format!( + "Unable to add {txid} to mempool likely due to missing parents." + ) + .into()); + } } } - Ok(()) } - fn add(&mut self, txs: Vec) -> Result<()> { + /// Add transactions to the mempool. + /// + /// The return value is the number of transactions processed. + fn add(&mut self, txs: Vec) -> usize { self.delta .with_label_values(&["add"]) .observe(txs.len() as f64); let _timer = self.latency.with_label_values(&["add"]).start_timer(); - - let mut txids = vec![]; - // Phase 1: add to txstore - for tx in txs { - let txid = tx.txid(); - txids.push(txid); - self.txstore.insert(txid, tx); + let txlen = txs.len(); + if txlen == 0 { + return 0; } - // Phase 2: index history and spend edges (can fail if some txos cannot be found) - let txos = match self.lookup_txos(&self.get_prevouts(&txids)) { - Ok(txos) => txos, - Err(err) => { - warn!("lookup txouts failed: {}", err); - // TODO: should we remove txids from txstore? - return Ok(()); - } - }; - for txid in txids { - let tx = self.txstore.get(&txid).expect("missing mempool tx"); + debug!("Adding {} transactions to Mempool", txlen); + + // Phase 1: index history and spend edges (some txos can be missing) + let txids: Vec<_> = txs.iter().map(Transaction::txid).collect(); + let txos = self.lookup_txos(&self.get_prevouts(&txids)); + + // Count how many transactions were actually processed. + let mut processed_count = 0; + + // Phase 2: Iterate over the transactions and do the following: + // 1. Find all of the TxOuts of each input parent using `txos` + // 2. If any parent wasn't found, skip parsing this transaction + // 3. Insert TxFeeInfo into info. + // 4. Push TxOverview into recent tx queue. + // 5. Create the Spend and Fund TxHistory structs for inputs + outputs + // 6. Insert all TxHistory into history. + // 7. Insert the tx edges into edges (HashMap of (Outpoint, (Txid, vin))) + // 8. (Liquid only) Parse assets of tx. 
+ for owned_tx in txs { + let txid = owned_tx.txid(); + + let entry = self.txstore.entry(txid); + // Note: This fn doesn't overwrite existing transactions, + // But that's ok, we didn't insert unrelated txes + // into any given txid anyways, so this will always insert. + let tx = &*entry.or_insert_with(|| owned_tx); + + let prevouts = match extract_tx_prevouts(tx, &txos, false) { + Ok(v) => v, + Err(e) => { + warn!("Skipping tx {txid} missing parent error: {e}"); + continue; + } + }; let txid_bytes = full_hash(&txid[..]); - let prevouts = extract_tx_prevouts(tx, &txos, false)?; // Get feeinfo for caching and recent tx overview let feeinfo = TxFeeInfo::new(tx, &prevouts, self.config.network_type); @@ -472,18 +495,20 @@ impl Mempool { &mut self.asset_history, &mut self.asset_issuance, ); + + processed_count += 1; } - Ok(()) + processed_count } - pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result { + pub fn lookup_txo(&self, outpoint: &OutPoint) -> TxOut { let mut outpoints = BTreeSet::new(); outpoints.insert(*outpoint); - Ok(self.lookup_txos(&outpoints)?.remove(outpoint).unwrap()) + self.lookup_txos(&outpoints).remove(outpoint).unwrap() } - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> Result> { + pub fn lookup_txos(&self, outpoints: &BTreeSet) -> HashMap { let _timer = self .latency .with_label_values(&["lookup_txos"]) @@ -494,18 +519,21 @@ impl Mempool { let mempool_txos = outpoints .iter() .filter(|outpoint| !confirmed_txos.contains_key(outpoint)) - .map(|outpoint| { + .flat_map(|outpoint| { self.txstore .get(&outpoint.txid) .and_then(|tx| tx.output.get(outpoint.vout as usize).cloned()) .map(|txout| (*outpoint, txout)) - .chain_err(|| format!("missing outpoint {:?}", outpoint)) + .or_else(|| { + warn!("missing outpoint {:?}", outpoint); + None + }) }) - .collect::>>()?; + .collect::>(); let mut txos = confirmed_txos; txos.extend(mempool_txos); - Ok(txos) + txos } fn get_prevouts(&self, txids: &[Txid]) -> BTreeSet { diff --git a/src/new_index/query.rs b/src/new_index/query.rs index e95e49ca..604dc61f 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -71,10 +71,19 @@ impl Query { pub fn broadcast_raw(&self, txhex: &str) -> Result { let txid = self.daemon.broadcast_raw(txhex)?; - self.mempool + // The important part is whether we succeeded in broadcasting. + // Ignore errors in adding to the cache and show an internal warning. + if let Err(e) = self + .mempool .write() .unwrap() - .add_by_txid(&self.daemon, &txid)?; + .add_by_txid(&self.daemon, &txid) + { + warn!( + "broadcast_raw of {txid} succeeded to broadcast \ + but failed to add to mempool-electrs Mempool cache: {e}" + ); + } Ok(txid) } @@ -118,7 +127,7 @@ impl Query { .or_else(|| self.mempool().lookup_raw_txn(txid)) } - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> Result> { + pub fn lookup_txos(&self, outpoints: &BTreeSet) -> HashMap { // the mempool lookup_txos() internally looks up confirmed txos as well self.mempool().lookup_txos(outpoints) } diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index fb39b931..97d25737 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -292,6 +292,7 @@ impl Indexer { } fn add(&self, blocks: &[BlockEntry]) { + debug!("Adding {} blocks to Indexer", blocks.len()); // TODO: skip orphaned blocks? 
let rows = { let _timer = self.start_timer("add_process"); @@ -310,6 +311,7 @@ impl Indexer { } fn index(&self, blocks: &[BlockEntry]) { + debug!("Indexing {} blocks with Indexer", blocks.len()); let previous_txos_map = { let _timer = self.start_timer("index_lookup"); lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false) diff --git a/src/rest.rs b/src/rest.rs index ceff090e..bbc14ae6 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -502,7 +502,7 @@ fn prepare_txs( txs: Vec<(Transaction, Option)>, query: &Query, config: &Config, -) -> Result, errors::Error> { +) -> Vec { let outpoints = txs .iter() .flat_map(|(tx, _)| { @@ -513,12 +513,11 @@ fn prepare_txs( }) .collect(); - let prevouts = query.lookup_txos(&outpoints)?; + let prevouts = query.lookup_txos(&outpoints); - Ok(txs - .into_iter() + txs.into_iter() .map(|(tx, blockid)| TransactionValue::new(tx, blockid, &prevouts, config)) - .collect()) + .collect() } #[tokio::main] @@ -709,7 +708,7 @@ fn handle_request( .map(|tx| (tx, None)) .collect(); - json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) + json_response(prepare_txs(txs, query, config), TTL_SHORT) } (&Method::GET, Some(&"block"), Some(hash), Some(&"header"), None, None) => { let hash = BlockHash::from_hex(hash)?; @@ -786,7 +785,7 @@ fn handle_request( // XXX orphraned blocks alway get TTL_SHORT let ttl = ttl_by_depth(confirmed_blockid.map(|b| b.height), query); - json_maybe_error_response(prepare_txs(txs, query, config), ttl) + json_response(prepare_txs(txs, query, config), ttl) } (&Method::GET, Some(script_type @ &"address"), Some(script_str), None, None, None) | (&Method::GET, Some(script_type @ &"scripthash"), Some(script_str), None, None, None) => { @@ -874,7 +873,7 @@ fn handle_request( ); } - json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) + json_response(prepare_txs(txs, query, config), TTL_SHORT) } ( @@ -907,7 +906,7 @@ fn handle_request( .map(|(tx, blockid)| (tx, Some(blockid))) .collect(); - json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) + json_response(prepare_txs(txs, query, config), TTL_SHORT) } ( &Method::GET, @@ -938,7 +937,7 @@ fn handle_request( .map(|tx| (tx, None)) .collect(); - json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) + json_response(prepare_txs(txs, query, config), TTL_SHORT) } ( @@ -981,9 +980,10 @@ fn handle_request( let blockid = query.chain().tx_confirming_block(&hash); let ttl = ttl_by_depth(blockid.as_ref().map(|b| b.height), query); - let tx = prepare_txs(vec![(tx, blockid)], query, config).map(|mut v| v.remove(0)); + let mut tx = prepare_txs(vec![(tx, blockid)], query, config); + tx.remove(0); - json_maybe_error_response(tx, ttl) + json_response(tx, ttl) } (&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"hex"), None, None) | (&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"raw"), None, None) => { @@ -1106,7 +1106,7 @@ fn handle_request( .map(|tx| (tx, None)) .collect(); - json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) + json_response(prepare_txs(txs, query, config), TTL_SHORT) } (&Method::GET, Some(&"mempool"), Some(&"txs"), last_seen_txid, None, None) => { let last_seen_txid = last_seen_txid.and_then(|txid| Txid::from_hex(txid).ok()); @@ -1117,7 +1117,7 @@ fn handle_request( .map(|tx| (tx, None)) .collect(); - json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) + json_response(prepare_txs(txs, query, config), TTL_SHORT) } (&Method::GET, Some(&"mempool"), Some(&"recent"), None, None, 
None) => { let mempool = query.mempool(); @@ -1188,7 +1188,7 @@ fn handle_request( .map(|(tx, blockid)| (tx, Some(blockid))), ); - json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) + json_response(prepare_txs(txs, query, config), TTL_SHORT) } #[cfg(feature = "liquid")] @@ -1214,7 +1214,7 @@ fn handle_request( .map(|(tx, blockid)| (tx, Some(blockid))) .collect(); - json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) + json_response(prepare_txs(txs, query, config), TTL_SHORT) } #[cfg(feature = "liquid")] @@ -1228,7 +1228,7 @@ fn handle_request( .map(|tx| (tx, None)) .collect(); - json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) + json_response(prepare_txs(txs, query, config), TTL_SHORT) } #[cfg(feature = "liquid")] @@ -1281,26 +1281,26 @@ fn json_response(value: T, ttl: u32) -> Result, Htt .unwrap()) } -fn json_maybe_error_response( - value: Result, - ttl: u32, -) -> Result, HttpError> { - let response = Response::builder() - .header("Content-Type", "application/json") - .header("Cache-Control", format!("public, max-age={:}", ttl)) - .header("X-Powered-By", &**VERSION_STRING); - Ok(match value { - Ok(v) => response - .body(Body::from(serde_json::to_string(&v)?)) - .expect("Valid http response"), - Err(e) => response - .status(500) - .body(Body::from(serde_json::to_string( - &json!({ "error": e.to_string() }), - )?)) - .expect("Valid http response"), - }) -} +// fn json_maybe_error_response( +// value: Result, +// ttl: u32, +// ) -> Result, HttpError> { +// let response = Response::builder() +// .header("Content-Type", "application/json") +// .header("Cache-Control", format!("public, max-age={:}", ttl)) +// .header("X-Powered-By", &**VERSION_STRING); +// Ok(match value { +// Ok(v) => response +// .body(Body::from(serde_json::to_string(&v)?)) +// .expect("Valid http response"), +// Err(e) => response +// .status(500) +// .body(Body::from(serde_json::to_string( +// &json!({ "error": e.to_string() }), +// )?)) +// .expect("Valid http response"), +// }) +// } fn blocks( query: &Query, From 1b4f8466e93b2e959773625a81d6b032321f7e1c Mon Sep 17 00:00:00 2001 From: junderw Date: Tue, 25 Jul 2023 20:47:31 -0700 Subject: [PATCH 2/5] Fix unwrap that can now possibly be None --- src/new_index/mempool.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index bdaaaa49..f4b540a3 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -206,7 +206,7 @@ impl Mempool { TxHistoryInfo::Funding(info) => { // Liquid requires some additional information from the txo that's not available in the TxHistoryInfo index. #[cfg(feature = "liquid")] - let txo = self.lookup_txo(&entry.get_funded_outpoint()); + let txo = self.lookup_txo(&entry.get_funded_outpoint())?; Some(Utxo { txid: deserialize(&info.txid).expect("invalid txid"), @@ -502,12 +502,18 @@ impl Mempool { processed_count } - pub fn lookup_txo(&self, outpoint: &OutPoint) -> TxOut { + /// Returns None if the lookup fails (mempool transaction RBF-ed etc.) + pub fn lookup_txo(&self, outpoint: &OutPoint) -> Option { let mut outpoints = BTreeSet::new(); outpoints.insert(*outpoint); - self.lookup_txos(&outpoints).remove(outpoint).unwrap() + // This can possibly be None now + self.lookup_txos(&outpoints).remove(outpoint) } + /// For a given set of OutPoints, return a HashMap + /// + /// Not all OutPoints from mempool transactions are guaranteed to be there. + /// Ensure you deal with the None case in your logic. 
pub fn lookup_txos(&self, outpoints: &BTreeSet) -> HashMap { let _timer = self .latency From 875057e129ee5f890b9dcacc8369aadf0e31c88f Mon Sep 17 00:00:00 2001 From: junderw Date: Tue, 25 Jul 2023 20:53:55 -0700 Subject: [PATCH 3/5] Add must_use --- src/new_index/mempool.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index f4b540a3..b896cb1b 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -343,7 +343,9 @@ impl Mempool { } }; // Add new transactions - self.add(to_add); + if to_add.len() > self.add(to_add) { + debug!("Mempool update added less transactions than expected"); + } // Remove missing transactions self.remove(to_remove); @@ -382,6 +384,7 @@ impl Mempool { /// Add transactions to the mempool. /// /// The return value is the number of transactions processed. + #[must_use = "Must deal with [[input vec's length]] > [[result]]."] fn add(&mut self, txs: Vec) -> usize { self.delta .with_label_values(&["add"]) From c8a370d3877ff3d875ebe66710056d058175caf3 Mon Sep 17 00:00:00 2001 From: junderw Date: Wed, 26 Jul 2023 00:16:22 -0700 Subject: [PATCH 4/5] Fix order of operations to not fail --- src/new_index/mempool.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index b896cb1b..f393eb2b 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -396,14 +396,21 @@ impl Mempool { } debug!("Adding {} transactions to Mempool", txlen); - // Phase 1: index history and spend edges (some txos can be missing) - let txids: Vec<_> = txs.iter().map(Transaction::txid).collect(); + let mut txids = Vec::with_capacity(txs.len()); + // Phase 1: add to txstore + for tx in txs { + let txid = tx.txid(); + txids.push(txid); + self.txstore.insert(txid, tx); + } + + // Phase 2: index history and spend edges (some txos can be missing) let txos = self.lookup_txos(&self.get_prevouts(&txids)); // Count how many transactions were actually processed. let mut processed_count = 0; - // Phase 2: Iterate over the transactions and do the following: + // Phase 3: Iterate over the transactions and do the following: // 1. Find all of the TxOuts of each input parent using `txos` // 2. If any parent wasn't found, skip parsing this transaction // 3. Insert TxFeeInfo into info. @@ -412,14 +419,8 @@ impl Mempool { // 6. Insert all TxHistory into history. // 7. Insert the tx edges into edges (HashMap of (Outpoint, (Txid, vin))) // 8. (Liquid only) Parse assets of tx. - for owned_tx in txs { - let txid = owned_tx.txid(); - - let entry = self.txstore.entry(txid); - // Note: This fn doesn't overwrite existing transactions, - // But that's ok, we didn't insert unrelated txes - // into any given txid anyways, so this will always insert. 
- let tx = &*entry.or_insert_with(|| owned_tx); + for txid in txids { + let tx = self.txstore.get(&txid).expect("missing tx from txstore"); let prevouts = match extract_tx_prevouts(tx, &txos, false) { Ok(v) => v, From 3188ddf7d4f160219aa312108328952d16a6696f Mon Sep 17 00:00:00 2001 From: junderw Date: Wed, 26 Jul 2023 01:21:40 -0700 Subject: [PATCH 5/5] Fix /tx/HASH bug --- src/rest.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rest.rs b/src/rest.rs index bbc14ae6..920d90aa 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -981,9 +981,8 @@ fn handle_request( let ttl = ttl_by_depth(blockid.as_ref().map(|b| b.height), query); let mut tx = prepare_txs(vec![(tx, blockid)], query, config); - tx.remove(0); - json_response(tx, ttl) + json_response(tx.remove(0), ttl) } (&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"hex"), None, None) | (&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"raw"), None, None) => {
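-- 
Note on the new partial-success contract (illustrative, not part of the
patches themselves): after this series, `Mempool::add` returns the number of
transactions it actually indexed instead of `Result<()>`, and
`lookup_txos`/`lookup_txo` simply omit outpoints they cannot find (e.g. a
parent that was RBF-ed out of the mempool) instead of returning an error.
A minimal caller-side sketch of that contract, assuming code in the same
module as `Mempool` (so the private `add` is reachable), the crate's
`Transaction` type, and a hypothetical helper name `add_incoming`:

    // Hypothetical helper, not taken from the diff: shows how callers are
    // expected to compare the input length against the returned count.
    fn add_incoming(mempool: &mut Mempool, incoming: Vec<Transaction>) {
        let expected = incoming.len();
        // `add` skips transactions whose parent outputs cannot be found,
        // rather than failing the whole batch, and reports how many it
        // actually indexed.
        let processed = mempool.add(incoming);
        if processed < expected {
            debug!(
                "{} of {} incoming transactions were skipped",
                expected - processed,
                expected
            );
        }
    }

This mirrors the check added to `Mempool::update` in PATCH 3/5, where
`to_add.len() > self.add(to_add)` only triggers a debug log, since a skipped
child is expected whenever its parent disappeared between the
`getrawmempool` snapshot and indexing.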