Skip to content

Commit

Permalink
[dag] add persisted storage
Browse files Browse the repository at this point in the history
  • Loading branch information
zekun000 committed Jul 6, 2023
1 parent fc03cda commit da278bf
Show file tree
Hide file tree
Showing 13 changed files with 349 additions and 47 deletions.
29 changes: 28 additions & 1 deletion consensus/src/consensusdb/consensusdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
// SPDX-License-Identifier: Apache-2.0

use super::*;
use aptos_consensus_types::block::block_test_utils::certificate_for_genesis;
use aptos_consensus_types::{
block::block_test_utils::certificate_for_genesis,
common::{Author, Payload},
};
use aptos_temppath::TempPath;
use aptos_types::aggregate_signature::AggregateSignature;

#[test]
fn test_put_get() {
Expand Down Expand Up @@ -70,3 +74,26 @@ fn test_delete_block_and_qc() {
assert_eq!(db.get_blocks().unwrap().len(), 0);
assert_eq!(db.get_quorum_certificates().unwrap().len(), 0);
}

#[test]
fn test_dag() {
    let tmp_dir = TempPath::new();
    let db = ConsensusDB::new(&tmp_dir);

    // A fresh database starts with no certified nodes.
    assert!(db.get_certified_nodes().unwrap().is_empty());

    // Persist a node, then a certificate wrapping that node.
    let node = Node::new(1, 1, Author::random(), 123, Payload::empty(false), vec![]);
    db.save_node(&node).unwrap();
    let certified = CertifiedNode::new(node, AggregateSignature::empty());
    db.save_certified_node(&certified).unwrap();

    // The certified node must round-trip through storage unchanged.
    let mut stored = db.get_certified_nodes().unwrap();
    assert_eq!(stored.len(), 1);
    let recovered = stored.remove(certified.metadata().digest()).unwrap();
    assert_eq!(certified, recovered);
}
46 changes: 40 additions & 6 deletions consensus/src/consensusdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@ mod consensusdb_test;
mod schema;

use crate::{
consensusdb::schema::{
block::BlockSchema,
quorum_certificate::QCSchema,
single_entry::{SingleEntryKey, SingleEntrySchema},
},
dag::{CertifiedNode, Node},
error::DbError,
};
use anyhow::Result;
use aptos_consensus_types::{block::Block, quorum_cert::QuorumCert};
use aptos_crypto::HashValue;
use aptos_logger::prelude::*;
use aptos_schemadb::{Options, ReadOptions, SchemaBatch, DB, DEFAULT_COLUMN_FAMILY_NAME};
use schema::{BLOCK_CF_NAME, QC_CF_NAME, SINGLE_ENTRY_CF_NAME};
use schema::{
block::BlockSchema,
dag::{CertifiedNodeSchema, NodeSchema},
quorum_certificate::QCSchema,
single_entry::{SingleEntryKey, SingleEntrySchema},
BLOCK_CF_NAME, CERTIFIED_NODE_CF_NAME, NODE_CF_NAME, QC_CF_NAME, SINGLE_ENTRY_CF_NAME,
};
use std::{collections::HashMap, iter::Iterator, path::Path, time::Instant};

/// The name of the consensus db file
Expand Down Expand Up @@ -52,6 +54,8 @@ impl ConsensusDB {
BLOCK_CF_NAME,
QC_CF_NAME,
SINGLE_ENTRY_CF_NAME,
NODE_CF_NAME,
CERTIFIED_NODE_CF_NAME,
];

let path = db_root_path.as_ref().join(CONSENSUS_DB_NAME);
Expand Down Expand Up @@ -187,4 +191,34 @@ impl ConsensusDB {
iter.seek_to_first();
Ok(iter.collect::<Result<HashMap<HashValue, QuorumCert>>>()?)
}

/// Persists a single DAG node, keyed by its digest, in one atomic write batch.
pub fn save_node(&self, node: &Node) -> Result<(), DbError> {
    let batch = SchemaBatch::new();
    batch.put::<NodeSchema>(&node.digest(), node)?;
    self.commit(batch)
}

/// Persists a certified DAG node, keyed by its digest, in one atomic write batch.
pub fn save_certified_node(&self, node: &CertifiedNode) -> Result<(), DbError> {
    let batch = SchemaBatch::new();
    batch.put::<CertifiedNodeSchema>(&node.digest(), node)?;
    self.commit(batch)
}

/// Loads every persisted certified node, keyed by digest.
pub fn get_certified_nodes(&self) -> Result<HashMap<HashValue, CertifiedNode>, DbError> {
    let mut iter = self.db.iter::<CertifiedNodeSchema>(ReadOptions::default())?;
    iter.seek_to_first();
    // Collecting through Result short-circuits on the first corrupt entry.
    let nodes = iter.collect::<Result<HashMap<_, _>>>()?;
    Ok(nodes)
}

/// Deletes the certified nodes with the given digests in a single write batch.
pub fn delete_certified_nodes(&self, digests: Vec<HashValue>) -> Result<(), DbError> {
    let batch = SchemaBatch::new();
    for digest in &digests {
        batch.delete::<CertifiedNodeSchema>(digest)?;
    }
    self.commit(batch)
}
}
17 changes: 7 additions & 10 deletions consensus/src/consensusdb/schema/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,18 @@
//! | block_hash | block |
//! ```
use super::BLOCK_CF_NAME;
use anyhow::Result;
use aptos_consensus_types::block::Block;
use aptos_crypto::HashValue;
use aptos_schemadb::schema::{KeyCodec, Schema, ValueCodec};
use aptos_schemadb::{
define_schema,
schema::{KeyCodec, ValueCodec},
ColumnFamilyName,
};

