Skip to content

Commit

Permalink
fix: handle out of sync errors when returning mempool transactions (t…
Browse files Browse the repository at this point in the history
…ari-project#5701)

Description
---
Add a new error type and gracefully recover from possible runtime panic.

Motivation and Context
---
Currently, there is a minimal possibility that the collection of tx's is
out sync. If this occurs the base node will likely crash.

How Has This Been Tested?
---
CI

Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify
  • Loading branch information
brianp authored Aug 31, 2023
1 parent 5e4f3c2 commit b0337cf
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 23 deletions.
2 changes: 2 additions & 0 deletions base_layer/core/src/mempool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ pub enum MempoolError {
BlockingTaskError(#[from] JoinError),
#[error("Internal error: {0}")]
InternalError(String),
#[error("Mempool indexes out of sync: transaction exists in txs_by_signature but not in tx_by_key")]
IndexOutOfSync,
}
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl Mempool {
&self,
excess_sigs: Vec<PrivateKey>,
) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
self.with_read_access(move |storage| Ok(storage.retrieve_by_excess_sigs(&excess_sigs)))
self.with_read_access(move |storage| storage.retrieve_by_excess_sigs(&excess_sigs))
.await
}

Expand Down
19 changes: 12 additions & 7 deletions base_layer/core/src/mempool/mempool_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,19 @@ impl MempoolStorage {
Ok(results.retrieved_transactions)
}

pub fn retrieve_by_excess_sigs(&self, excess_sigs: &[PrivateKey]) -> (Vec<Arc<Transaction>>, Vec<PrivateKey>) {
let (found_txns, remaining) = self.unconfirmed_pool.retrieve_by_excess_sigs(excess_sigs);
let (found_published_transactions, remaining) = self.reorg_pool.retrieve_by_excess_sigs(&remaining);
pub fn retrieve_by_excess_sigs(
&self,
excess_sigs: &[PrivateKey],
) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
let (found_txns, remaining) = self.unconfirmed_pool.retrieve_by_excess_sigs(excess_sigs)?;

(
found_txns.into_iter().chain(found_published_transactions).collect(),
remaining,
)
match self.reorg_pool.retrieve_by_excess_sigs(&remaining) {
Ok((found_published_transactions, remaining)) => Ok((
found_txns.into_iter().chain(found_published_transactions).collect(),
remaining,
)),
Err(e) => Err(e),
}
}

/// Check if the specified excess signature is found in the Mempool.
Expand Down
20 changes: 9 additions & 11 deletions base_layer/core/src/mempool/reorg_pool/reorg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tari_utilities::hex::Hex;

use crate::{
blocks::Block,
mempool::shrink_hashmap::shrink_hashmap,
mempool::{shrink_hashmap::shrink_hashmap, MempoolError},
transactions::transaction_components::Transaction,
};

Expand Down Expand Up @@ -144,7 +144,10 @@ impl ReorgPool {
result
}

pub fn retrieve_by_excess_sigs(&self, excess_sigs: &[PrivateKey]) -> (Vec<Arc<Transaction>>, Vec<PrivateKey>) {
pub fn retrieve_by_excess_sigs(
&self,
excess_sigs: &[PrivateKey],
) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
// Hashset used to prevent duplicates
let mut found = HashSet::new();
let mut remaining = Vec::new();
Expand All @@ -158,15 +161,10 @@ impl ReorgPool {

let found = found
.into_iter()
.map(|id| {
self.tx_by_key
.get(id)
.expect("mempool indexes out of sync: transaction exists in txs_by_signature but not in tx_by_key")
})
.cloned()
.collect();

(found, remaining)
.map(|id| self.tx_by_key.get(id).cloned().ok_or(MempoolError::IndexOutOfSync))
.collect::<Result<Vec<_>, _>>()?;

Ok((found, remaining))
}

/// Check if a transaction is stored in the ReorgPool
Expand Down
12 changes: 8 additions & 4 deletions base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
shrink_hashmap::shrink_hashmap,
unconfirmed_pool::UnconfirmedPoolError,
FeePerGramStat,
MempoolError,
},
transactions::{tari_amount::MicroMinotari, transaction_components::Transaction, weight::TransactionWeight},
};
Expand Down Expand Up @@ -401,7 +402,10 @@ impl UnconfirmedPool {
}
}

pub fn retrieve_by_excess_sigs(&self, excess_sigs: &[PrivateKey]) -> (Vec<Arc<Transaction>>, Vec<PrivateKey>) {
pub fn retrieve_by_excess_sigs(
&self,
excess_sigs: &[PrivateKey],
) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
// Hashset used to prevent duplicates
let mut found = HashSet::new();
let mut remaining = Vec::new();
Expand All @@ -419,11 +423,11 @@ impl UnconfirmedPool {
self.tx_by_key
.get(&id)
.map(|tx| tx.transaction.clone())
.expect("mempool indexes out of sync: transaction exists in txs_by_signature but not in tx_by_key")
.ok_or(MempoolError::IndexOutOfSync)
})
.collect();
.collect::<Result<Vec<_>, _>>()?;

(found, remaining)
Ok((found, remaining))
}

fn get_all_dependent_transactions(
Expand Down

0 comments on commit b0337cf

Please sign in to comment.