Skip to content

Commit

Permalink
[qs] remove fragments and refactor types around batch
Browse files Browse the repository at this point in the history
  • Loading branch information
zekun000 committed Mar 16, 2023
1 parent 890700c commit cf2c5e4
Show file tree
Hide file tree
Showing 32 changed files with 596 additions and 1,576 deletions.
9 changes: 3 additions & 6 deletions config/src/config/quorum_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ pub struct QuorumStoreConfig {
pub batch_generation_poll_interval_ms: usize,
pub batch_generation_min_non_empty_interval_ms: usize,
pub batch_generation_max_interval_ms: usize,
pub end_batch_ms: u64,
pub max_batch_bytes: usize,
pub batch_request_timeout_ms: usize,
/// Used when setting up the expiration time for the batch initiation.
Expand All @@ -60,7 +59,7 @@ pub struct QuorumStoreConfig {
pub batch_quota: usize,
pub mempool_txn_pull_max_bytes: u64,
pub back_pressure: QuorumStoreBackPressureConfig,
pub num_workers_for_remote_fragments: usize,
pub num_workers_for_remote_batches: usize,
}

impl Default for QuorumStoreConfig {
Expand All @@ -72,8 +71,6 @@ impl Default for QuorumStoreConfig {
batch_generation_poll_interval_ms: 25,
batch_generation_min_non_empty_interval_ms: 100,
batch_generation_max_interval_ms: 250,
// TODO: This essentially turns fragments off, because there was performance degradation. Needs more investigation.
end_batch_ms: 10,
max_batch_bytes: 4 * 1024 * 1024,
batch_request_timeout_ms: 10000,
batch_expiry_round_gap_when_init: 100,
Expand All @@ -85,8 +82,8 @@ impl Default for QuorumStoreConfig {
batch_quota: 300_000,
mempool_txn_pull_max_bytes: 4 * 1024 * 1024,
back_pressure: QuorumStoreBackPressureConfig::default(),
// number of batch coordinators to handle QS Fragment messages, should be >= 1
num_workers_for_remote_fragments: 10,
// number of batch coordinators to handle QS batch messages, should be >= 1
num_workers_for_remote_batches: 10,
}
}
}
60 changes: 24 additions & 36 deletions consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,68 +86,56 @@ impl Display for BatchId {
#[derive(
Clone, Debug, Deserialize, Serialize, CryptoHasher, BCSCryptoHash, PartialEq, Eq, Hash,
)]
pub struct SignedDigestInfo {
pub batch_author: PeerId,
pub struct BatchInfo {
pub author: PeerId,
pub batch_id: BatchId,
pub digest: HashValue,
pub expiration: LogicalTime,
pub digest: HashValue,
pub num_txns: u64,
pub num_bytes: u64,
}

impl SignedDigestInfo {
impl BatchInfo {
pub fn new(
batch_author: PeerId,
author: PeerId,
batch_id: BatchId,
digest: HashValue,
expiration: LogicalTime,
digest: HashValue,
num_txns: u64,
num_bytes: u64,
) -> Self {
Self {
batch_author,
author,
batch_id,
digest,
expiration,
digest,
num_txns,
num_bytes,
}
}

pub fn epoch(&self) -> u64 {
self.expiration.epoch
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SignedDigest {
epoch: u64,
pub struct SignedBatchInfo {
signer: PeerId,
info: SignedDigestInfo,
info: BatchInfo,
signature: bls12381::Signature,
}

impl SignedDigest {
impl SignedBatchInfo {
pub fn new(
batch_author: PeerId,
batch_id: BatchId,
epoch: u64,
digest: HashValue,
expiration: LogicalTime,
num_txns: u64,
num_bytes: u64,
batch_info: BatchInfo,
validator_signer: &ValidatorSigner,
) -> Result<Self, CryptoMaterialError> {
let info = SignedDigestInfo::new(
batch_author,
batch_id,
digest,
expiration,
num_txns,
num_bytes,
);
let signature = validator_signer.sign(&info)?;
let signature = validator_signer.sign(&batch_info)?;

Ok(Self {
epoch,
signer: validator_signer.author(),
info,
info: batch_info,
signature,
})
}
Expand All @@ -157,7 +145,7 @@ impl SignedDigest {
}

pub fn epoch(&self) -> u64 {
self.epoch
self.info.epoch()
}

pub fn verify(&self, sender: PeerId, validator: &ValidatorVerifier) -> anyhow::Result<()> {
Expand All @@ -168,7 +156,7 @@ impl SignedDigest {
}
}

pub fn info(&self) -> &SignedDigestInfo {
pub fn info(&self) -> &BatchInfo {
&self.info
}

Expand All @@ -182,7 +170,7 @@ impl SignedDigest {
}

#[derive(Debug, PartialEq)]
pub enum SignedDigestError {
pub enum SignedBatchInfoError {
WrongAuthor,
WrongInfo,
DuplicatedSignature,
Expand All @@ -191,19 +179,19 @@ pub enum SignedDigestError {

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
pub struct ProofOfStore {
info: SignedDigestInfo,
info: BatchInfo,
multi_signature: AggregateSignature,
}

impl ProofOfStore {
pub fn new(info: SignedDigestInfo, multi_signature: AggregateSignature) -> Self {
pub fn new(info: BatchInfo, multi_signature: AggregateSignature) -> Self {
Self {
info,
multi_signature,
}
}

pub fn info(&self) -> &SignedDigestInfo {
pub fn info(&self) -> &BatchInfo {
&self.info
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/consensus_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub fn start_consensus(
let (self_sender, self_receiver) = aptos_channels::new(1_024, &counters::PENDING_SELF_MESSAGES);

let consensus_network_client = ConsensusNetworkClient::new(network_client);
let bounded_executor = BoundedExecutor::new(4, runtime.handle().clone());
let bounded_executor = BoundedExecutor::new(8, runtime.handle().clone());
let epoch_mgr = EpochManager::new(
node_config,
time_service,
Expand Down
13 changes: 6 additions & 7 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,10 +894,9 @@ impl EpochManager {
| ConsensusMsg::VoteMsg(_)
| ConsensusMsg::CommitVoteMsg(_)
| ConsensusMsg::CommitDecisionMsg(_)
| ConsensusMsg::FragmentMsg(_)
| ConsensusMsg::BatchRequestMsg(_)
| ConsensusMsg::BatchMsg(_)
| ConsensusMsg::SignedDigestMsg(_)
| ConsensusMsg::BatchRequestMsg(_)
| ConsensusMsg::SignedBatchInfo(_)
| ConsensusMsg::ProofOfStoreMsg(_) => {
let event: UnverifiedEvent = msg.into();
if event.epoch() == self.epoch() {
Expand Down Expand Up @@ -950,8 +949,8 @@ impl EpochManager {
event: &UnverifiedEvent,
) -> anyhow::Result<()> {
match event {
UnverifiedEvent::FragmentMsg(_)
| UnverifiedEvent::SignedDigestMsg(_)
UnverifiedEvent::BatchMsg(_)
| UnverifiedEvent::SignedBatchInfo(_)
| UnverifiedEvent::ProofOfStoreMsg(_) => {
if self.quorum_store_enabled {
Ok(())
Expand Down Expand Up @@ -994,9 +993,9 @@ impl EpochManager {
);
}
if let Err(e) = match event {
quorum_store_event @ (VerifiedEvent::SignedDigestMsg(_)
quorum_store_event @ (VerifiedEvent::SignedBatchInfo(_)
| VerifiedEvent::ProofOfStoreMsg(_)
| VerifiedEvent::FragmentMsg(_)) => {
| VerifiedEvent::BatchMsg(_)) => {
Self::forward_event_to(quorum_store_msg_tx, peer_id, quorum_store_event)
.context("quorum store sender")
},
Expand Down
43 changes: 27 additions & 16 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
logging::LogEvent,
monitor,
network_interface::{ConsensusMsg, ConsensusNetworkClient},
quorum_store::types::{Batch, BatchRequest, Fragment},
quorum_store::types::{Batch, BatchMsg, BatchRequest},
};
use anyhow::{anyhow, ensure};
use aptos_channels::{self, aptos_channel, message_queues::QueueStyle};
Expand All @@ -17,7 +17,7 @@ use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse, MAX_BLOCKS_PER_REQUEST},
common::Author,
experimental::{commit_decision::CommitDecision, commit_vote::CommitVote},
proof_of_store::{ProofOfStore, SignedDigest},
proof_of_store::{ProofOfStore, SignedBatchInfo},
proposal_msg::ProposalMsg,
sync_info::SyncInfo,
vote_msg::VoteMsg,
Expand Down Expand Up @@ -94,9 +94,13 @@ pub trait QuorumStoreSender: Send + Clone {

async fn send_batch(&self, batch: Batch, recipients: Vec<Author>);

async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec<Author>);
async fn send_signed_batch_info(
&self,
signed_batch_info: SignedBatchInfo,
recipients: Vec<Author>,
);

async fn broadcast_fragment(&mut self, fragment: Fragment);
async fn broadcast_batch_msg(&mut self, batch: Batch);

async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore);
}
Expand Down Expand Up @@ -334,26 +338,33 @@ impl QuorumStoreSender for NetworkSender {
.send_rpc(recipient, msg, timeout)
.await?;
match response {
ConsensusMsg::BatchMsg(batch) => Ok(*batch),
ConsensusMsg::BatchResponse(batch) => {
batch.verify()?;
Ok(*batch)
},
_ => Err(anyhow!("Invalid batch response")),
}
}

async fn send_batch(&self, batch: Batch, recipients: Vec<Author>) {
fail_point!("consensus::send::batch", |_| ());
let msg = ConsensusMsg::BatchMsg(Box::new(batch));
let msg = ConsensusMsg::BatchResponse(Box::new(batch));
self.send(msg, recipients).await
}

async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec<Author>) {
fail_point!("consensus::send::signed_digest", |_| ());
let msg = ConsensusMsg::SignedDigestMsg(Box::new(signed_digest));
async fn send_signed_batch_info(
&self,
signed_batch_info: SignedBatchInfo,
recipients: Vec<Author>,
) {
fail_point!("consensus::send::signed_batch_info", |_| ());
let msg = ConsensusMsg::SignedBatchInfo(Box::new(signed_batch_info));
self.send(msg, recipients).await
}

async fn broadcast_fragment(&mut self, fragment: Fragment) {
fail_point!("consensus::send::broadcast_fragment", |_| ());
let msg = ConsensusMsg::FragmentMsg(Box::new(fragment));
async fn broadcast_batch_msg(&mut self, batch: Batch) {
fail_point!("consensus::send::broadcast_batch", |_| ());
let msg = ConsensusMsg::BatchMsg(Box::new(BatchMsg::new(batch)));
self.broadcast(msg).await
}

Expand Down Expand Up @@ -446,11 +457,11 @@ impl NetworkTask {
.with_label_values(&[msg.name()])
.inc();
match msg {
ConsensusMsg::BatchRequestMsg(_) | ConsensusMsg::BatchMsg(_) => {
warn!("unexpected msg");
ConsensusMsg::BatchRequestMsg(_) | ConsensusMsg::BatchResponse(_) => {
warn!("unexpected rpc msg");
},
quorum_store_msg @ (ConsensusMsg::SignedDigestMsg(_)
| ConsensusMsg::FragmentMsg(_)
quorum_store_msg @ (ConsensusMsg::SignedBatchInfo(_)
| ConsensusMsg::BatchMsg(_)
| ConsensusMsg::ProofOfStoreMsg(_)) => {
Self::push_msg(
peer_id,
Expand Down
22 changes: 10 additions & 12 deletions consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

//! Interface between Consensus and Network layers.
use crate::quorum_store::types::{Batch, BatchRequest, Fragment};
use crate::quorum_store::types::{Batch, BatchMsg, BatchRequest};
use aptos_config::network_id::{NetworkId, PeerNetworkId};
use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse},
epoch_retrieval::EpochRetrievalRequest,
experimental::{commit_decision::CommitDecision, commit_vote::CommitVote},
proof_of_store::{ProofOfStore, SignedDigest},
proof_of_store::{ProofOfStore, SignedBatchInfo},
proposal_msg::ProposalMsg,
sync_info::SyncInfo,
vote_msg::VoteMsg,
Expand Down Expand Up @@ -50,17 +50,15 @@ pub enum ConsensusMsg {
/// than 2f + 1 signatures on the commit proposal. This part is not on the critical path, but
/// it can help slow machines quickly confirm the execution result.
CommitDecisionMsg(Box<CommitDecision>),
/// Quorum Store: Send a fragment -- a sequence of transactions that are part of an in-progress
/// batch -- from the fragment generator to remote validators.
FragmentMsg(Box<Fragment>),
/// Quorum Store: Send a Batch of transactions.
BatchMsg(Box<BatchMsg>),
/// Quorum Store: Request the payloads of a completed batch.
BatchRequestMsg(Box<BatchRequest>),
/// Quorum Store: Respond with a completed batch's payload -- a sequence of transactions,
/// identified by its digest.
BatchMsg(Box<Batch>),
/// Quorum Store: Response to the batch request.
BatchResponse(Box<Batch>),
/// Quorum Store: Send a signed batch digest. This is a vote for the batch and a promise that
/// the batch of transactions was received and will be persisted until batch expiration.
SignedDigestMsg(Box<SignedDigest>),
SignedBatchInfo(Box<SignedBatchInfo>),
/// Quorum Store: Broadcast a certified proof of store (a digest that received 2f+1 votes).
ProofOfStoreMsg(Box<ProofOfStore>),
}
Expand All @@ -80,10 +78,10 @@ impl ConsensusMsg {
ConsensusMsg::VoteMsg(_) => "VoteMsg",
ConsensusMsg::CommitVoteMsg(_) => "CommitVoteMsg",
ConsensusMsg::CommitDecisionMsg(_) => "CommitDecisionMsg",
ConsensusMsg::FragmentMsg(_) => "FragmentMsg",
ConsensusMsg::BatchRequestMsg(_) => "BatchRequestMsg",
ConsensusMsg::BatchMsg(_) => "BatchMsg",
ConsensusMsg::SignedDigestMsg(_) => "SignedDigestMsg",
ConsensusMsg::BatchRequestMsg(_) => "BatchRequestMsg",
ConsensusMsg::BatchResponse(_) => "BatchResponse",
ConsensusMsg::SignedBatchInfo(_) => "SignedBatchInfo",
ConsensusMsg::ProofOfStoreMsg(_) => "ProofOfStoreMsg",
}
}
Expand Down
Loading

0 comments on commit cf2c5e4

Please sign in to comment.