Fix: mempool.update() frequent crashes in the main loop #25

Merged: 5 commits, Jul 28, 2023
92 changes: 65 additions & 27 deletions src/new_index/mempool.rs
@@ -206,9 +206,7 @@ impl Mempool {
TxHistoryInfo::Funding(info) => {
// Liquid requires some additional information from the txo that's not available in the TxHistoryInfo index.
#[cfg(feature = "liquid")]
let txo = self
.lookup_txo(&entry.get_funded_outpoint())
.expect("missing txo");
let txo = self.lookup_txo(&entry.get_funded_outpoint())?;

Some(Utxo {
txid: deserialize(&info.txid).expect("invalid txid"),
@@ -345,7 +343,9 @@ impl Mempool {
}
};
// Add new transactions
self.add(to_add)?;
if to_add.len() > self.add(to_add) {
debug!("Mempool update added less transactions than expected");
}
// Remove missing transactions
self.remove(to_remove);

@@ -370,39 +370,66 @@ impl Mempool {
pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> {
if self.txstore.get(txid).is_none() {
if let Ok(tx) = daemon.getmempooltx(txid) {
self.add(vec![tx])?;
if self.add(vec![tx]) == 0 {
return Err(format!(
"Unable to add {txid} to mempool likely due to missing parents."
)
.into());
}
}
}

Ok(())
}

fn add(&mut self, txs: Vec<Transaction>) -> Result<()> {
/// Add transactions to the mempool.
///
/// The return value is the number of transactions processed.
#[must_use = "Must deal with [[input vec's length]] > [[result]]."]
fn add(&mut self, txs: Vec<Transaction>) -> usize {
self.delta
.with_label_values(&["add"])
.observe(txs.len() as f64);
let _timer = self.latency.with_label_values(&["add"]).start_timer();
let txlen = txs.len();
if txlen == 0 {
return 0;
}
debug!("Adding {} transactions to Mempool", txlen);

let mut txids = vec![];
let mut txids = Vec::with_capacity(txs.len());
// Phase 1: add to txstore
for tx in txs {
let txid = tx.txid();
txids.push(txid);
self.txstore.insert(txid, tx);
}
// Phase 2: index history and spend edges (can fail if some txos cannot be found)
let txos = match self.lookup_txos(&self.get_prevouts(&txids)) {
Ok(txos) => txos,
Err(err) => {
warn!("lookup txouts failed: {}", err);
// TODO: should we remove txids from txstore?
return Ok(());
}
};

// Phase 2: index history and spend edges (some txos can be missing)
let txos = self.lookup_txos(&self.get_prevouts(&txids));

// Count how many transactions were actually processed.
let mut processed_count = 0;

// Phase 3: Iterate over the transactions and do the following:
// 1. Find all of the TxOuts of each input parent using `txos`
// 2. If any parent wasn't found, skip parsing this transaction
// 3. Insert TxFeeInfo into info.
// 4. Push TxOverview into recent tx queue.
// 5. Create the Spend and Fund TxHistory structs for inputs + outputs
// 6. Insert all TxHistory into history.
// 7. Insert the tx edges into edges (HashMap of (Outpoint, (Txid, vin)))
// 8. (Liquid only) Parse assets of tx.
for txid in txids {
let tx = self.txstore.get(&txid).expect("missing mempool tx");
let tx = self.txstore.get(&txid).expect("missing tx from txstore");

let prevouts = match extract_tx_prevouts(tx, &txos, false) {
Ok(v) => v,
Err(e) => {
warn!("Skipping tx {txid} missing parent error: {e}");
continue;
}
};
let txid_bytes = full_hash(&txid[..]);
let prevouts = extract_tx_prevouts(tx, &txos, false)?;

// Get feeinfo for caching and recent tx overview
let feeinfo = TxFeeInfo::new(tx, &prevouts, self.config.network_type);
@@ -472,18 +499,26 @@ impl Mempool {
&mut self.asset_history,
&mut self.asset_issuance,
);

processed_count += 1;
}

Ok(())
processed_count
}

pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result<TxOut> {
/// Returns None if the lookup fails (mempool transaction RBF-ed etc.)
pub fn lookup_txo(&self, outpoint: &OutPoint) -> Option<TxOut> {
let mut outpoints = BTreeSet::new();
outpoints.insert(*outpoint);
Ok(self.lookup_txos(&outpoints)?.remove(outpoint).unwrap())
// This can possibly be None now
self.lookup_txos(&outpoints).remove(outpoint)
}

pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
/// For a given set of OutPoints, return a HashMap<OutPoint, TxOut>
///
/// Not all OutPoints from mempool transactions are guaranteed to be there.
/// Ensure you deal with the None case in your logic.
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
let _timer = self
.latency
.with_label_values(&["lookup_txos"])
@@ -494,18 +529,21 @@ impl Mempool {
let mempool_txos = outpoints
.iter()
.filter(|outpoint| !confirmed_txos.contains_key(outpoint))
.map(|outpoint| {
.flat_map(|outpoint| {
self.txstore
.get(&outpoint.txid)
.and_then(|tx| tx.output.get(outpoint.vout as usize).cloned())
.map(|txout| (*outpoint, txout))
.chain_err(|| format!("missing outpoint {:?}", outpoint))
.or_else(|| {
warn!("missing outpoint {:?}", outpoint);
None
})
})
.collect::<Result<HashMap<OutPoint, TxOut>>>()?;
.collect::<HashMap<OutPoint, TxOut>>();

let mut txos = confirmed_txos;
txos.extend(mempool_txos);
Ok(txos)
txos
}

fn get_prevouts(&self, txids: &[Txid]) -> BTreeSet<OutPoint> {
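The net effect of the mempool.rs changes: prevout lookups no longer abort the whole update, and `add()` reports how many transactions it actually processed so callers can log the shortfall. Below is a minimal, self-contained sketch of that pattern; the `OutPoint`, `TxOut`, and `Tx` stand-ins are hypothetical simplifications, not the real electrs types.

```rust
use std::collections::{BTreeSet, HashMap};

// Hypothetical stand-ins so the sketch is self-contained; not the real electrs types.
type OutPoint = (u64, u32); // (parent txid surrogate, vout)
type TxOut = u64;           // output value only
struct Tx {
    inputs: Vec<OutPoint>,
}

// Mirrors the new lookup_txos(): infallible, returns only the outpoints it could resolve.
fn lookup_txos(
    utxos: &HashMap<OutPoint, TxOut>,
    outpoints: &BTreeSet<OutPoint>,
) -> HashMap<OutPoint, TxOut> {
    outpoints
        .iter()
        .filter_map(|op| utxos.get(op).map(|txo| (*op, *txo)))
        .collect()
}

// Mirrors the new add(): transactions with unresolved parents are skipped, and the
// number of transactions actually processed is returned instead of a Result.
fn add(utxos: &HashMap<OutPoint, TxOut>, txs: &[Tx]) -> usize {
    let wanted: BTreeSet<OutPoint> = txs.iter().flat_map(|tx| tx.inputs.iter().copied()).collect();
    let found = lookup_txos(utxos, &wanted);
    txs.iter()
        .filter(|tx| tx.inputs.iter().all(|op| found.contains_key(op)))
        .count()
}

fn main() {
    let mut utxos: HashMap<OutPoint, TxOut> = HashMap::new();
    utxos.insert((1, 0), 50_000);
    let txs = vec![
        Tx { inputs: vec![(1, 0)] },  // parent known
        Tx { inputs: vec![(99, 0)] }, // parent missing, e.g. replaced (RBF) or already confirmed
    ];
    let added = add(&utxos, &txs);
    if txs.len() > added {
        eprintln!("mempool update added fewer transactions than expected: {added}/{}", txs.len());
    }
}
```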
15 changes: 12 additions & 3 deletions src/new_index/query.rs
@@ -71,10 +71,19 @@ impl Query {

pub fn broadcast_raw(&self, txhex: &str) -> Result<Txid> {
let txid = self.daemon.broadcast_raw(txhex)?;
self.mempool
// The important part is whether we succeeded in broadcasting.
// Ignore errors in adding to the cache and show an internal warning.
if let Err(e) = self
.mempool
.write()
.unwrap()
.add_by_txid(&self.daemon, &txid)?;
.add_by_txid(&self.daemon, &txid)
{
warn!(
"broadcast_raw of {txid} succeeded to broadcast \
but failed to add to mempool-electrs Mempool cache: {e}"
);
}
Ok(txid)
}

@@ -118,7 +127,7 @@ impl Query {
.or_else(|| self.mempool().lookup_raw_txn(txid))
}

pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
// the mempool lookup_txos() internally looks up confirmed txos as well
self.mempool().lookup_txos(outpoints)
}
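The `broadcast_raw()` change encodes one policy: once the node has accepted the transaction, a failure to update the local mempool cache is logged as a warning rather than returned as an error. A rough sketch of that policy follows; `broadcast_to_node` and `add_to_local_cache` are hypothetical stand-ins, not the real Daemon/Mempool API.

```rust
// Hypothetical stand-in error type and functions; not the real Daemon/Mempool API.
struct Error(String);

fn broadcast_to_node(txhex: &str) -> Result<String, Error> {
    // Pretend the node accepted the raw tx and returned its txid.
    Ok(format!("txid-of-{txhex}"))
}

fn add_to_local_cache(_txid: &str) -> Result<(), Error> {
    // Simulate the cache update failing, e.g. because a parent is missing from the local view.
    Err(Error("missing parents".into()))
}

// Success is defined by the broadcast itself; a failed cache insert is only logged.
fn broadcast_raw(txhex: &str) -> Result<String, Error> {
    let txid = broadcast_to_node(txhex)?; // broadcast errors still propagate to the caller
    if let Err(e) = add_to_local_cache(&txid) {
        eprintln!("warning: {txid} was broadcast, but the local mempool cache update failed: {}", e.0);
    }
    Ok(txid)
}

fn main() {
    match broadcast_raw("0200000001...") {
        Ok(txid) => println!("broadcast ok: {txid}"),
        Err(e) => println!("broadcast failed: {}", e.0),
    }
}
```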
2 changes: 2 additions & 0 deletions src/new_index/schema.rs
@@ -292,6 +292,7 @@ impl Indexer {
}

fn add(&self, blocks: &[BlockEntry]) {
debug!("Adding {} blocks to Indexer", blocks.len());
// TODO: skip orphaned blocks?
let rows = {
let _timer = self.start_timer("add_process");
@@ -310,6 +311,7 @@
}

fn index(&self, blocks: &[BlockEntry]) {
debug!("Indexing {} blocks with Indexer", blocks.len());
let previous_txos_map = {
let _timer = self.start_timer("index_lookup");
lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false)
73 changes: 36 additions & 37 deletions src/rest.rs
@@ -502,7 +502,7 @@ fn prepare_txs(
txs: Vec<(Transaction, Option<BlockId>)>,
query: &Query,
config: &Config,
) -> Result<Vec<TransactionValue>, errors::Error> {
) -> Vec<TransactionValue> {
let outpoints = txs
.iter()
.flat_map(|(tx, _)| {
@@ -513,12 +513,11 @@
})
.collect();

let prevouts = query.lookup_txos(&outpoints)?;
let prevouts = query.lookup_txos(&outpoints);

Ok(txs
.into_iter()
txs.into_iter()
.map(|(tx, blockid)| TransactionValue::new(tx, blockid, &prevouts, config))
.collect())
.collect()
}

#[tokio::main]
@@ -709,7 +708,7 @@ fn handle_request(
.map(|tx| (tx, None))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}
(&Method::GET, Some(&"block"), Some(hash), Some(&"header"), None, None) => {
let hash = BlockHash::from_hex(hash)?;
@@ -786,7 +785,7 @@
// XXX orphraned blocks alway get TTL_SHORT
let ttl = ttl_by_depth(confirmed_blockid.map(|b| b.height), query);

json_maybe_error_response(prepare_txs(txs, query, config), ttl)
json_response(prepare_txs(txs, query, config), ttl)
}
(&Method::GET, Some(script_type @ &"address"), Some(script_str), None, None, None)
| (&Method::GET, Some(script_type @ &"scripthash"), Some(script_str), None, None, None) => {
@@ -874,7 +873,7 @@
);
}

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}

(
@@ -907,7 +906,7 @@
.map(|(tx, blockid)| (tx, Some(blockid)))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}
(
&Method::GET,
@@ -938,7 +937,7 @@
.map(|tx| (tx, None))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}

(
@@ -981,9 +980,9 @@
let blockid = query.chain().tx_confirming_block(&hash);
let ttl = ttl_by_depth(blockid.as_ref().map(|b| b.height), query);

let tx = prepare_txs(vec![(tx, blockid)], query, config).map(|mut v| v.remove(0));
let mut tx = prepare_txs(vec![(tx, blockid)], query, config);

json_maybe_error_response(tx, ttl)
json_response(tx.remove(0), ttl)
}
(&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"hex"), None, None)
| (&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"raw"), None, None) => {
@@ -1106,7 +1105,7 @@
.map(|tx| (tx, None))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}
(&Method::GET, Some(&"mempool"), Some(&"txs"), last_seen_txid, None, None) => {
let last_seen_txid = last_seen_txid.and_then(|txid| Txid::from_hex(txid).ok());
@@ -1117,7 +1116,7 @@
.map(|tx| (tx, None))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}
(&Method::GET, Some(&"mempool"), Some(&"recent"), None, None, None) => {
let mempool = query.mempool();
@@ -1188,7 +1187,7 @@
.map(|(tx, blockid)| (tx, Some(blockid))),
);

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}

#[cfg(feature = "liquid")]
@@ -1214,7 +1213,7 @@
.map(|(tx, blockid)| (tx, Some(blockid)))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}

#[cfg(feature = "liquid")]
Expand All @@ -1228,7 +1227,7 @@ fn handle_request(
.map(|tx| (tx, None))
.collect();

json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT)
json_response(prepare_txs(txs, query, config), TTL_SHORT)
}

#[cfg(feature = "liquid")]
@@ -1281,26 +1280,26 @@ fn json_response<T: Serialize>(value: T, ttl: u32) -> Result<Response<Body>, Htt
.unwrap())
}

fn json_maybe_error_response<T: Serialize>(
value: Result<T, errors::Error>,
ttl: u32,
) -> Result<Response<Body>, HttpError> {
let response = Response::builder()
.header("Content-Type", "application/json")
.header("Cache-Control", format!("public, max-age={:}", ttl))
.header("X-Powered-By", &**VERSION_STRING);
Ok(match value {
Ok(v) => response
.body(Body::from(serde_json::to_string(&v)?))
.expect("Valid http response"),
Err(e) => response
.status(500)
.body(Body::from(serde_json::to_string(
&json!({ "error": e.to_string() }),
)?))
.expect("Valid http response"),
})
}
// fn json_maybe_error_response<T: Serialize>(
// value: Result<T, errors::Error>,
// ttl: u32,
// ) -> Result<Response<Body>, HttpError> {
// let response = Response::builder()
// .header("Content-Type", "application/json")
// .header("Cache-Control", format!("public, max-age={:}", ttl))
// .header("X-Powered-By", &**VERSION_STRING);
// Ok(match value {
// Ok(v) => response
// .body(Body::from(serde_json::to_string(&v)?))
// .expect("Valid http response"),
// Err(e) => response
// .status(500)
// .body(Body::from(serde_json::to_string(
// &json!({ "error": e.to_string() }),
// )?))
// .expect("Valid http response"),
// })
// }

fn blocks(
query: &Query,
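All of the rest.rs edits follow from `prepare_txs()` becoming infallible: handlers call plain `json_response()` again, and the 500-producing `json_maybe_error_response()` helper is retired. A condensed sketch of that flow, assuming the serde and serde_json crates are available and using hypothetical simplified types in place of `Query`, `TransactionValue`, and hyper's `Response`:

```rust
use serde::Serialize;

// Hypothetical simplified transaction view; the real TransactionValue is much richer.
#[derive(Serialize)]
struct TransactionValue {
    txid: String,
    fee: u64,
}

// prepare_txs() no longer returns a Result: prevout lookups cannot fail the whole request.
fn prepare_txs(txs: Vec<(String, u64)>) -> Vec<TransactionValue> {
    txs.into_iter()
        .map(|(txid, fee)| TransactionValue { txid, fee })
        .collect()
}

// Serialization can still fail, but there is no longer a lookup-error branch to map to HTTP 500.
fn json_response<T: Serialize>(value: T, ttl: u32) -> Result<String, serde_json::Error> {
    let body = serde_json::to_string(&value)?;
    Ok(format!("Cache-Control: public, max-age={ttl}\r\n\r\n{body}"))
}

fn main() -> Result<(), serde_json::Error> {
    let txs = prepare_txs(vec![("c0ffee".into(), 1_000)]);
    println!("{}", json_response(txs, 10)?);
    Ok(())
}
```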