brian/test graceful overload #13856

Closed
wants to merge 74 commits into from
60e8b87
Use proof queue asynchronously
vusirikala Jun 3, 2024
b56cc33
Committing what I have
vusirikala Jun 3, 2024
a369c28
Sending AddBatches message
vusirikala Jun 3, 2024
80a5a34
Calcuating the remaining txns
vusirikala Jun 3, 2024
4e5babe
Calculate proof queue size correctly
vusirikala Jun 4, 2024
1653475
Add a counter
vusirikala Jun 4, 2024
844e4ae
Update pfn_const_tps test
vusirikala Jun 4, 2024
be89d17
Minor changes
vusirikala Jun 4, 2024
8d77c78
Minor change
vusirikala Jun 5, 2024
e064652
Add some coutners
vusirikala Jun 5, 2024
204d823
Rust lint
vusirikala Jun 5, 2024
32eb5a4
Increasing quorum store backpressure limits
vusirikala Jun 5, 2024
7f63400
setting dynamic_min_txns_per_sec to 160
vusirikala Jun 5, 2024
196412f
Merge branch 'main' into satya/proof_queue
vusirikala Jun 6, 2024
76f9326
Fixing the calculation
vusirikala Jun 6, 2024
4c7af58
increase vfns to 7
vusirikala Jun 7, 2024
f71d2d0
Fixing the typo in batch generator
vusirikala Jun 7, 2024
2ee53a8
Merge branch 'main' into satya/proof_queue
vusirikala Jun 10, 2024
30490c3
Merge branch 'main' into satya/proof_queue
vusirikala Jun 11, 2024
e1dcb03
Merge branch 'main' into satya/proof_queue
vusirikala Jun 12, 2024
6282c44
Merge branch 'main' into satya/proof_queue
vusirikala Jun 13, 2024
29951a9
Add increase fraction
vusirikala Jun 13, 2024
cc3d7a2
Removing skipped transactions after inserting them
vusirikala Jun 13, 2024
5319f79
Add some counters
vusirikala Jun 13, 2024
a4f11ab
Update consensus pending duration counter
vusirikala Jun 13, 2024
eabe96c
Add more counters
vusirikala Jun 13, 2024
0c413fd
Increasing block size to 2500
vusirikala Jun 13, 2024
78b559b
Update a counter
vusirikala Jun 13, 2024
056baec
Increase block size limit
vusirikala Jun 13, 2024
6e54abb
Resetting execution config params
vusirikala Jun 14, 2024
685325a
Moving proof queue to utils.rs
vusirikala Jun 14, 2024
4a4d884
Moving counters
vusirikala Jun 14, 2024
a9871d8
Use transaction summary
vusirikala Jun 14, 2024
13669ca
intelligent pull proofs
vusirikala Jun 14, 2024
1264dad
Fix a bug in pull proofs
vusirikala Jun 15, 2024
5c38ea5
Merge branch 'main' into satya/proof_queue_simple
vusirikala Jun 15, 2024
244720c
Fix the bug
vusirikala Jun 15, 2024
7b0f5bc
Merge branch 'satya/proof_queue_simple' into satya/pull_proofs
vusirikala Jun 15, 2024
822746a
Rest to full to false in every iteration
vusirikala Jun 17, 2024
9982f97
Addressing PR comments
vusirikala Jun 17, 2024
3a23414
Move backpressure_tx to proof queue
vusirikala Jun 17, 2024
8c107be
Add info statement
vusirikala Jun 17, 2024
2f2e7a4
Merge branch 'satya/proof_queue_simple' into satya/pull_proofs
vusirikala Jun 17, 2024
75fabfe
Change buckets
vusirikala Jun 18, 2024
f20dd13
Add some info statements
vusirikala Jun 18, 2024
07aed02
Merge branch 'satya/proof_queue_simple' into satya/pull_proofs
vusirikala Jun 18, 2024
cb1b61d
Merge branch 'main' into satya/proof_queue_simple
vusirikala Jun 18, 2024
45b40bc
Merge branch 'main' into satya/proof_queue_simple
vusirikala Jun 18, 2024
51ccb9d
Cleanup
vusirikala Jun 19, 2024
9c76004
Remove an unrelated change
vusirikala Jun 19, 2024
21bb0ad
Addressing PR comments
vusirikala Jun 24, 2024
6eaafb2
Addressing PR comments
vusirikala Jun 25, 2024
46f18f3
Add some timer counters
vusirikala Jun 25, 2024
b0f73fd
Add more timer counters
vusirikala Jun 25, 2024
2bd92e2
Minor optimization
vusirikala Jun 25, 2024
51dc66b
Proof queue to be part of proof manager
vusirikala Jun 25, 2024
895cb79
Merge branch 'main' into satya/proof_queue_simple
vusirikala Jun 25, 2024
4a104c2
Move some code to a function
vusirikala Jun 25, 2024
bd3f611
Merge branch 'satya/proof_queue_simple' into satya/pull_proofs
vusirikala Jun 25, 2024
529e12a
Minor fixes
vusirikala Jun 26, 2024
f183611
Merge branch 'main' into satya/pull_proofs
vusirikala Jun 26, 2024
80aa8bf
Add max_unique_txns parameter
vusirikala Jun 26, 2024
5fce8e6
Use Lazy
vusirikala Jun 26, 2024
adbb6fc
Removing comments
vusirikala Jun 26, 2024
9964b60
Minor change
vusirikala Jun 26, 2024
0b170d3
Minor change
vusirikala Jun 26, 2024
d2f0fba
Minor fix
vusirikala Jun 26, 2024
13ec31b
Add unit test and address PR comments
vusirikala Jun 26, 2024
c428b1c
Minor fix in proof manager
vusirikala Jun 27, 2024
1d9b169
Merge branch 'main' into satya/pull_proofs
vusirikala Jun 27, 2024
2712880
Use saturating_sub
vusirikala Jun 27, 2024
0a50de4
Exclude expired transactions when counting block size
vusirikala Jun 27, 2024
3f20b98
Revert "Revert quorum store] reduce backpressure significantly for mo…
bchocho Jun 28, 2024
2878fc9
run graceful overload
bchocho Jun 28, 2024
Move backpressure_tx to proof queue
vusirikala committed Jun 17, 2024
commit 3a234140c75c08b2900e6cd37ae64d0415789d58
78 changes: 9 additions & 69 deletions consensus/src/quorum_store/proof_manager.rs
@@ -4,7 +4,6 @@
 use crate::{
     monitor,
     quorum_store::{
-        batch_generator::BackPressure,
         batch_store::BatchStore,
         counters,
         utils::{BatchSortKey, ProofQueueCommand},
@@ -131,28 +130,18 @@ impl BatchQueue {

 pub struct ProofManager {
     batch_queue: BatchQueue,
-    back_pressure_total_txn_limit: u64,
-    remaining_total_txn_num: u64,
-    back_pressure_total_proof_limit: u64,
-    remaining_total_proof_num: u64,
     allow_batches_without_pos_in_proposal: bool,
     proof_queue_tx: Arc<Sender<ProofQueueCommand>>,
 }

 impl ProofManager {
     pub fn new(
-        back_pressure_total_txn_limit: u64,
-        back_pressure_total_proof_limit: u64,
         batch_store: Arc<BatchStore>,
         allow_batches_without_pos_in_proposal: bool,
         proof_queue_tx: Arc<Sender<ProofQueueCommand>>,
     ) -> Self {
         Self {
             batch_queue: BatchQueue::new(batch_store),
-            back_pressure_total_txn_limit,
-            remaining_total_txn_num: 0,
-            back_pressure_total_proof_limit,
-            remaining_total_proof_num: 0,
             allow_batches_without_pos_in_proposal,
             proof_queue_tx,
         }
@@ -163,22 +152,12 @@ impl ProofManager {
             self.batch_queue.remove_batch(proof.info());
         }
         if !proofs.is_empty() {
-            let (response_tx, response_rx) = oneshot::channel();
-            if self
+            if let Err(e) = self
                 .proof_queue_tx
-                .send(ProofQueueCommand::AddProofs(proofs, response_tx))
+                .send(ProofQueueCommand::AddProofs(proofs))
                 .await
-                .is_ok()
             {
-                if let Ok((remaining_total_txn_num, remaining_total_proof_num)) = response_rx.await
-                {
-                    self.remaining_total_txn_num = remaining_total_txn_num;
-                    self.remaining_total_proof_num = remaining_total_proof_num;
-                } else {
-                    warn!("Failed to get response from proof queue after adding proofs");
-                }
-            } else {
-                warn!("Failed to add proofs to proof queue");
+                warn!("Failed to add proofs to proof queue with error: {:?}", e);
             }
         }
     }
@@ -203,25 +182,15 @@ impl ProofManager {
             self.batch_queue.remove_batch(batch);
         }

-        let (response_tx, response_rx) = oneshot::channel();
-        if self
+        if let Err(e) = self
             .proof_queue_tx
-            .send(ProofQueueCommand::MarkCommitted(
-                batches,
-                block_timestamp,
-                response_tx,
-            ))
+            .send(ProofQueueCommand::MarkCommitted(batches, block_timestamp))
             .await
-            .is_ok()
         {
-            if let Ok((remaining_total_txn_num, remaining_total_proof_num)) = response_rx.await {
-                self.remaining_total_txn_num = remaining_total_txn_num;
-                self.remaining_total_proof_num = remaining_total_proof_num;
-            } else {
-                warn!("Failed to get response from proof queue after marking proofs as committed");
-            }
-        } else {
-            warn!("Failed to mark proofs as committed in proof queue");
+            warn!(
+                "Failed to mark proofs as committed in proof queue with error: {:?}",
+                e
+            );
         }
     }

@@ -330,39 +299,17 @@ impl ProofManager {
         }
     }

-    /// return true when quorum store is back pressured
-    pub(crate) fn qs_back_pressure(&self) -> BackPressure {
-        BackPressure {
-            txn_count: self.remaining_total_txn_num > self.back_pressure_total_txn_limit,
-            proof_count: self.remaining_total_proof_num > self.back_pressure_total_proof_limit,
-        }
-    }
-
     pub async fn start(
         mut self,
-        back_pressure_tx: Sender<BackPressure>,
         mut proposal_rx: Receiver<GetPayloadCommand>,
         mut proof_rx: tokio::sync::mpsc::Receiver<ProofManagerCommand>,
     ) {
-        let mut back_pressure = BackPressure {
-            txn_count: false,
-            proof_count: false,
-        };
-
         loop {
             let _timer = counters::PROOF_MANAGER_MAIN_LOOP.start_timer();

             tokio::select! {
                 Some(msg) = proposal_rx.next() => monitor!("proof_manager_handle_proposal", {
                     self.handle_proposal_request(msg).await;
-
-                    let updated_back_pressure = self.qs_back_pressure();
-                    if updated_back_pressure != back_pressure {
-                        back_pressure = updated_back_pressure;
-                        if back_pressure_tx.send(back_pressure).await.is_err() {
-                            debug!("Failed to send back_pressure for proposal");
-                        }
-                    }
                 }),
                 Some(msg) = proof_rx.recv() => {
                     monitor!("proof_manager_handle_command", {
@@ -386,13 +333,6 @@ impl ProofManager {
                                 ).await;
                             },
                         }
-                        let updated_back_pressure = self.qs_back_pressure();
-                        if updated_back_pressure != back_pressure {
-                            back_pressure = updated_back_pressure;
-                            if back_pressure_tx.send(back_pressure).await.is_err() {
-                                debug!("Failed to send back_pressure for commit notification");
-                            }
-                        }
                     })
                 }
             }
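
The back-pressure logic removed from `ProofManager` above is edge-triggered: a `BackPressure` value is sent only when it differs from the last one sent. A minimal standalone sketch of that pattern follows. It assumes `std::sync::mpsc` as a stand-in for the tokio channel used in the PR, and `Limits` and `run` are hypothetical names introduced for illustration only, not part of the codebase.

```rust
use std::sync::mpsc;

/// Mirrors the two flags of the `BackPressure` struct referenced in the diff.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct BackPressure {
    txn_count: bool,
    proof_count: bool,
}

/// Hypothetical holder for the two limits (not a type from the PR).
struct Limits {
    txn_limit: u64,
    proof_limit: u64,
}

impl Limits {
    /// A flag is set only when the remaining count strictly exceeds its limit,
    /// matching the `>` comparisons in `qs_back_pressure`.
    fn back_pressure(&self, remaining_txns: u64, remaining_proofs: u64) -> BackPressure {
        BackPressure {
            txn_count: remaining_txns > self.txn_limit,
            proof_count: remaining_proofs > self.proof_limit,
        }
    }
}

/// Feeds (remaining_txns, remaining_proofs) samples through the change
/// detector and returns every update that was actually sent.
fn run(limits: &Limits, samples: &[(u64, u64)]) -> Vec<BackPressure> {
    let (tx, rx) = mpsc::channel();
    // Same initial state as in the diff: no back pressure.
    let mut last = BackPressure { txn_count: false, proof_count: false };
    for &(txns, proofs) in samples {
        let updated = limits.back_pressure(txns, proofs);
        if updated != last {
            last = updated;
            // A failed send only means the receiver is gone; it is ignored here.
            let _ = tx.send(updated);
        }
    }
    drop(tx); // close the channel so the iterator below terminates
    rx.iter().collect()
}

fn main() {
    let limits = Limits { txn_limit: 10, proof_limit: 10 };
    let samples = [(5, 1), (11, 1), (12, 2), (3, 11), (3, 11), (0, 0)];
    for bp in run(&limits, &samples) {
        println!("{:?}", bp);
    }
}
```

Sending only on change keeps the channel quiet while the queue stays on one side of its limits, at the cost of the receiver having to remember the last value it saw.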
20 changes: 12 additions & 8 deletions consensus/src/quorum_store/quorum_store_builder.rs
@@ -320,9 +320,19 @@ impl InnerBuilder {
             )
         );

-        let proof_queue = ProofQueue::new(self.author);
+        let proof_queue = ProofQueue::new(
+            self.author,
+            self.config.back_pressure.backlog_txn_limit_count,
+            self.config
+                .back_pressure
+                .backlog_per_validator_batch_limit_count
+                * self.num_validators,
+        );
         let proof_queue_cmd_rx = self.proof_queue_cmd_rx.take().unwrap();
-        spawn_named!("proof_queue", proof_queue.start(proof_queue_cmd_rx));
+        spawn_named!(
+            "proof_queue",
+            proof_queue.start(self.back_pressure_tx.clone(), proof_queue_cmd_rx)
+        );

         for (i, remote_batch_coordinator_cmd_rx) in
             self.remote_batch_coordinator_cmd_rx.into_iter().enumerate()
@@ -367,19 +377,13 @@ impl InnerBuilder {

         let proof_manager_cmd_rx = self.proof_manager_cmd_rx.take().unwrap();
         let proof_manager = ProofManager::new(
-            self.config.back_pressure.backlog_txn_limit_count,
-            self.config
-                .back_pressure
-                .backlog_per_validator_batch_limit_count
-                * self.num_validators,
             self.batch_store.clone().unwrap(),
             self.config.allow_batches_without_pos_in_proposal,
             self.proof_queue_cmd_tx.clone(),
         );
         spawn_named!(
             "proof_manager",
             proof_manager.start(
-                self.back_pressure_tx.clone(),
                 self.consensus_to_quorum_store_receiver,
                 proof_manager_cmd_rx,
             )
7 changes: 4 additions & 3 deletions consensus/src/quorum_store/tests/proof_manager_test.rs
@@ -16,10 +16,11 @@ use std::{collections::HashSet, sync::Arc};

 async fn create_proof_manager() -> ProofManager {
     let (proof_queue_tx, proof_queue_rx) = tokio::sync::mpsc::channel(100);
-    let proof_queue = ProofQueue::new(PeerId::random());
-    tokio::spawn(proof_queue.start(proof_queue_rx));
+    let proof_queue = ProofQueue::new(PeerId::random(), 10, 10);
+    let (backpressure_tx, _) = tokio::sync::mpsc::channel(10);
+    tokio::spawn(proof_queue.start(backpressure_tx, proof_queue_rx));
     let batch_store = batch_store_for_test(5 * 1024 * 1024);
-    ProofManager::new(10, 10, batch_store, true, Arc::new(proof_queue_tx))
+    ProofManager::new(batch_store, true, Arc::new(proof_queue_tx))
 }

 fn create_proof(author: PeerId, expiration: u64, batch_sequence: u64) -> ProofOfStore {
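
In the test helper above, the receiving half of the back-pressure channel is discarded with `_`, so it is dropped as soon as the `let` statement ends and any later send reports an error. The `ProofQueue` loop tolerates this by only logging at `debug!` level. The sketch below illustrates the drop behaviour with `std::sync::mpsc`, assumed here as a stand-in for the tokio channel; the function names are hypothetical.

```rust
use std::sync::mpsc;

/// The `_` pattern does not bind the receiver, so it is dropped at the end
/// of the `let` statement and the send fails.
fn send_with_discarded_receiver() -> bool {
    let (tx, _) = mpsc::channel::<bool>();
    tx.send(true).is_ok()
}

/// Binding the receiver to `_rx` keeps it alive until the end of the
/// function, so the send succeeds.
fn send_with_live_receiver() -> bool {
    let (tx, _rx) = mpsc::channel::<bool>();
    tx.send(true).is_ok()
}

fn main() {
    println!("discarded receiver: {}", send_with_discarded_receiver());
    println!("live receiver: {}", send_with_live_receiver());
}
```

A test that needs to observe back-pressure updates would have to keep the receiver bound to a named variable instead.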
2 changes: 1 addition & 1 deletion consensus/src/quorum_store/tests/utils.rs
@@ -27,7 +27,7 @@ fn proof_of_store(author: PeerId, batch_id: BatchId, gas_bucket_start: u64) -> P
 #[test]
 fn test_proof_queue_sorting() {
     let my_peer_id = PeerId::random();
-    let mut proof_queue = ProofQueue::new(my_peer_id);
+    let mut proof_queue = ProofQueue::new(my_peer_id, 10, 10);

     let author_0 = PeerId::random();
     let author_1 = PeerId::random();
57 changes: 47 additions & 10 deletions consensus/src/quorum_store/utils.rs
@@ -1,6 +1,7 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0

+use super::batch_generator::BackPressure;
 use crate::{monitor, quorum_store::counters};
 use aptos_consensus_types::{
     common::{TransactionInProgress, TransactionSummary},
@@ -196,7 +197,7 @@ impl Ord for BatchSortKey {
 pub enum ProofQueueCommand {
     // Proof manager sends this command to add the proofs to the proof queue
     // We send back (remaining_txns, remaining_proofs) to the proof manager
-    AddProofs(Vec<ProofOfStore>, oneshot::Sender<(u64, u64)>),
+    AddProofs(Vec<ProofOfStore>),
     // Batch coordinator sends this command to add the received batches to the proof queue.
     // For each transaction, the proof queue stores the list of batches containing the transaction.
     AddBatches(Vec<(BatchInfo, Vec<TransactionSummary>)>),
@@ -212,7 +213,7 @@ pub enum ProofQueueCommand {
     // Proof manager sends this command to mark these batches as committed and
     // update the block timestamp.
     // We send back the (remaining_txns, remaining_proofs) to the proof manager
-    MarkCommitted(Vec<BatchInfo>, u64, oneshot::Sender<(u64, u64)>),
+    MarkCommitted(Vec<BatchInfo>, u64),
 }

 pub struct ProofQueue {
@@ -229,14 +230,20 @@ pub struct ProofQueue {
     // Expiration index
     expirations: TimeExpirations<BatchSortKey>,
     latest_block_timestamp: u64,
+    back_pressure_total_txn_limit: u64,
+    back_pressure_total_proof_limit: u64,
     remaining_txns_with_duplicates: u64,
     remaining_proofs: u64,
     remaining_local_txns: u64,
     remaining_local_proofs: u64,
 }

 impl ProofQueue {
-    pub(crate) fn new(my_peer_id: PeerId) -> Self {
+    pub(crate) fn new(
+        my_peer_id: PeerId,
+        back_pressure_total_txn_limit: u64,
+        back_pressure_total_proof_limit: u64,
+    ) -> Self {
         Self {
             my_peer_id,
             author_to_batches: HashMap::new(),
@@ -245,6 +252,8 @@ impl ProofQueue {
             batches_with_txn_summary: HashSet::new(),
             expirations: TimeExpirations::new(),
             latest_block_timestamp: 0,
+            back_pressure_total_txn_limit,
+            back_pressure_total_proof_limit,
             remaining_txns_with_duplicates: 0,
             remaining_proofs: 0,
             remaining_local_txns: 0,
@@ -338,6 +347,15 @@ impl ProofQueue {
         self.inc_remaining(&author, num_txns);
     }

+    /// return true when quorum store is back pressured
+    pub(crate) fn qs_back_pressure(&self) -> BackPressure {
+        let (remaining_total_txn_num, remaining_total_proof_num) = self.remaining_txns_and_proofs();
+        BackPressure {
+            txn_count: remaining_total_txn_num > self.back_pressure_total_txn_limit,
+            proof_count: remaining_total_proof_num > self.back_pressure_total_proof_limit,
+        }
+    }
+
     // gets excluded and iterates over the vector returning non excluded or expired entries.
     // return the vector of pulled PoS, and the size of the remaining PoS
     // The flag in the second return argument is true iff the entire proof queue is fully utilized
@@ -548,17 +566,31 @@ impl ProofQueue {
         }
     }

-    pub async fn start(mut self, mut command_rx: tokio::sync::mpsc::Receiver<ProofQueueCommand>) {
+    pub async fn start(
+        mut self,
+        back_pressure_tx: tokio::sync::mpsc::Sender<BackPressure>,
+        mut command_rx: tokio::sync::mpsc::Receiver<ProofQueueCommand>,
+    ) {
+        let mut back_pressure = BackPressure {
+            txn_count: false,
+            proof_count: false,
+        };
+
         loop {
             let _timer = counters::PROOF_MANAGER_MAIN_LOOP.start_timer();
             if let Some(msg) = command_rx.recv().await {
                 match msg {
-                    ProofQueueCommand::AddProofs(proofs, response_sender) => {
+                    ProofQueueCommand::AddProofs(proofs) => {
                         for proof in proofs {
                             self.push(proof);
                         }
-                        if let Err(e) = response_sender.send(self.remaining_txns_and_proofs()) {
-                            warn!("Failed to send response to AddProofs: {:?}", e);
+
+                        let updated_back_pressure = self.qs_back_pressure();
+                        if updated_back_pressure != back_pressure {
+                            back_pressure = updated_back_pressure;
+                            if back_pressure_tx.send(back_pressure).await.is_err() {
+                                debug!("Failed to send back_pressure for proposal");
+                            }
                         }
                     },
                     ProofQueueCommand::PullProofs {
@@ -578,11 +610,16 @@ impl ProofQueue {
                             warn!("Failed to send response to PullProofs: {:?}", e);
                         }
                     },
-                    ProofQueueCommand::MarkCommitted(batches, block_timestamp, response_sender) => {
+                    ProofQueueCommand::MarkCommitted(batches, block_timestamp) => {
                         self.mark_committed(batches);
                         self.handle_updated_block_timestamp(block_timestamp);
-                        if let Err(e) = response_sender.send(self.remaining_txns_and_proofs()) {
-                            error!("Failed to send response to MarkCommitted: {:?}", e);
+
+                        let updated_back_pressure = self.qs_back_pressure();
+                        if updated_back_pressure != back_pressure {
+                            back_pressure = updated_back_pressure;
+                            if back_pressure_tx.send(back_pressure).await.is_err() {
+                                debug!("Failed to send back_pressure for proposal");
+                            }
                         }
                     },
                     ProofQueueCommand::AddBatches(batch_summaries) => {