[cherry-pick] consensus improvement for v1.2 #7349

Merged · 1 commit · Mar 23, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions consensus/Cargo.toml
@@ -15,6 +15,7 @@ rust-version = { workspace = true }
[dependencies]
anyhow = { workspace = true }
aptos-bitvec = { workspace = true }
+ aptos-bounded-executor = { workspace = true }
aptos-channels = { workspace = true }
aptos-config = { workspace = true }
aptos-consensus-notifications = { workspace = true }
2 changes: 1 addition & 1 deletion consensus/src/block_storage/sync_manager.rs
@@ -46,7 +46,7 @@ impl BlockStore {
pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool {
(self.ordered_root().round() < li.commit_info().round()
&& !self.block_exists(li.commit_info().id()))
- || self.commit_root().round() + 2 * self.back_pressure_limit < li.commit_info().round()
+ || self.commit_root().round() + 3 * self.back_pressure_limit < li.commit_info().round()
}

/// Checks if quorum certificate can be inserted in block store without RPC
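The only behavioral change in this file is the back-pressure multiplier (2x → 3x), which widens how far a replica may fall behind the committed round before it abandons per-block catch-up and falls back to state sync. A minimal sketch of the condition, using simplified stand-in types (`Round` aliased to `u64`; the real method lives on `BlockStore`):

```rust
type Round = u64;

/// Simplified stand-in for the BlockStore state the check reads.
struct SyncCheck {
    ordered_root_round: Round,
    commit_root_round: Round,
    back_pressure_limit: Round,
}

impl SyncCheck {
    /// Mirrors need_sync_for_ledger_info: sync if the commit proves a block
    /// we never ordered, or if we trail the committed round by more than
    /// 3x the back-pressure limit (previously 2x).
    fn need_sync(&self, commit_round: Round, block_known: bool) -> bool {
        (self.ordered_root_round < commit_round && !block_known)
            || self.commit_root_round + 3 * self.back_pressure_limit < commit_round
    }
}

fn main() {
    let check = SyncCheck {
        ordered_root_round: 100,
        commit_root_round: 90,
        back_pressure_limit: 10,
    };
    // 90 + 3 * 10 = 120, so a commit at round 121 triggers a sync...
    assert!(check.need_sync(121, true));
    // ...while round 120 is still within the catch-up window.
    assert!(!check.need_sync(120, true));
}
```

The wider window gives moderately lagging nodes more room to catch up through ordinary block retrieval instead of triggering a comparatively expensive state sync.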
3 changes: 3 additions & 0 deletions consensus/src/consensus_provider.rs
@@ -11,6 +11,7 @@ use crate::{
txn_notifier::MempoolNotifier,
util::time_service::ClockTimeService,
};
+ use aptos_bounded_executor::BoundedExecutor;
use aptos_config::config::NodeConfig;
use aptos_consensus_notifications::ConsensusNotificationSender;
use aptos_event_notifications::ReconfigNotificationListener;
@@ -60,6 +61,7 @@ pub fn start_consensus(
state_sync_notifier,
runtime.handle(),
));
+ let bounded_executor = BoundedExecutor::new(4, runtime.handle().clone());

let time_service = Arc::new(ClockTimeService::new(runtime.handle().clone()));

@@ -78,6 +80,7 @@
state_computer,
storage,
reconfig_events,
+ bounded_executor,
);

let (network_task, network_receiver) = NetworkTask::new(network_events, self_receiver);
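`BoundedExecutor::new(4, runtime.handle().clone())` wraps the runtime handle so that at most four tasks spawned through it run at once. A rough sketch of that semantics built on a Tokio semaphore (illustrative only, not the actual `aptos-bounded-executor` implementation):

```rust
use std::sync::Arc;
use tokio::{runtime::Handle, sync::Semaphore, task::JoinHandle};

/// Spawns futures on a runtime, but never more than `capacity` at a time.
#[derive(Clone)]
struct BoundedExecutor {
    semaphore: Arc<Semaphore>,
    handle: Handle,
}

impl BoundedExecutor {
    fn new(capacity: usize, handle: Handle) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(capacity)),
            handle,
        }
    }

    /// Waits for a free slot, then spawns; the slot is released when the
    /// spawned task finishes.
    async fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: std::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let permit = self.semaphore.clone().acquire_owned().await.unwrap();
        self.handle.spawn(async move {
            let out = future.await;
            drop(permit); // free the slot for the next waiter
            out
        })
    }
}

#[tokio::main]
async fn main() {
    let exec = BoundedExecutor::new(4, Handle::current());
    let task = exec.spawn(async { 2 + 2 }).await;
    assert_eq!(task.await.unwrap(), 4);
}
```

The key property is that `spawn` is itself async: once capacity is saturated, callers wait at the `.await` rather than queuing unbounded work. That is what lets the epoch manager below apply back-pressure to message verification.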
134 changes: 85 additions & 49 deletions consensus/src/epoch_manager.rs
@@ -40,6 +40,7 @@ use crate::{
util::time_service::TimeService,
};
use anyhow::{bail, ensure, Context};
+ use aptos_bounded_executor::BoundedExecutor;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::config::{ConsensusConfig, NodeConfig};
use aptos_consensus_types::{
@@ -73,6 +74,7 @@ use futures::{
SinkExt, StreamExt,
};
use itertools::Itertools;
+ use std::hash::Hash;
use std::{
cmp::Ordering,
collections::HashMap,
@@ -117,9 +119,10 @@ pub struct EpochManager {
aptos_channel::Sender<(Author, Discriminant<VerifiedEvent>), (Author, VerifiedEvent)>,
>,
round_manager_close_tx: Option<oneshot::Sender<oneshot::Sender<()>>>,
- epoch_state: Option<EpochState>,
+ epoch_state: Option<Arc<EpochState>>,
block_retrieval_tx:
Option<aptos_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest>>,
+ bounded_executor: BoundedExecutor,
}

impl EpochManager {
@@ -133,6 +136,7 @@ impl EpochManager {
commit_state_computer: Arc<dyn StateComputer>,
storage: Arc<dyn PersistentLivenessStorage>,
reconfig_events: ReconfigNotificationListener,
+ bounded_executor: BoundedExecutor,
) -> Self {
let author = node_config.validator_network.as_ref().unwrap().peer_id();
let config = node_config.consensus.clone();
@@ -157,6 +161,7 @@
round_manager_close_tx: None,
epoch_state: None,
block_retrieval_tx: None,
+ bounded_executor,
}
}

@@ -442,7 +447,7 @@ impl EpochManager {

fn spawn_block_retrieval_task(&mut self, epoch: u64, block_store: Arc<BlockStore>) {
let (request_tx, mut request_rx) = aptos_channel::new(
- QueueStyle::LIFO,
+ QueueStyle::FIFO,
1,
Some(&counters::BLOCK_RETRIEVAL_TASK_MSGS),
);
@@ -719,7 +724,7 @@ impl EpochManager {
error!("Failed to read on-chain consensus config {}", error);
}

- self.epoch_state = Some(epoch_state.clone());
+ self.epoch_state = Some(Arc::new(epoch_state.clone()));

match self.storage.start() {
LivenessStorageData::FullRecoveryData(initial_data) => {
@@ -757,29 +762,87 @@

if let Some(unverified_event) = maybe_unverified_event {
// same epoch -> run well-formedness + signature check
- let verified_event = monitor!(
-     "verify_message",
-     unverified_event
-         .clone()
-         .verify(&self.epoch_state().verifier, self.quorum_store_enabled)
- )
- .context("[EpochManager] Verify event")
- .map_err(|err| {
-     error!(
-         SecurityEvent::ConsensusInvalidMessage,
-         remote_peer = peer_id,
-         error = ?err,
-         unverified_event = unverified_event
-     );
-     err
- })?;
-
- // process the verified event
- self.process_event(peer_id, verified_event)?;
+ let epoch_state = self.epoch_state.clone().unwrap();
+ let quorum_store_enabled = self.quorum_store_enabled;
+ let buffer_manager_msg_tx = self.buffer_manager_msg_tx.clone();
+ let round_manager_tx = self.round_manager_tx.clone();
+ let my_peer_id = self.author;
+ self.bounded_executor
+     .spawn(async move {
+         match monitor!(
+             "verify_message",
+             unverified_event.clone().verify(
+                 &epoch_state.verifier,
+                 quorum_store_enabled,
+                 peer_id == my_peer_id,
+             )
+         ) {
+             Ok(verified_event) => {
+                 Self::forward_event(
+                     buffer_manager_msg_tx,
+                     round_manager_tx,
+                     peer_id,
+                     verified_event,
+                 );
+             }
+             Err(e) => {
+                 error!(
+                     SecurityEvent::ConsensusInvalidMessage,
+                     remote_peer = peer_id,
+                     error = ?e,
+                     unverified_event = unverified_event
+                 );
+             }
+         }
+     })
+     .await;
}
Ok(())
}

+ fn forward_event_to<K: Eq + Hash + Clone, V>(
+     mut maybe_tx: Option<aptos_channel::Sender<K, V>>,
+     key: K,
+     value: V,
+ ) -> anyhow::Result<()> {
+     if let Some(tx) = &mut maybe_tx {
+         tx.push(key, value)
+     } else {
+         bail!("channel not initialized");
+     }
+ }
+
+ fn forward_event(
+     buffer_manager_msg_tx: Option<aptos_channel::Sender<AccountAddress, VerifiedEvent>>,
+     round_manager_tx: Option<
+         aptos_channel::Sender<(Author, Discriminant<VerifiedEvent>), (Author, VerifiedEvent)>,
+     >,
+     peer_id: AccountAddress,
+     event: VerifiedEvent,
+ ) {
+     if let VerifiedEvent::ProposalMsg(proposal) = &event {
+         observe_block(
+             proposal.proposal().timestamp_usecs(),
+             BlockStage::EPOCH_MANAGER_VERIFIED,
+         );
+     }
+     if let Err(e) = match event {
+         buffer_manager_event @ (VerifiedEvent::CommitVote(_)
+         | VerifiedEvent::CommitDecision(_)) => {
+             Self::forward_event_to(buffer_manager_msg_tx, peer_id, buffer_manager_event)
+                 .context("buffer manager sender")
+         }
+         round_manager_event => Self::forward_event_to(
+             round_manager_tx,
+             (peer_id, discriminant(&round_manager_event)),
+             (peer_id, round_manager_event),
+         )
+         .context("round manager sender"),
+     } {
+         warn!("Failed to forward event: {}", e);
+     }
+ }

async fn check_epoch(
&mut self,
peer_id: AccountAddress,
@@ -836,33 +899,6 @@
Ok(None)
}

- fn process_event(
-     &mut self,
-     peer_id: AccountAddress,
-     event: VerifiedEvent,
- ) -> anyhow::Result<()> {
-     if let VerifiedEvent::ProposalMsg(proposal) = &event {
-         observe_block(
-             proposal.proposal().timestamp_usecs(),
-             BlockStage::EPOCH_MANAGER_VERIFIED,
-         );
-     }
-     match event {
-         buffer_manager_event @ (VerifiedEvent::CommitVote(_)
-         | VerifiedEvent::CommitDecision(_)) => {
-             if let Some(sender) = &mut self.buffer_manager_msg_tx {
-                 sender.push(peer_id, buffer_manager_event)?;
-             } else {
-                 bail!("Commit Phase not started but received Commit Message (CommitVote/CommitDecision)");
-             }
-         }
-         round_manager_event => {
-             self.forward_to_round_manager(peer_id, round_manager_event);
-         }
-     }
-     Ok(())
- }

fn forward_to_round_manager(&mut self, peer_id: Author, event: VerifiedEvent) {
let sender = self
.round_manager_tx
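Two things change in this file: signature verification moves off the epoch manager's event loop onto the bounded executor, and verified events are then routed by `forward_event`, keyed by `(author, std::mem::discriminant(&event))` so each peer and message type gets its own queue. A small self-contained sketch of discriminant-keyed routing (hypothetical `Event` enum, not the real `VerifiedEvent`):

```rust
use std::collections::HashMap;
use std::mem::{discriminant, Discriminant};

enum Event {
    Proposal(u64),
    Vote(u64),
}

fn main() {
    // Hypothetical per-(author, variant) queues, mirroring how the epoch
    // manager keys its aptos_channel to the round manager.
    let mut queues: HashMap<(&str, Discriminant<Event>), Vec<Event>> = HashMap::new();
    for event in [Event::Proposal(1), Event::Vote(1), Event::Proposal(2)] {
        queues
            .entry(("alice", discriminant(&event)))
            .or_default()
            .push(event);
    }
    // Proposals and votes from the same author land in separate queues,
    // so a flood of one message type cannot evict the other.
    assert_eq!(queues.len(), 2);
}
```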
2 changes: 1 addition & 1 deletion consensus/src/experimental/tests/buffer_manager_tests.rs
@@ -206,7 +206,7 @@ async fn loopback_commit_vote(
let event: UnverifiedEvent = msg.into();
// verify the message and send the message into self loop
msg_tx
- .push(author, event.verify(verifier, false).unwrap())
+ .push(author, event.verify(verifier, false, false).unwrap())
.ok();
}
}
13 changes: 8 additions & 5 deletions consensus/src/network.rs
@@ -50,7 +50,7 @@ pub struct IncomingBlockRetrievalRequest {
/// Just a convenience struct to keep all the network proxy receiving queues in one place.
/// Will be returned by the NetworkTask upon startup.
pub struct NetworkReceivers {
- /// Provide a LIFO buffer for each (Author, MessageType) key
+ /// Provide a FIFO buffer for each (Author, MessageType) key
pub consensus_messages: aptos_channel::Receiver<
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
@@ -287,11 +287,14 @@ impl NetworkTask {
network_events: ConsensusNetworkEvents,
self_receiver: aptos_channels::Receiver<Event<ConsensusMsg>>,
) -> (NetworkTask, NetworkReceivers) {
- let (consensus_messages_tx, consensus_messages) =
-     aptos_channel::new(QueueStyle::LIFO, 1, Some(&counters::CONSENSUS_CHANNEL_MSGS));
+ let (consensus_messages_tx, consensus_messages) = aptos_channel::new(
+     QueueStyle::FIFO,
+     20,
+     Some(&counters::CONSENSUS_CHANNEL_MSGS),
+ );
let (block_retrieval_tx, block_retrieval) = aptos_channel::new(
- QueueStyle::LIFO,
- 1,
+ QueueStyle::FIFO,
+ 10,
Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS),
);
let all_events = Box::new(select(network_events, self_receiver));
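The consensus channels switch from `QueueStyle::LIFO` with a per-key capacity of 1, where every new message overwrote the previous one, to FIFO with deeper buffers (20 and 10), where bursts are delivered in arrival order and the newest message is dropped only once the buffer is full. A toy model of the two drop policies (illustrative; not the `aptos_channel` implementation):

```rust
use std::collections::VecDeque;

enum QueueStyle {
    Fifo,
    Lifo,
}

struct BoundedQueue<T> {
    items: VecDeque<T>,
    capacity: usize,
    style: QueueStyle,
}

impl<T> BoundedQueue<T> {
    fn push(&mut self, item: T) {
        match self.style {
            // FIFO: preserve arrival order; drop the newest when full.
            QueueStyle::Fifo => {
                if self.items.len() < self.capacity {
                    self.items.push_back(item);
                }
            }
            // LIFO with capacity 1: the newest message evicts the oldest.
            QueueStyle::Lifo => {
                if self.items.len() == self.capacity {
                    self.items.pop_front();
                }
                self.items.push_back(item);
            }
        }
    }
}

fn main() {
    let mut q = BoundedQueue {
        items: VecDeque::new(),
        capacity: 1,
        style: QueueStyle::Lifo,
    };
    q.push("proposal_1");
    q.push("proposal_2"); // evicts proposal_1: only the latest survives
    assert_eq!(q.items.front(), Some(&"proposal_2"));
}
```

Under load, the old LIFO/1 configuration could silently drop messages that arrived in quick succession; the FIFO buffers trade a little latency for not losing them.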
4 changes: 4 additions & 0 deletions consensus/src/network_tests.rs
@@ -103,6 +103,10 @@ impl NetworkPlayground {
}
}

+ pub fn handle(&self) -> Handle {
+     self.executor.clone()
+ }

/// HashMap of supported protocols to initialize ConsensusNetworkSender.
pub fn peer_protocols(&self) -> Arc<PeerMetadataStorage> {
self.peer_metadata_storage.clone()
17 changes: 13 additions & 4 deletions consensus/src/round_manager.rs
@@ -71,24 +71,33 @@ impl UnverifiedEvent {
self,
validator: &ValidatorVerifier,
quorum_store_enabled: bool,
+ self_message: bool,
) -> Result<VerifiedEvent, VerifyError> {
Ok(match self {
UnverifiedEvent::ProposalMsg(p) => {
-     p.verify(validator, quorum_store_enabled)?;
+     if !self_message {
+         p.verify(validator, quorum_store_enabled)?;
+     }
VerifiedEvent::ProposalMsg(p)
}
UnverifiedEvent::VoteMsg(v) => {
-     v.verify(validator)?;
+     if !self_message {
+         v.verify(validator)?;
+     }
VerifiedEvent::VoteMsg(v)
}
// sync info verification is on-demand (verified when it's used)
UnverifiedEvent::SyncInfo(s) => VerifiedEvent::UnverifiedSyncInfo(s),
UnverifiedEvent::CommitVote(cv) => {
-     cv.verify(validator)?;
+     if !self_message {
+         cv.verify(validator)?;
+     }
VerifiedEvent::CommitVote(cv)
}
UnverifiedEvent::CommitDecision(cd) => {
-     cd.verify(validator)?;
+     if !self_message {
+         cd.verify(validator)?;
+     }
VerifiedEvent::CommitDecision(cd)
}
})
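`verify` gains a `self_message` flag: messages a validator loops back to itself were signed by that same validator moments earlier, so re-checking the signature is redundant. At the call site in `epoch_manager.rs` the flag is computed as `peer_id == my_peer_id`. An illustrative reduction of the pattern (hypothetical `Vote` type, not the real consensus types):

```rust
/// Stand-in for a signed consensus message.
struct Vote {
    signature_valid: bool,
}

/// Mirrors the self-message shortcut: signature checks are gated on
/// whether the message originated from this node.
fn verify_vote(vote: &Vote, self_message: bool) -> Result<(), &'static str> {
    if !self_message && !vote.signature_valid {
        return Err("invalid signature");
    }
    Ok(())
}

fn main() {
    let vote = Vote {
        signature_valid: false,
    };
    // A peer's bad signature is rejected...
    assert!(verify_vote(&vote, false).is_err());
    // ...but the same bytes looped back from ourselves skip the check.
    assert!(verify_vote(&vote, true).is_ok());
}
```

Note that the `SyncInfo` arm keeps its existing behavior: it is verified on demand when used, regardless of origin.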
3 changes: 3 additions & 0 deletions consensus/src/twins/twins_node.rs
@@ -12,6 +12,7 @@ use crate::{
test_utils::{MockStateComputer, MockStorage},
util::time_service::ClockTimeService,
};
+ use aptos_bounded_executor::BoundedExecutor;
use aptos_channels::{self, aptos_channel, message_queues::QueueStyle};
use aptos_config::{
config::{NodeConfig, WaypointConfig},
@@ -135,6 +136,7 @@ impl SMRNode {
aptos_channels::new(1_024, &counters::PENDING_ROUND_TIMEOUTS);
let (self_sender, self_receiver) =
aptos_channels::new(1_024, &counters::PENDING_SELF_MESSAGES);
+ let bounded_executor = BoundedExecutor::new(2, playground.handle());

let epoch_mgr = EpochManager::new(
&config,
@@ -146,6 +148,7 @@
state_computer.clone(),
storage.clone(),
reconfig_listener,
+ bounded_executor,
);
let (network_task, network_receiver) = NetworkTask::new(network_events, self_receiver);
