diff --git a/consensus/src/quorum_store/proof_manager.rs b/consensus/src/quorum_store/proof_manager.rs index 675a066637ba5..842052e05ec69 100644 --- a/consensus/src/quorum_store/proof_manager.rs +++ b/consensus/src/quorum_store/proof_manager.rs @@ -25,6 +25,7 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, sync::Arc, }; +use tokio::sync::mpsc::Sender; #[derive(Debug)] pub enum ProofManagerCommand { @@ -135,7 +136,7 @@ pub struct ProofManager { back_pressure_total_proof_limit: u64, remaining_total_proof_num: u64, allow_batches_without_pos_in_proposal: bool, - proof_queue_tx: Arc>, + proof_queue_tx: Arc>, } impl ProofManager { @@ -144,7 +145,7 @@ impl ProofManager { back_pressure_total_proof_limit: u64, batch_store: Arc, allow_batches_without_pos_in_proposal: bool, - proof_queue_tx: Arc>, + proof_queue_tx: Arc>, ) -> Self { Self { batch_queue: BatchQueue::new(batch_store), @@ -158,11 +159,14 @@ impl ProofManager { } pub(crate) async fn receive_proofs(&mut self, proofs: Vec) { + for proof in &proofs { + self.batch_queue.remove_batch(proof.info()); + } if !proofs.is_empty() { let (response_tx, response_rx) = oneshot::channel(); if self .proof_queue_tx - .send(ProofQueueCommand::AddProofs(proofs.clone(), response_tx)) + .send(ProofQueueCommand::AddProofs(proofs, response_tx)) .await .is_ok() { @@ -177,9 +181,6 @@ impl ProofManager { warn!("Failed to add proofs to proof queue"); } } - for proof in proofs.into_iter() { - self.batch_queue.remove_batch(proof.info()); - } } pub(crate) fn receive_batches(&mut self, batches: Vec) { @@ -339,7 +340,7 @@ impl ProofManager { pub async fn start( mut self, - back_pressure_tx: tokio::sync::mpsc::Sender, + back_pressure_tx: Sender, mut proposal_rx: Receiver, mut proof_rx: tokio::sync::mpsc::Receiver, ) { diff --git a/consensus/src/quorum_store/utils.rs b/consensus/src/quorum_store/utils.rs index e534fe211c064..3ecfca0d7e6f2 100644 --- a/consensus/src/quorum_store/utils.rs +++ b/consensus/src/quorum_store/utils.rs @@ -273,7 +273,7 @@ impl ProofQueue { } fn remaining_txns_without_duplicates(&self) -> u64 { - // All the bath keys for which batch_to_proof is not None. This is the set of unexpired and uncommitted proofs. + // All the batch keys for which batch_to_proof is not None. This is the set of unexpired and uncommitted proofs. let unexpired_batch_keys = self .batch_to_proof .iter() @@ -355,7 +355,7 @@ impl ProofQueue { let mut cur_txns = 0; let mut excluded_txns = 0; let mut full = false; - + counters::PULL_PROOFS_MAX_TXNS.observe(max_txns as f64); counters::PULL_PROOFS_MAX_BYTES.observe(max_bytes as f64);