Skip to content

Commit

Permalink
[cherry-pick] various improvement for consensus (#7343)
Browse files Browse the repository at this point in the history
  • Loading branch information
zekun000 authored Mar 24, 2023
1 parent f0c9893 commit 88bb502
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 69 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ rust-version = { workspace = true }
[dependencies]
anyhow = { workspace = true }
aptos-bitvec = { workspace = true }
aptos-bounded-executor = { workspace = true }
aptos-channels = { workspace = true }
aptos-config = { workspace = true }
aptos-consensus-notifications = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/block_storage/block_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ async fn test_need_sync_for_ledger_info() {
assert!(block_store.need_sync_for_ledger_info(&ordered_too_far));

let committed_round_too_far =
block_store.commit_root().round() + block_store.back_pressure_limit * 2 + 1;
block_store.commit_root().round() + block_store.back_pressure_limit * 3 + 1;
let committed_too_far = create_ledger_info(committed_round_too_far);
assert!(block_store.need_sync_for_ledger_info(&committed_too_far));

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl BlockStore {
pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool {
(self.ordered_root().round() < li.commit_info().round()
&& !self.block_exists(li.commit_info().id()))
|| self.commit_root().round() + 2 * self.back_pressure_limit < li.commit_info().round()
|| self.commit_root().round() + 3 * self.back_pressure_limit < li.commit_info().round()
}

/// Checks if quorum certificate can be inserted in block store without RPC
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/consensus_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
txn_notifier::MempoolNotifier,
util::time_service::ClockTimeService,
};
use aptos_bounded_executor::BoundedExecutor;
use aptos_config::config::NodeConfig;
use aptos_consensus_notifications::ConsensusNotificationSender;
use aptos_event_notifications::ReconfigNotificationListener;
Expand Down Expand Up @@ -57,6 +58,7 @@ pub fn start_consensus(
let (self_sender, self_receiver) = aptos_channels::new(1_024, &counters::PENDING_SELF_MESSAGES);

let consensus_network_client = ConsensusNetworkClient::new(network_client);
let bounded_executor = BoundedExecutor::new(4, runtime.handle().clone());
let epoch_mgr = EpochManager::new(
node_config,
time_service,
Expand All @@ -67,6 +69,7 @@ pub fn start_consensus(
state_computer,
storage,
reconfig_events,
bounded_executor,
);

let (network_task, network_receiver) = NetworkTask::new(network_service_events, self_receiver);
Expand Down
139 changes: 87 additions & 52 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::{
util::time_service::TimeService,
};
use anyhow::{bail, ensure, Context};
use aptos_bounded_executor::BoundedExecutor;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::config::{ConsensusConfig, NodeConfig};
use aptos_consensus_types::{
Expand Down Expand Up @@ -78,6 +79,7 @@ use itertools::Itertools;
use std::{
cmp::Ordering,
collections::HashMap,
hash::Hash,
mem::{discriminant, Discriminant},
path::PathBuf,
sync::Arc,
Expand Down Expand Up @@ -120,12 +122,13 @@ pub struct EpochManager {
aptos_channel::Sender<(Author, Discriminant<VerifiedEvent>), (Author, VerifiedEvent)>,
>,
round_manager_close_tx: Option<oneshot::Sender<oneshot::Sender<()>>>,
epoch_state: Option<EpochState>,
epoch_state: Option<Arc<EpochState>>,
block_retrieval_tx:
Option<aptos_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest>>,
quorum_store_storage_path: PathBuf,
quorum_store_msg_tx: Option<aptos_channel::Sender<AccountAddress, VerifiedEvent>>,
quorum_store_coordinator_tx: Option<Sender<CoordinatorCommand>>,
bounded_executor: BoundedExecutor,
}

impl EpochManager {
Expand All @@ -139,6 +142,7 @@ impl EpochManager {
commit_state_computer: Arc<dyn StateComputer>,
storage: Arc<dyn PersistentLivenessStorage>,
reconfig_events: ReconfigNotificationListener,
bounded_executor: BoundedExecutor,
) -> Self {
let author = node_config.validator_network.as_ref().unwrap().peer_id();
let config = node_config.consensus.clone();
Expand Down Expand Up @@ -167,6 +171,7 @@ impl EpochManager {
quorum_store_storage_path: node_config.storage.dir(),
quorum_store_msg_tx: None,
quorum_store_coordinator_tx: None,
bounded_executor,
}
}

Expand Down Expand Up @@ -440,7 +445,7 @@ impl EpochManager {

fn spawn_block_retrieval_task(&mut self, epoch: u64, block_store: Arc<BlockStore>) {
let (request_tx, mut request_rx) = aptos_channel::new(
QueueStyle::LIFO,
QueueStyle::FIFO,
1,
Some(&counters::BLOCK_RETRIEVAL_TASK_MSGS),
);
Expand Down Expand Up @@ -759,7 +764,7 @@ impl EpochManager {
error!("Failed to read on-chain consensus config {}", error);
}

self.epoch_state = Some(epoch_state.clone());
self.epoch_state = Some(Arc::new(epoch_state.clone()));

match self.storage.start() {
LivenessStorageData::FullRecoveryData(initial_data) => {
Expand Down Expand Up @@ -800,27 +805,43 @@ impl EpochManager {
self.filter_quorum_store_events(peer_id, &unverified_event)?;

// same epoch -> run well-formedness + signature check
let verified_event = monitor!(
"verify_message",
unverified_event.clone().verify(
peer_id,
&self.epoch_state().verifier,
self.quorum_store_enabled
)
)
.context("[EpochManager] Verify event")
.map_err(|err| {
error!(
SecurityEvent::ConsensusInvalidMessage,
remote_peer = peer_id,
error = ?err,
unverified_event = unverified_event
);
err
})?;

// process the verified event
self.process_event(peer_id, verified_event)?;
let epoch_state = self.epoch_state.clone().unwrap();
let quorum_store_enabled = self.quorum_store_enabled;
let quorum_store_msg_tx = self.quorum_store_msg_tx.clone();
let buffer_manager_msg_tx = self.buffer_manager_msg_tx.clone();
let round_manager_tx = self.round_manager_tx.clone();
let my_peer_id = self.author;
self.bounded_executor
.spawn(async move {
match monitor!(
"verify_message",
unverified_event.clone().verify(
peer_id,
&epoch_state.verifier,
quorum_store_enabled,
peer_id == my_peer_id,
)
) {
Ok(verified_event) => {
Self::forward_event(
quorum_store_msg_tx,
buffer_manager_msg_tx,
round_manager_tx,
peer_id,
verified_event,
);
},
Err(e) => {
error!(
SecurityEvent::ConsensusInvalidMessage,
remote_peer = peer_id,
error = ?e,
unverified_event = unverified_event
);
},
}
})
.await;
}
Ok(())
}
Expand Down Expand Up @@ -910,49 +931,55 @@ impl EpochManager {
}
}

fn process_event(
&mut self,
fn forward_event_to<K: Eq + Hash + Clone, V>(
mut maybe_tx: Option<aptos_channel::Sender<K, V>>,
key: K,
value: V,
) -> anyhow::Result<()> {
if let Some(tx) = &mut maybe_tx {
tx.push(key, value)
} else {
bail!("channel not initialized");
}
}

fn forward_event(
quorum_store_msg_tx: Option<aptos_channel::Sender<AccountAddress, VerifiedEvent>>,
buffer_manager_msg_tx: Option<aptos_channel::Sender<AccountAddress, VerifiedEvent>>,
round_manager_tx: Option<
aptos_channel::Sender<(Author, Discriminant<VerifiedEvent>), (Author, VerifiedEvent)>,
>,
peer_id: AccountAddress,
event: VerifiedEvent,
) -> anyhow::Result<()> {
) {
if let VerifiedEvent::ProposalMsg(proposal) = &event {
observe_block(
proposal.proposal().timestamp_usecs(),
BlockStage::EPOCH_MANAGER_VERIFIED,
);
}
match event {
if let Err(e) = match event {
quorum_store_event @ (VerifiedEvent::BatchRequestMsg(_)
| VerifiedEvent::UnverifiedBatchMsg(_)
| VerifiedEvent::SignedDigestMsg(_)
| VerifiedEvent::ProofOfStoreMsg(_)
| VerifiedEvent::FragmentMsg(_)) => {
if let Some(sender) = &mut self.quorum_store_msg_tx {
sender.push(peer_id, quorum_store_event)?;
}
Self::forward_event_to(quorum_store_msg_tx, peer_id, quorum_store_event)
.context("quorum store sender")
},
buffer_manager_event @ (VerifiedEvent::CommitVote(_)
| VerifiedEvent::CommitDecision(_)) => {
if let Some(sender) = &mut self.buffer_manager_msg_tx {
sender.push(peer_id, buffer_manager_event)?;
} else {
bail!("Commit Phase not started but received Commit Message (CommitVote/CommitDecision)");
}
Self::forward_event_to(buffer_manager_msg_tx, peer_id, buffer_manager_event)
.context("buffer manager sender")
},
round_manager_event => {
self.forward_to_round_manager(peer_id, round_manager_event);
},
}
Ok(())
}

fn forward_to_round_manager(&mut self, peer_id: Author, event: VerifiedEvent) {
let sender = self
.round_manager_tx
.as_mut()
.expect("RoundManager not started");
if let Err(e) = sender.push((peer_id, discriminant(&event)), (peer_id, event)) {
error!("Failed to send event to round manager {:?}", e);
round_manager_event => Self::forward_event_to(
round_manager_tx,
(peer_id, discriminant(&round_manager_event)),
(peer_id, round_manager_event),
)
.context("round manager sender"),
} {
warn!("Failed to forward event: {}", e);
}
}

Expand All @@ -972,7 +999,15 @@ impl EpochManager {
}

fn process_local_timeout(&mut self, round: u64) {
self.forward_to_round_manager(self.author, VerifiedEvent::LocalTimeout(round));
let peer_id = self.author;
let event = VerifiedEvent::LocalTimeout(round);
let sender = self
.round_manager_tx
.as_mut()
.expect("RoundManager not started");
if let Err(e) = sender.push((peer_id, discriminant(&event)), (peer_id, event)) {
error!("Failed to send event to round manager {:?}", e);
}
}

async fn await_reconfig_notification(&mut self) {
Expand All @@ -993,7 +1028,7 @@ impl EpochManager {
// initial start of the processor
self.await_reconfig_notification().await;
loop {
::futures::select! {
tokio::select! {
(peer, msg) = network_receivers.consensus_messages.select_next_some() => {
if let Err(e) = self.process_message(peer, msg).await {
error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e));
Expand Down
5 changes: 4 additions & 1 deletion consensus/src/experimental/tests/buffer_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ async fn loopback_commit_vote(
let event: UnverifiedEvent = msg.into();
// verify the message and send the message into self loop
msg_tx
.push(author, event.verify(author, verifier, false).unwrap())
.push(
author,
event.verify(author, verifier, false, false).unwrap(),
)
.ok();
}
},
Expand Down
13 changes: 8 additions & 5 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct IncomingBlockRetrievalRequest {
/// Just a convenience struct to keep all the network proxy receiving queues in one place.
/// Will be returned by the NetworkTask upon startup.
pub struct NetworkReceivers {
/// Provide a LIFO buffer for each (Author, MessageType) key
/// Provide a FIFO buffer for each (Author, MessageType) key
pub consensus_messages: aptos_channel::Receiver<
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
Expand Down Expand Up @@ -356,17 +356,20 @@ impl NetworkTask {
network_service_events: NetworkServiceEvents<ConsensusMsg>,
self_receiver: aptos_channels::Receiver<Event<ConsensusMsg>>,
) -> (NetworkTask, NetworkReceivers) {
let (consensus_messages_tx, consensus_messages) =
aptos_channel::new(QueueStyle::LIFO, 1, Some(&counters::CONSENSUS_CHANNEL_MSGS));
let (consensus_messages_tx, consensus_messages) = aptos_channel::new(
QueueStyle::FIFO,
20,
Some(&counters::CONSENSUS_CHANNEL_MSGS),
);
let (quorum_store_messages_tx, quorum_store_messages) = aptos_channel::new(
QueueStyle::FIFO,
// TODO: tune this value based on quorum store messages with backpressure
50,
Some(&counters::QUORUM_STORE_CHANNEL_MSGS),
);
let (block_retrieval_tx, block_retrieval) = aptos_channel::new(
QueueStyle::LIFO,
1,
QueueStyle::FIFO,
10,
Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS),
);

Expand Down
4 changes: 4 additions & 0 deletions consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ impl NetworkPlayground {
}
}

pub fn handle(&self) -> Handle {
self.executor.clone()
}

/// HashMap of supported protocols to initialize ConsensusNetworkClient.
pub fn peer_protocols(&self) -> Arc<PeersAndMetadata> {
self.peers_and_metadata.clone()
Expand Down
Loading

0 comments on commit 88bb502

Please sign in to comment.