Skip to content

Commit

Permalink
Addressing PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala committed Jun 17, 2024
1 parent 822746a commit 9982f97
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
15 changes: 8 additions & 7 deletions consensus/src/quorum_store/proof_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
};
use tokio::sync::mpsc::Sender;

#[derive(Debug)]
pub enum ProofManagerCommand {
Expand Down Expand Up @@ -135,7 +136,7 @@ pub struct ProofManager {
back_pressure_total_proof_limit: u64,
remaining_total_proof_num: u64,
allow_batches_without_pos_in_proposal: bool,
proof_queue_tx: Arc<tokio::sync::mpsc::Sender<ProofQueueCommand>>,
proof_queue_tx: Arc<Sender<ProofQueueCommand>>,
}

impl ProofManager {
Expand All @@ -144,7 +145,7 @@ impl ProofManager {
back_pressure_total_proof_limit: u64,
batch_store: Arc<BatchStore>,
allow_batches_without_pos_in_proposal: bool,
proof_queue_tx: Arc<tokio::sync::mpsc::Sender<ProofQueueCommand>>,
proof_queue_tx: Arc<Sender<ProofQueueCommand>>,
) -> Self {
Self {
batch_queue: BatchQueue::new(batch_store),
Expand All @@ -158,11 +159,14 @@ impl ProofManager {
}

pub(crate) async fn receive_proofs(&mut self, proofs: Vec<ProofOfStore>) {
for proof in &proofs {
self.batch_queue.remove_batch(proof.info());
}
if !proofs.is_empty() {
let (response_tx, response_rx) = oneshot::channel();
if self
.proof_queue_tx
.send(ProofQueueCommand::AddProofs(proofs.clone(), response_tx))
.send(ProofQueueCommand::AddProofs(proofs, response_tx))
.await
.is_ok()
{
Expand All @@ -177,9 +181,6 @@ impl ProofManager {
warn!("Failed to add proofs to proof queue");
}
}
for proof in proofs.into_iter() {
self.batch_queue.remove_batch(proof.info());
}
}

pub(crate) fn receive_batches(&mut self, batches: Vec<BatchInfo>) {
Expand Down Expand Up @@ -339,7 +340,7 @@ impl ProofManager {

pub async fn start(
mut self,
back_pressure_tx: tokio::sync::mpsc::Sender<BackPressure>,
back_pressure_tx: Sender<BackPressure>,
mut proposal_rx: Receiver<GetPayloadCommand>,
mut proof_rx: tokio::sync::mpsc::Receiver<ProofManagerCommand>,
) {
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/quorum_store/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl ProofQueue {
}

fn remaining_txns_without_duplicates(&self) -> u64 {
// All the bath keys for which batch_to_proof is not None. This is the set of unexpired and uncommitted proofs.
// All the batch keys for which batch_to_proof is not None. This is the set of unexpired and uncommitted proofs.
let unexpired_batch_keys = self
.batch_to_proof
.iter()
Expand Down Expand Up @@ -355,7 +355,7 @@ impl ProofQueue {
let mut cur_txns = 0;
let mut excluded_txns = 0;
let mut full = false;

counters::PULL_PROOFS_MAX_TXNS.observe(max_txns as f64);
counters::PULL_PROOFS_MAX_BYTES.observe(max_bytes as f64);

Expand Down

0 comments on commit 9982f97

Please sign in to comment.