Skip to content

Commit

Permalink
fix deadlock between the mempool and the db (#15421)
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse authored Nov 27, 2024
1 parent d0410ec commit de9040d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 20 deletions.
29 changes: 15 additions & 14 deletions mempool/src/shared_mempool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
},
use_case_history::UseCaseHistory,
},
thread_pool::IO_POOL,
thread_pool::{IO_POOL, VALIDATION_POOL},
QuorumStoreRequest, QuorumStoreResponse, SubmissionStatus,
};
use anyhow::Result;
Expand Down Expand Up @@ -45,7 +45,6 @@ use std::{
time::{Duration, Instant},
};
use tokio::runtime::Handle;

// ============================== //
// broadcast_coordinator tasks //
// ============================== //
Expand Down Expand Up @@ -393,18 +392,20 @@ fn validate_and_add_transactions<NetworkClient, TransactionValidator>(
let vm_validation_timer = counters::PROCESS_TXN_BREAKDOWN_LATENCY
.with_label_values(&[counters::VM_VALIDATION_LABEL])
.start_timer();
let validation_results = transactions
.par_iter()
.map(|t| {
let result = smp.validator.read().validate_transaction(t.0.clone());
// Pre-compute the hash and length if the transaction is valid, before locking mempool
if result.is_ok() {
t.0.committed_hash();
t.0.txn_bytes_len();
}
result
})
.collect::<Vec<_>>();
let validation_results = VALIDATION_POOL.install(|| {
transactions
.par_iter()
.map(|t| {
let result = smp.validator.read().validate_transaction(t.0.clone());
// Pre-compute the hash and length if the transaction is valid, before locking mempool
if result.is_ok() {
t.0.committed_hash();
t.0.txn_bytes_len();
}
result
})
.collect::<Vec<_>>()
});
vm_validation_timer.stop_and_record();
{
let mut mempool = smp.mempool.lock();
Expand Down
7 changes: 7 additions & 0 deletions mempool/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ pub(crate) static IO_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| {
.build()
.unwrap()
});

pub(crate) static VALIDATION_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| {
rayon::ThreadPoolBuilder::new()
.thread_name(|index| format!("mempool_vali_{}", index))
.build()
.unwrap()
});
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ impl ShardedStateUpdates {
}

pub fn merge(&mut self, other: Self) {
self.shards
.par_iter_mut()
.zip_eq(other.shards.into_par_iter())
.for_each(|(l, r)| {
l.extend(r);
})
THREAD_MANAGER.get_exe_cpu_pool().install(|| {
self.shards
.par_iter_mut()
.zip_eq(other.shards.into_par_iter())
.for_each(|(l, r)| {
l.extend(r);
})
})
}

pub fn clone_merge(&mut self, other: &Self) {
Expand Down

0 comments on commit de9040d

Please sign in to comment.