[Quorum Store] Implementation of quorum store components #6055
Conversation
✅ Forge suite
Overall, looks good! I've made a bunch of comments/questions/suggestions inline; if you want to address any of those, you can do so in a follow-up PR. I am accepting, as this one is huge and there is nothing important that is off.
@@ -48,10 +49,10 @@ pub struct ChainHealthBackoffValues {
 impl Default for ConsensusConfig {
     fn default() -> ConsensusConfig {
         ConsensusConfig {
-            max_sending_block_txns: 2500,
+            max_sending_block_txns: 4000,
Yeah, we cannot really change these now. Either:
- keep them as is, and then increase in the next release once QS is enabled in main
- or have new fields (qs_max_sending_block_txns, and others) in the interim, before the cleanup
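The second option could look something like the following sketch. This is illustrative only, assuming a hypothetical interim field `qs_max_sending_block_txns` alongside the unchanged default; the real `ConsensusConfig` has many more fields.

```rust
// Hypothetical sketch: keep the existing default unchanged and add an
// interim quorum-store-specific field until the cleanup.
#[derive(Clone, Debug)]
pub struct ConsensusConfig {
    pub max_sending_block_txns: u64,
    // Interim field (name assumed from the review comment), used only when
    // quorum store is enabled; removed once QS is on by default and the
    // base value can be raised.
    pub qs_max_sending_block_txns: u64,
}

impl Default for ConsensusConfig {
    fn default() -> Self {
        ConsensusConfig {
            max_sending_block_txns: 2500, // unchanged for the current release
            qs_max_sending_block_txns: 4000,
        }
    }
}

fn main() {
    let c = ConsensusConfig::default();
    println!("{} {}", c.max_sending_block_txns, c.qs_max_sending_block_txns);
}
```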
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct QuorumStoreConfig {
Can you add some more comments explaining what the constants are and what they refer to? For example, what channel is "channel_size" here?
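The kind of documentation being asked for might look like this sketch. Field names other than `channel_size`, and all values, are invented for illustration and are not the PR's actual config.

```rust
// Illustrative sketch: a doc comment per constant saying exactly which
// channel or limit it refers to.
#[derive(Clone, Debug)]
pub struct QuorumStoreConfig {
    /// Capacity of the channel from consensus into the quorum store
    /// coordinator (answering "what channel is channel_size?").
    pub channel_size: usize,
    /// Maximum number of transactions bundled into one batch
    /// (hypothetical field, for illustration).
    pub max_batch_txns: usize,
}

impl Default for QuorumStoreConfig {
    fn default() -> Self {
        QuorumStoreConfig {
            channel_size: 1000, // value invented for the sketch
            max_batch_txns: 100,
        }
    }
}

fn main() {
    let cfg = QuorumStoreConfig::default();
    println!("channel_size = {}", cfg.channel_size);
}
```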
@@ -10,8 +10,7 @@ use aptos_crypto::HashValue;
 use futures::channel::oneshot;
 use std::{fmt, fmt::Formatter};

 /// Message sent from Consensus to QuorumStore.
-pub enum PayloadRequest {
+pub enum BlockProposalCommand {
Should this be "GetBlockProposalCommand"? The proposal generator issues this and then does the proposing itself; this command only prepares the proposal?
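The suggested rename could be sketched as below. The variant and its field are illustrative stand-ins, not the PR's actual definition; the point is that a Get-prefixed name makes the request/response shape explicit.

```rust
// Sketch of the suggested rename: the command only *prepares* a proposal
// payload, so a "Get..." name reads as a request rather than an action.
#[derive(Debug)]
pub enum GetBlockProposalCommand {
    /// Ask quorum store for a payload of at most max_txns transactions
    /// (variant and field invented for illustration).
    GetBlockRequest { max_txns: u64 },
}

fn main() {
    let cmd = GetBlockProposalCommand::GetBlockRequest { max_txns: 4000 };
    println!("{:?}", cmd);
}
```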
@@ -302,6 +304,12 @@ impl NetworkSender {

 #[async_trait::async_trait]
 impl QuorumStoreSender for NetworkSender {
     async fn send_batch_request(&self, request: BatchRequest, recipients: Vec<Author>) {
         fail_point!("consensus::send_batch_request", |_| ());
Use the same convention: consensus::send_batch_request => consensus::send::batch_request, and likewise for consensus::send_batch and consensus::send_signed_digest below.
Good catch! #6650
pub enum QuorumStoreBuilder {
    DirectMempool(DirectMempoolInnerBuilder),
    InQuorumStore(InnerBuilder),
What does "In" here refer to?
{
    let batch_coordinator = BatchCoordinator::new(
        self.epoch,
        self.author,
I may be missing something, but shouldn't the remote batch coordinator be creating batches authored by a remote node, not the local peer id? Or is this used for something else?
for (i, remote_batch_coordinator_cmd_rx) in
    self.remote_batch_coordinator_cmd_rx.into_iter().enumerate()
{
    let batch_coordinator = BatchCoordinator::new(
What's the reason for having a separate batch coordinator for each "batch author"?
Is it because we want to process each stream independently (i.e. have 100 receiver loops), or because it makes the BatchCoordinator code cleaner to care about a single author? If the second, should we have a single channel and a dispatcher that hands each message to the appropriate coordinator, instead of having 100 loops?
We create batch coordinator workers == num_workers_for_remote_fragments (which can be smaller than the number of peers).
The reason is closer to the former. Most importantly, we want the remote fragments not to block the local fragments (which is accomplished even with == 1). Then we want the remote fragments not to block each other.
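The design described above can be sketched roughly as follows: a fixed pool of remote-fragment workers, each with its own channel, with senders mapped to a worker by hashing the author. All names here (`worker_index`, the string authors) are invented for illustration; the PR's actual dispatch logic may differ.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc;
use std::thread;

// Map an author deterministically to one of num_workers worker queues, so a
// given peer's fragments are always processed in order by the same worker,
// and one slow peer cannot block the others or the local fragment path.
fn worker_index(author: &str, num_workers: usize) -> usize {
    let mut h = DefaultHasher::new();
    author.hash(&mut h);
    (h.finish() as usize) % num_workers
}

fn main() {
    // num_workers_for_remote_fragments: can be smaller than the peer count.
    let num_workers = 3;
    let mut txs = Vec::new();
    let mut handles = Vec::new();
    for i in 0..num_workers {
        let (tx, rx) = mpsc::channel::<String>();
        txs.push(tx);
        handles.push(thread::spawn(move || {
            // Each worker drains its own queue independently.
            for author in rx {
                println!("worker {} got fragment from {}", i, author);
            }
        }));
    }
    for author in ["alice", "bob", "carol", "alice"] {
        txs[worker_index(author, num_workers)]
            .send(author.to_string())
            .unwrap();
    }
    drop(txs); // close channels so the worker loops terminate
    for h in handles {
        h.join().unwrap();
    }
}
```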
digest_to_proof: HashMap<HashValue, IncrementalProofState>,
digest_to_time: HashMap<HashValue, u64>, // to record the batch creation time
timeouts: DigestTimeouts,
This confused me: it is not recording what has timed out, but at what point in the future something will expire.
Maybe "expirations" is a better name?
fn expire(&mut self) {
    for digest in self.timeouts.expire() {
        if let Some(state) = self.digest_to_proof.remove(&digest) {
Add a comment:
// check if proof hasn't completed already
as that is the reason for there not being a value.
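Both suggestions, renaming the time-indexed structure to "expirations" and adding the clarifying comment in the expire loop, could be sketched as below. The types are simplified stand-ins (e.g. `HashValue` as `u64`, a string in place of `IncrementalProofState`), not the PR's real definitions.

```rust
use std::collections::{BTreeMap, HashMap};

// Simplified stand-in for the real aptos_crypto::HashValue.
type HashValue = u64;

// Time-indexed map: expiry time -> digests that expire at that time.
// "Expirations" reflects that entries describe *future* expiry points,
// not things that have already timed out.
#[derive(Default)]
struct Expirations {
    by_time: BTreeMap<u64, Vec<HashValue>>,
}

impl Expirations {
    fn add(&mut self, digest: HashValue, expiry: u64) {
        self.by_time.entry(expiry).or_default().push(digest);
    }

    /// Remove and return every digest whose expiry time is <= now.
    fn expire(&mut self, now: u64) -> Vec<HashValue> {
        let later = self.by_time.split_off(&(now + 1));
        let expired = std::mem::replace(&mut self.by_time, later);
        expired.into_values().flatten().collect()
    }
}

fn main() {
    let mut digest_to_proof: HashMap<HashValue, &str> = HashMap::new();
    digest_to_proof.insert(1, "incomplete proof");
    let mut expirations = Expirations::default();
    expirations.add(1, 10);
    expirations.add(2, 10); // digest 2's proof completed and was removed earlier
    for digest in expirations.expire(10) {
        // check if proof hasn't completed already: a completed proof was
        // removed from digest_to_proof, so remove() returns None for it
        if let Some(state) = digest_to_proof.remove(&digest) {
            println!("expired digest {} with state: {}", digest, state);
        }
    }
}
```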
        self.remote_batch_coordinator_tx.len(),
        idx
    );
    self.remote_batch_coordinator_tx[idx]
Where does the remote_batch_coordinator receive EndBatch?
Also, if processing remote and local fragments is different, why don't we have two classes, LocalBatchCoordinator and RemoteBatchCoordinator, if they have no overlap?
@zekun000 had the same comment. I'll create an item to work on this.
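The split the reviewers suggest could be sketched with a shared trait and two implementations, so each path stays self-contained. All names and methods here are illustrative, not the code the follow-up item will actually produce.

```rust
// Hypothetical sketch of splitting the coordinator by fragment origin.
trait BatchCoordinator {
    fn handle_fragment(&mut self, fragment: &str);
}

// Handles fragments authored by this node.
struct LocalBatchCoordinator {
    appended: usize,
}

// Handles fragments received from a peer over the network.
struct RemoteBatchCoordinator {
    received: usize,
}

impl BatchCoordinator for LocalBatchCoordinator {
    fn handle_fragment(&mut self, fragment: &str) {
        self.appended += 1;
        println!("local: appended {}", fragment);
    }
}

impl BatchCoordinator for RemoteBatchCoordinator {
    fn handle_fragment(&mut self, fragment: &str) {
        self.received += 1;
        println!("remote: stored {}", fragment);
    }
}

fn main() {
    let mut local = LocalBatchCoordinator { appended: 0 };
    let mut remote = RemoteBatchCoordinator { received: 0 };
    local.handle_fragment("f1");
    remote.handle_fragment("f2");
}
```

With no shared logic, the trait could even be dropped and the two structs kept entirely independent; the trait is only useful if some caller needs to treat them uniformly.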
Description
Implementation of Quorum Store. See component diagram at https://drive.google.com/file/d/1Vu3G_z6zOueljBnnPLZp4VIZp4Oo_AwQ/view?usp=sharing
Quorum store is still disabled by default (via onchain config).
Test Plan
All tests pass without quorum store enabled.
With quorum store enabled (by hard-coding the onchain config):
Before enabling quorum store, we additionally need to: