Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dag] broadcast CertifiedNodeMsg with LedgerInfo #9968

Merged
merged 4 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,9 @@ impl DAGStorage for StorageAdapter {
}
Ok(commit_events)
}

fn get_latest_ledger_info(&self) -> anyhow::Result<LedgerInfoWithSignatures> {
// TODO: use callback from notifier to cache the latest ledger info
self.aptos_db.get_latest_ledger_info()
}
}
11 changes: 9 additions & 2 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{
dag_fetcher::FetchRequester,
order_rule::OrderRule,
storage::DAGStorage,
types::{CertifiedAck, DAGMessage, Extensions},
types::{CertifiedAck, CertifiedNodeMessage, DAGMessage, Extensions},
RpcHandler,
};
use crate::{
Expand Down Expand Up @@ -151,12 +151,19 @@ impl DagDriver {
let signature_builder =
SignatureBuilder::new(node.metadata().clone(), self.epoch_state.clone());
let cert_ack_set = CertificateAckState::new(self.epoch_state.verifier.len());
let latest_ledger_info = self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

carrying a db here is a bit ugly, I was thinking about caching one in the notifier adapter and rely on callback to update it, wdyt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we anyhow need the StorageAdapter here right? we can just use that instead. we can cache within the storage adapter if necessary?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds better, just expose a function on the DAGStorage?

.storage
.get_latest_ledger_info()
.expect("latest ledger info must exist");
let task = self
.reliable_broadcast
.broadcast(node.clone(), signature_builder)
.then(move |certificate| {
let certified_node = CertifiedNode::new(node, certificate.signatures().to_owned());
rb.broadcast(certified_node, cert_ack_set)

let certified_node_msg =
CertifiedNodeMessage::new(certified_node, latest_ledger_info);
rb.broadcast(certified_node_msg, cert_ack_set)
});
tokio::spawn(Abortable::new(task, abort_registration));
if let Some(prev_handle) = self.rb_abort_handle.replace(abort_handle) {
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl NetworkHandler {
.map(|r| r.into()),
DAGMessage::CertifiedNodeMsg(node) => node
.verify(&self.epoch_state.verifier)
.and_then(|_| self.dag_driver.process(node))
.and_then(|_| self.dag_driver.process(node.certified_node()))
Copy link
Contributor

@sasha8 sasha8 Sep 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit confusing that the node has certified_node that has a node...
Maybe change the name of the variable?

.map(|r| r.into()),
DAGMessage::FetchRequest(request) => request
.verify(&self.epoch_state.verifier)
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/dag/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::{types::Vote, NodeId};
use crate::dag::{CertifiedNode, Node};
use aptos_consensus_types::common::Author;
use aptos_crypto::HashValue;
use aptos_types::ledger_info::LedgerInfoWithSignatures;

pub struct CommitEvent {
node_id: NodeId,
Expand Down Expand Up @@ -48,4 +49,6 @@ pub trait DAGStorage: Send + Sync {
fn delete_ordered_anchor_ids(&self, node_ids: Vec<NodeId>) -> anyhow::Result<()>;

fn get_latest_k_committed_events(&self, k: u64) -> anyhow::Result<Vec<CommitEvent>>;

fn get_latest_ledger_info(&self) -> anyhow::Result<LedgerInfoWithSignatures>;
}
11 changes: 8 additions & 3 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use aptos_infallible::RwLock;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_time_service::TimeService;
use aptos_types::{
epoch_state::EpochState, ledger_info::LedgerInfo, validator_verifier::random_validator_verifier,
epoch_state::EpochState,
ledger_info::{generate_ledger_info_with_sig, LedgerInfo},
validator_verifier::random_validator_verifier,
};
use async_trait::async_trait;
use claims::{assert_ok, assert_ok_eq};
Expand Down Expand Up @@ -75,7 +77,10 @@ async fn test_certified_node_handler() {
epoch: 1,
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());

let mock_ledger_info = LedgerInfo::mock_genesis(None);
let mock_ledger_info = generate_ledger_info_with_sig(&signers, mock_ledger_info);
let storage = Arc::new(MockStorage::new_with_ledger_info(mock_ledger_info));
let dag = Arc::new(RwLock::new(Dag::new(
epoch_state.clone(),
storage.clone(),
Expand Down Expand Up @@ -109,7 +114,7 @@ async fn test_certified_node_handler() {
dag.clone(),
aptos_time_service::TimeService::mock(),
);
let fetch_requester = Arc::new(fetch_requester);
let fetch_requester = Arc::new(fetch_requester);

let mut driver = DagDriver::new(
signers[0].author(),
Expand Down
21 changes: 19 additions & 2 deletions consensus/src/dag/tests/dag_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ use crate::dag::{
use aptos_crypto::HashValue;
use aptos_infallible::Mutex;
use aptos_types::{
epoch_state::EpochState, validator_signer::ValidatorSigner,
validator_verifier::random_validator_verifier,
epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures,
validator_signer::ValidatorSigner, validator_verifier::random_validator_verifier,
};
use std::{collections::HashMap, sync::Arc};

pub struct MockStorage {
node_data: Mutex<Option<Node>>,
vote_data: Mutex<HashMap<NodeId, Vote>>,
certified_node_data: Mutex<HashMap<HashValue, CertifiedNode>>,
latest_ledger_info: Option<LedgerInfoWithSignatures>,
}

impl MockStorage {
Expand All @@ -29,6 +30,16 @@ impl MockStorage {
node_data: Mutex::new(None),
vote_data: Mutex::new(HashMap::new()),
certified_node_data: Mutex::new(HashMap::new()),
latest_ledger_info: None,
}
}

pub fn new_with_ledger_info(ledger_info: LedgerInfoWithSignatures) -> Self {
Self {
node_data: Mutex::new(None),
vote_data: Mutex::new(HashMap::new()),
certified_node_data: Mutex::new(HashMap::new()),
latest_ledger_info: Some(ledger_info),
}
}
}
Expand Down Expand Up @@ -102,6 +113,12 @@ impl DAGStorage for MockStorage {
fn get_latest_k_committed_events(&self, _k: u64) -> anyhow::Result<Vec<CommitEvent>> {
Ok(vec![])
}

fn get_latest_ledger_info(&self) -> anyhow::Result<LedgerInfoWithSignatures> {
self.latest_ledger_info
.clone()
.ok_or_else(|| anyhow::anyhow!("ledger info not set"))
}
}

fn setup() -> (Vec<ValidatorSigner>, Arc<EpochState>, Dag, Arc<MockStorage>) {
Expand Down
7 changes: 5 additions & 2 deletions consensus/src/dag/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use aptos_time_service::TimeService;
use aptos_types::{
epoch_state::EpochState,
validator_signer::ValidatorSigner,
validator_verifier::{random_validator_verifier, ValidatorVerifier},
validator_verifier::{random_validator_verifier, ValidatorVerifier}, ledger_info::generate_ledger_info_with_sig,
};
use claims::assert_gt;
use futures::{
Expand Down Expand Up @@ -57,12 +57,14 @@ impl DagBootstrapUnit {
network_events: Box<
Select<NetworkEvents<ConsensusMsg>, aptos_channels::Receiver<Event<ConsensusMsg>>>,
>,
all_signers: Vec<ValidatorSigner>,
) -> (Self, UnboundedReceiver<OrderedBlocks>) {
let epoch_state = EpochState {
epoch,
verifier: storage.get_validator_set().into(),
};
let dag_storage = dag_test::MockStorage::new();
let ledger_info = generate_ledger_info_with_sig(&all_signers, storage.get_ledger_info());
let dag_storage = dag_test::MockStorage::new_with_ledger_info(ledger_info);

let network = Arc::new(DAGNetworkSenderImpl::new(Arc::new(network)));

Expand Down Expand Up @@ -186,6 +188,7 @@ fn bootstrap_nodes(
network,
aptos_time_service::TimeService::real(),
network_events,
signers.clone(),
)
})
.unzip();
Expand Down
36 changes: 24 additions & 12 deletions consensus/src/dag/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,14 @@ impl CertifiedNode {
pub fn certificate(&self) -> NodeCertificate {
NodeCertificate::new(self.node.metadata.clone(), self.signatures.clone())
}

pub fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()> {
ensure!(self.digest() == self.calculate_digest(), "invalid digest");

verifier
.verify_multi_signatures(self.metadata(), self.certificate().signatures())
.map_err(|e| anyhow::anyhow!("unable to verify: {}", e))
}
}

impl Deref for CertifiedNode {
Expand All @@ -409,16 +417,6 @@ impl Deref for CertifiedNode {
}
}

impl TDAGMessage for CertifiedNode {
fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()> {
ensure!(self.digest() == self.calculate_digest(), "invalid digest");

verifier
.verify_multi_signatures(self.metadata(), self.certificate().signatures())
.map_err(|e| anyhow::anyhow!("unable to verify: {}", e))
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct CertifiedNodeMessage {
inner: CertifiedNode,
Expand All @@ -436,6 +434,10 @@ impl CertifiedNodeMessage {
pub fn ledger_info(&self) -> &LedgerInfoWithSignatures {
&self.ledger_info
}

pub fn certified_node(self) -> CertifiedNode {
self.inner
}
}

impl Deref for CertifiedNodeMessage {
Expand All @@ -446,6 +448,16 @@ impl Deref for CertifiedNodeMessage {
}
}

impl TDAGMessage for CertifiedNodeMessage {
fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()> {
self.inner.verify(verifier)?;

self.ledger_info
.verify_signatures(&verifier)
.map_err(|e| anyhow::anyhow!("unable to verify ledger info: {}", e))
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Vote {
metadata: NodeMetadata,
Expand Down Expand Up @@ -533,7 +545,7 @@ impl CertifiedAck {
impl BroadcastStatus<DAGMessage> for CertificateAckState {
type Ack = CertifiedAck;
type Aggregated = ();
type Message = CertifiedNode;
type Message = CertifiedNodeMessage;

fn add(&mut self, peer: Author, _ack: Self::Ack) -> anyhow::Result<Option<Self::Aggregated>> {
self.received.insert(peer);
Expand Down Expand Up @@ -647,7 +659,7 @@ impl core::fmt::Debug for DAGNetworkMessage {
pub enum DAGMessage {
NodeMsg(Node),
VoteMsg(Vote),
CertifiedNodeMsg(CertifiedNode),
CertifiedNodeMsg(CertifiedNodeMessage),
CertifiedAckMsg(CertifiedAck),
FetchRequest(RemoteFetchRequest),
FetchResponse(FetchResponse),
Expand Down