-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[qs] another refactor #7045
[qs] another refactor #7045
Changes from all commits
52a15b9
28a064c
bf87b15
a02e820
6807fde
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -174,10 +174,9 @@ impl NetworkSender { | |
|
||
/// Tries to send the given msg to all the participants. | ||
/// | ||
/// The future is fulfilled as soon as the message put into the mpsc channel to network | ||
/// internal(to provide back pressure), it does not indicate the message is delivered or sent | ||
/// out. It does not give indication about when the message is delivered to the recipients, | ||
/// as well as there is no indication about the network failures. | ||
/// The future is fulfilled as soon as the message is put into the mpsc channel to network | ||
/// internal (to provide back pressure), it does not indicate the message is delivered or sent | ||
/// out. | ||
async fn broadcast(&mut self, msg: ConsensusMsg) { | ||
fail_point!("consensus::send::any", |_| ()); | ||
// Directly send the message to ourself without going through network. | ||
|
@@ -186,13 +185,6 @@ impl NetworkSender { | |
error!("Error broadcasting to self: {:?}", err); | ||
} | ||
|
||
self.broadcast_without_self(msg).await; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you didn't touch it, but if you get a chance could you update the function comment, "internal(" needs space, "message put" probably message is put or sent, and in general can probably be expressed more concisely. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should delete this anyway |
||
} | ||
|
||
/// Tries to send the given msg to all the participants, excluding self. | ||
async fn broadcast_without_self(&mut self, msg: ConsensusMsg) { | ||
fail_point!("consensus::send::any", |_| ()); | ||
|
||
// Get the list of validators excluding our own account address. Note the | ||
// ordering is not important in this case. | ||
let self_author = self.author; | ||
|
@@ -362,13 +354,13 @@ impl QuorumStoreSender for NetworkSender { | |
async fn broadcast_fragment(&mut self, fragment: Fragment) { | ||
fail_point!("consensus::send::broadcast_fragment", |_| ()); | ||
let msg = ConsensusMsg::FragmentMsg(Box::new(fragment)); | ||
self.broadcast_without_self(msg).await | ||
self.broadcast(msg).await | ||
} | ||
|
||
async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore) { | ||
fail_point!("consensus::send::proof_of_store", |_| ()); | ||
let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(proof_of_store)); | ||
self.broadcast_without_self(msg).await | ||
self.broadcast(msg).await | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess we can delete the broadcast_without_self method since this was the only use case. |
||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,10 @@ | |
|
||
use crate::{ | ||
network::NetworkSender, | ||
quorum_store::{batch_store::BatchStore, quorum_store_coordinator::CoordinatorCommand}, | ||
quorum_store::{ | ||
batch_store::{BatchReader, BatchStore}, | ||
quorum_store_coordinator::CoordinatorCommand, | ||
}, | ||
}; | ||
use aptos_consensus_types::{ | ||
block::Block, | ||
|
@@ -12,7 +15,6 @@ use aptos_consensus_types::{ | |
}; | ||
use aptos_crypto::HashValue; | ||
use aptos_executor_types::{Error::DataNotFound, *}; | ||
use aptos_infallible::Mutex; | ||
use aptos_logger::prelude::*; | ||
use aptos_types::transaction::SignedTransaction; | ||
use futures::{channel::mpsc::Sender, SinkExt}; | ||
|
@@ -23,10 +25,7 @@ use tokio::sync::oneshot; | |
/// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload. | ||
pub enum PayloadManager { | ||
DirectMempool, | ||
InQuorumStore( | ||
Arc<BatchStore<NetworkSender>>, | ||
Mutex<Sender<CoordinatorCommand>>, | ||
), | ||
InQuorumStore(Arc<BatchStore<NetworkSender>>, Sender<CoordinatorCommand>), | ||
} | ||
|
||
impl PayloadManager { | ||
|
@@ -60,6 +59,7 @@ impl PayloadManager { | |
match self { | ||
PayloadManager::DirectMempool => {}, | ||
PayloadManager::InQuorumStore(batch_store, coordinator_tx) => { | ||
// TODO: move this to somewhere in quorum store, so this can be a batch reader | ||
batch_store.update_certified_round(logical_time).await; | ||
|
||
let digests: Vec<HashValue> = payloads | ||
|
@@ -73,9 +73,8 @@ impl PayloadManager { | |
.map(|proof| *proof.digest()) | ||
.collect(); | ||
|
||
let mut tx = coordinator_tx.lock().clone(); | ||
let mut tx = coordinator_tx.clone(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the TODO below deprecated? |
||
|
||
// TODO: don't even need to warn on fail? | ||
if let Err(e) = tx | ||
.send(CoordinatorCommand::CommitNotification( | ||
logical_time, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,10 +4,8 @@ | |
#![allow(dead_code)] | ||
#![allow(unused_variables)] | ||
|
||
use crate::quorum_store::{ | ||
counters, | ||
types::{BatchId, SerializedTransaction}, | ||
}; | ||
use crate::quorum_store::{counters, types::SerializedTransaction}; | ||
use aptos_consensus_types::proof_of_store::BatchId; | ||
use aptos_crypto::{hash::DefaultHasher, HashValue}; | ||
use aptos_logger::{error, warn}; | ||
use aptos_types::transaction::SignedTransaction; | ||
|
@@ -145,8 +143,8 @@ impl BatchAggregator { | |
if Self::is_new_batch(batch_id, self_batch_id) { | ||
self.batch_state.is_some() || fragment_id > 0 | ||
} else { | ||
assert!( | ||
batch_id == self_batch_id, | ||
assert_eq!( | ||
batch_id, self_batch_id, | ||
"Missed fragment called with an outdated fragment" | ||
); | ||
fragment_id > self.next_fragment_id() | ||
|
@@ -207,11 +205,8 @@ impl BatchAggregator { | |
self.batch_state = Some(IncrementalBatchState::new(self.max_batch_bytes)); | ||
} | ||
|
||
if self.batch_state.is_some() { | ||
self.batch_state | ||
.as_mut() | ||
.unwrap() | ||
.append_transactions(transactions)? | ||
if let Some(batch_state) = &mut self.batch_state { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ugh, thanks for showing the correct way in the code we will delete shortly anyway ;-) |
||
batch_state.append_transactions(transactions)? | ||
} | ||
Ok(()) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cool!