#[derive(Debug)]
pub struct BlockSchema;
pub const BLOCK_CF_NAME: ColumnFamilyName = "block";

impl Schema for BlockSchema {
type Key = HashValue;
type Value = Block;

const COLUMN_FAMILY_NAME: aptos_schemadb::ColumnFamilyName = BLOCK_CF_NAME;
}
define_schema!(BlockSchema, HashValue, Block, BLOCK_CF_NAME);

impl KeyCodec<BlockSchema> for HashValue {
fn encode_key(&self) -> Result<Vec<u8>> {
Expand Down
72 changes: 72 additions & 0 deletions consensus/src/consensusdb/schema/dag/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

//! This module defines physical storage schema for DAG.
//!
//! Serialized bytes identified by node digest.
//! ```text
//! |<---key---->|<---value--->|
//! | digest | node/certified node |
//! ```
use crate::dag::{CertifiedNode, Node};
use anyhow::Result;
use aptos_crypto::HashValue;
use aptos_schemadb::{
define_schema,
schema::{KeyCodec, ValueCodec},
ColumnFamilyName,
};

/// Column family holding in-flight (not yet certified) DAG nodes.
pub const NODE_CF_NAME: ColumnFamilyName = "node";

define_schema!(NodeSchema, HashValue, Node, NODE_CF_NAME);

impl KeyCodec<NodeSchema> for HashValue {
    // Keys are the raw digest bytes; no length prefix is needed since
    // HashValue is fixed-width.
    fn encode_key(&self) -> Result<Vec<u8>> {
        Ok(self.to_vec())
    }

    fn decode_key(data: &[u8]) -> Result<Self> {
        HashValue::from_slice(data).map_err(Into::into)
    }
}

impl ValueCodec<NodeSchema> for Node {
    // Values are BCS-serialized nodes.
    fn encode_value(&self) -> Result<Vec<u8>> {
        bcs::to_bytes(self).map_err(Into::into)
    }

    fn decode_value(data: &[u8]) -> Result<Self> {
        bcs::from_bytes(data).map_err(Into::into)
    }
}

/// Column family holding certified DAG nodes.
pub const CERTIFIED_NODE_CF_NAME: ColumnFamilyName = "certified_node";

define_schema!(
    CertifiedNodeSchema,
    HashValue,
    CertifiedNode,
    CERTIFIED_NODE_CF_NAME
);

impl KeyCodec<CertifiedNodeSchema> for HashValue {
    // Keys are the raw digest bytes; HashValue is fixed-width.
    fn encode_key(&self) -> Result<Vec<u8>> {
        Ok(self.to_vec())
    }

    fn decode_key(data: &[u8]) -> Result<Self> {
        HashValue::from_slice(data).map_err(Into::into)
    }
}

impl ValueCodec<CertifiedNodeSchema> for CertifiedNode {
    // Values are BCS-serialized certified nodes.
    fn encode_value(&self) -> Result<Vec<u8>> {
        bcs::to_bytes(self).map_err(Into::into)
    }

    fn decode_value(data: &[u8]) -> Result<Self> {
        bcs::from_bytes(data).map_err(Into::into)
    }
}
13 changes: 7 additions & 6 deletions consensus/src/consensusdb/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod block;
pub(crate) mod dag;
pub(crate) mod quorum_certificate;
pub(crate) mod single_entry;

use anyhow::{ensure, Result};
use aptos_schemadb::ColumnFamilyName;

pub(super) const BLOCK_CF_NAME: ColumnFamilyName = "block";
pub(super) const QC_CF_NAME: ColumnFamilyName = "quorum_certificate";
pub(super) const SINGLE_ENTRY_CF_NAME: ColumnFamilyName = "single_entry";

fn ensure_slice_len_eq(data: &[u8], len: usize) -> Result<()> {
pub(crate) fn ensure_slice_len_eq(data: &[u8], len: usize) -> Result<()> {
ensure!(
data.len() == len,
"Unexpected data len {}, expected {}.",
Expand All @@ -22,3 +18,8 @@ fn ensure_slice_len_eq(data: &[u8], len: usize) -> Result<()> {
);
Ok(())
}

pub use block::BLOCK_CF_NAME;
pub use dag::{CERTIFIED_NODE_CF_NAME, NODE_CF_NAME};
pub use quorum_certificate::QC_CF_NAME;
pub use single_entry::SINGLE_ENTRY_CF_NAME;
4 changes: 3 additions & 1 deletion consensus/src/consensusdb/schema/quorum_certificate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@
//! | block_hash | QuorumCert |
//! ```
use super::QC_CF_NAME;
use anyhow::Result;
use aptos_consensus_types::quorum_cert::QuorumCert;
use aptos_crypto::HashValue;
use aptos_schemadb::{
define_schema,
schema::{KeyCodec, ValueCodec},
ColumnFamilyName,
};

pub const QC_CF_NAME: ColumnFamilyName = "quorum_certificate";

define_schema!(QCSchema, HashValue, QuorumCert, QC_CF_NAME);

impl KeyCodec<QCSchema> for HashValue {
Expand Down
5 changes: 4 additions & 1 deletion consensus/src/consensusdb/schema/single_entry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@
//! | single entry key | raw value bytes |
//! ```
use super::{ensure_slice_len_eq, SINGLE_ENTRY_CF_NAME};
use super::ensure_slice_len_eq;
use anyhow::{format_err, Result};
use aptos_schemadb::{
define_schema,
schema::{KeyCodec, ValueCodec},
ColumnFamilyName,
};
use byteorder::ReadBytesExt;
use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::{FromPrimitive, ToPrimitive};
use std::mem::size_of;

pub const SINGLE_ENTRY_CF_NAME: ColumnFamilyName = "single_entry";

define_schema!(
SingleEntrySchema,
SingleEntryKey,
Expand Down
41 changes: 35 additions & 6 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::dag::types::{CertifiedNode, NodeCertificate};
use crate::dag::{
storage::DAGStorage,
types::{CertifiedNode, NodeCertificate},
};
use anyhow::{anyhow, ensure};
use aptos_consensus_types::common::{Author, Round};
use aptos_crypto::HashValue;
use aptos_types::validator_verifier::ValidatorVerifier;
use aptos_logger::error;
use aptos_types::{epoch_state::EpochState, validator_verifier::ValidatorVerifier};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
Expand All @@ -18,17 +22,41 @@ pub struct Dag {
nodes_by_round: BTreeMap<Round, Vec<Option<Arc<CertifiedNode>>>>,
/// Map between peer id to vector index
author_to_index: HashMap<Author, usize>,
storage: Arc<dyn DAGStorage>,
}

impl Dag {
pub fn new(author_to_index: HashMap<Author, usize>, initial_round: Round) -> Self {
pub fn new(epoch_state: Arc<EpochState>, storage: Arc<dyn DAGStorage>) -> Self {
let epoch = epoch_state.epoch;
let author_to_index = epoch_state.verifier.address_to_validator_index().clone();
let num_validators = author_to_index.len();
let all_nodes = storage.get_certified_nodes().unwrap_or_default();
let mut expired = vec![];
let mut nodes_by_digest = HashMap::new();
let mut nodes_by_round = BTreeMap::new();
let num_nodes = author_to_index.len();
nodes_by_round.insert(initial_round, vec![None; num_nodes]);
for (digest, certified_node) in all_nodes {
if certified_node.metadata().epoch() == epoch {
let arc_node = Arc::new(certified_node);
nodes_by_digest.insert(digest, arc_node.clone());
let index = *author_to_index
.get(arc_node.metadata().author())
.expect("Author from certified node should exist");
let round = arc_node.metadata().round();
nodes_by_round
.entry(round)
.or_insert_with(|| vec![None; num_validators])[index] = Some(arc_node);
} else {
expired.push(digest);
}
}
if let Err(e) = storage.delete_certified_nodes(expired) {
error!("Error deleting expired nodes: {:?}", e);
}
Self {
nodes_by_digest: HashMap::new(),
nodes_by_digest,
nodes_by_round,
author_to_index,
storage,
}
}

Expand Down Expand Up @@ -60,6 +88,7 @@ impl Dag {
for parent in node.parents() {
ensure!(self.exists(parent.metadata().digest()), "parent not exist");
}
self.storage.save_certified_node(&node)?;
ensure!(
self.nodes_by_digest
.insert(node.digest(), node.clone())
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ mod dag_handler;
mod dag_network;
mod dag_store;
mod reliable_broadcast;
mod storage;
#[cfg(test)]
mod tests;
mod types;

pub use dag_network::RpcHandler;
pub use types::DAGNetworkMessage;
pub use types::{CertifiedNode, DAGNetworkMessage, Node};
Loading

0 comments on commit da278bf

Please sign in to comment.