From a8c4688fb526faff1c88bc7c390c93192ddb5deb Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Tue, 31 Oct 2023 10:26:10 +0100 Subject: [PATCH] chore: adding more docs to the storage Signed-off-by: Simon Paitrault --- crates/topos-tce-storage/README.md | 62 ++-- crates/topos-tce-storage/src/fullnode/mod.rs | 2 +- crates/topos-tce-storage/src/index/mod.rs | 9 +- crates/topos-tce-storage/src/lib.rs | 167 +++------ crates/topos-tce-storage/src/rocks.rs | 337 +----------------- .../topos-tce-storage/src/rocks/db_column.rs | 1 + crates/topos-tce-storage/src/rocks/types.rs | 20 -- .../topos-tce-storage/src/tests/db_columns.rs | 24 +- .../src/tests/support/columns.rs | 10 +- crates/topos-tce-storage/src/types.rs | 25 +- crates/topos-tce-storage/src/validator/mod.rs | 24 +- .../topos-tce-storage/src/validator/tables.rs | 47 ++- 12 files changed, 184 insertions(+), 544 deletions(-) diff --git a/crates/topos-tce-storage/README.md b/crates/topos-tce-storage/README.md index 77bc827a1..db8d3350e 100644 --- a/crates/topos-tce-storage/README.md +++ b/crates/topos-tce-storage/README.md @@ -21,47 +21,61 @@ As an overview, the storage layer is composed of the following stores: Text changing depending on mode. Light: 'So light!' Dark: 'So dark!' -### Usage +#### Definitions and Responsibilities -Each store represents a different kind of capabilities, but they all act and need the same kind -of configuration in order to work. +As illustrated above, multiple `stores` are exposed in the library using various `tables`. -For instance, the [`EpochValidatorsStore`](struct@epoch::EpochValidatorsStore) only needs a [`PathBuf`](struct@std::path::PathBuf) -argument to be instantiated where [`FullNodeStore`](struct@fullnode::FullNodeStore) needs a little bit more arguments. +The difference between a `store` and a `table` is that the `table` is responsible for storing +the data while the `store` is responsible for managing the data and its access and behaviour. 
-The underlying mechanisms of how data is stored is fairly simple, it relies a lot on [`rocksdb`] and will
-be describe below.
+Here's the list of the different stores and their responsibilities:

-As an example, in order to create a new [`EpochValidatorsStore`](struct@epoch::EpochValidatorsStore) you need to provide a
-path where the [`rocksdb`] database will be placed:
+- The [`EpochValidatorsStore`](struct@epoch::EpochValidatorsStore) is responsible for managing the list of validators for each `epoch`.
+- The [`FullNodeStore`](struct@fullnode::FullNodeStore) is responsible for managing all the persistent data such as [`Certificate`] delivered and associated `streams`.
+- The [`IndexStore`](struct@index::IndexStore) is responsible for managing indexes in order to collect information about the broadcast and the network.
+- The [`ValidatorStore`](struct@validator::ValidatorStore) is responsible for managing the pending certificates pool and all the transient and volatile data.

-```rust
-use epoch::EpochValidatorsStore;
-path.push("epoch");
-let store: Arc = EpochValidatorsStore::new(path).unwrap();
-```
+For more information about a `store`, see the related doc.
+
+Next, we have the list of the different tables and their responsibilities:
+
+- The [`EpochValidatorsTables`](struct@epoch::EpochValidatorsTables) is responsible for storing the list of validators for each `epoch`.
+- The [`ValidatorPerpetualTables`](struct@validator::ValidatorPerpetualTables) is responsible for storing the [`Certificate`] delivered and all the persistent data related to the broadcast.
+- The [`ValidatorPendingTables`](struct@validator::ValidatorPendingTables) is responsible for storing the pending certificates pool and all the transient and volatile data.
+- The [`IndexTables`](struct@index::IndexTables) is responsible for storing indexes about the delivery of [`Certificate`] such as `target subnet stream`. 
### Special Considerations

When using the storage layer, you need to be aware of the following:
-- The storage layer is using [`rocksdb`] as a backend, which means that the data is stored on disk.
-- The storage layer is using [`Arc`](struct@std::sync::Arc) to share the stores between threads.
-- The storage layer is using [`async_trait`](https://docs.rs/async-trait/0.1.51/async_trait/) to expose methods that need to manage locks. (see [`WriteStore`](trait@store::WriteStore))
-- Some functions are using [`DBBatch`](struct@rocks::db_column::DBBatch) to batch multiple writes in one transaction. But not all functions are using it.
+- The storage layer is using [rocksdb](https://rocksdb.org/) as a backend, which means that this storage doesn't need an external service, as `rocksdb` is an embeddable kv store.
+- The storage layer is using [`Arc`](struct@std::sync::Arc) to share the stores between threads. It also means that a `store` is only instantiated once.
+- Some functions are batching multiple writes in one transaction. But not all functions are using it.

### Design Philosophy

-The choice of using [`rocksdb`] as a backend was made because it is a well known and battle tested database.
-It is also very fast and efficient when it comes to write and read data. However, it is not the best when it comes
-to compose or filter data. This is why we have multiple store that are used for different purposes.
+The choice of using [rocksdb](https://rocksdb.org/) as a backend was made because it is a well known and battle tested database.
+It is also very fast and efficient when it comes to write and read data.
+
+Multiple `stores` and `tables` exist in order to allow admins to deal with backups or
+snapshots as they see fit. You can pick and choose which `tables` you want to backup without having to backup the whole database.
+
+By splitting the data in dedicated tables we define strong separation of concern
+directly in our storage. 
+
+`RocksDB` is however not the best fit when it comes to compose or filter data based on the data
+itself.

For complex queries, another database like [`PostgreSQL`](https://www.postgresql.org/) or [`CockroachDB`](https://www.cockroachlabs.com/) could be used as a Storage for projections.
-The source of truth would still be [`rocksdb`] but the projections would be stored in a relational database. Allowing for more complex queries.
+The source of truth would still be [rocksdb](https://rocksdb.org/) but the projections would be stored in a relational database, allowing for more complex queries.

As mention above, the different stores are using [`Arc`](struct@std::sync::Arc), allowing a single store to be instantiated once
-and then shared between threads. This is very useful when it comes to the [`FullNodeStore`](struct@fullnode::FullNodeStore) as it is used in various places.
+and then shared between threads. This is very useful when it comes to the [`FullNodeStore`](struct@fullnode::FullNodeStore) as it is used in various places but needs to provide a single entrypoint to the data.

It also means that the store is immutable, which is a good thing when it comes to concurrency.

+
The burden of managing the locks is handled by the [`async_trait`](https://docs.rs/async-trait/0.1.51/async_trait/) crate when using the [`WriteStore`](trait@store::WriteStore).
-The rest of the mutation on the data are handled by [`rocksdb`] itself.
+
+The locks are responsible for preventing any other query to mutate the data currently in processing. For more information about the locks see [`locking`](module@fullnode::locking)
+
+The rest of the mutation on the data are handled by [rocksdb](https://rocksdb.org/) itself. 
diff --git a/crates/topos-tce-storage/src/fullnode/mod.rs b/crates/topos-tce-storage/src/fullnode/mod.rs index 052c394bf..6f1c6511a 100644 --- a/crates/topos-tce-storage/src/fullnode/mod.rs +++ b/crates/topos-tce-storage/src/fullnode/mod.rs @@ -24,7 +24,7 @@ use crate::{ use self::locking::LockGuards; -mod locking; +pub mod locking; /// Store to manage FullNode data /// diff --git a/crates/topos-tce-storage/src/index/mod.rs b/crates/topos-tce-storage/src/index/mod.rs index 5bc6f5dd3..53d80c423 100644 --- a/crates/topos-tce-storage/src/index/mod.rs +++ b/crates/topos-tce-storage/src/index/mod.rs @@ -2,7 +2,7 @@ use std::{fs::create_dir_all, path::PathBuf}; use rocksdb::ColumnFamilyDescriptor; use topos_core::{ - types::stream::{CertificateTargetStreamPosition, Position}, + types::stream::Position, uci::{CertificateId, SubnetId}, }; use tracing::warn; @@ -13,9 +13,8 @@ use crate::{ constants, db::{default_options, init_with_cfs}, db_column::DBColumn, - TargetSourceListKey, }, - types::CertificateSequenceNumber, + types::{CertificateSequenceNumber, TargetSourceListColumn, TargetStreamsColumn}, }; #[allow(unused)] @@ -24,8 +23,8 @@ pub(crate) struct IndexStore { } pub struct IndexTables { - pub(crate) target_streams: DBColumn, - pub(crate) target_source_list: DBColumn, + pub(crate) target_streams: TargetStreamsColumn, + pub(crate) target_source_list: TargetSourceListColumn, pub(crate) source_list: DBColumn, pub(crate) source_list_per_target: DBColumn<(SubnetId, SubnetId), bool>, } diff --git a/crates/topos-tce-storage/src/lib.rs b/crates/topos-tce-storage/src/lib.rs index 88005abab..771931d5a 100644 --- a/crates/topos-tce-storage/src/lib.rs +++ b/crates/topos-tce-storage/src/lib.rs @@ -19,49 +19,70 @@ //! Text changing depending on mode. Light: 'So light!' Dark: 'So dark!' //! //! -//! ## Usage +//! ### Definitions and Responsibilities //! -//! Each store represents a different kind of capabilities, but they all act and need the same kind -//! 
of configuration in order to work.
+//! As illustrated above, multiple `stores` are exposed in the library using various `tables`.
//!
-//! For instance, the [`EpochValidatorsStore`](struct@epoch::EpochValidatorsStore) only needs a [`PathBuf`](struct@std::path::PathBuf)
-//! argument to be instantiated where [`FullNodeStore`](struct@fullnode::FullNodeStore) needs a little bit more arguments.
+//! The difference between a `store` and a `table` is that the `table` is responsible for storing
+//! the data while the `store` is responsible for managing the data and its access and behaviour.
//!
-//! The underlying mechanisms of how data is stored is fairly simple, it relies a lot on [`rocksdb`] and will
-//! be describe below.
+//! Here's the list of the different stores and their responsibilities:
+//!
+//! - The [`EpochValidatorsStore`](struct@epoch::EpochValidatorsStore) is responsible for managing the list of validators for each `epoch`.
+//! - The [`FullNodeStore`](struct@fullnode::FullNodeStore) is responsible for managing all the persistent data such as [`Certificate`] delivered and associated `streams`.
+//! - The [`IndexStore`](struct@index::IndexStore) is responsible for managing indexes in order to collect information about the broadcast and the network.
+//! - The [`ValidatorStore`](struct@validator::ValidatorStore) is responsible for managing the pending certificates pool and all the transient and volatile data.
+//!
+//! For more information about a `store`, see the related doc.
+//!
+//! Next, we have the list of the different tables and their responsibilities:
+//!
+//! - The [`EpochValidatorsTables`](struct@epoch::EpochValidatorsTables) is responsible for storing the list of validators for each `epoch`.
+//! - The [`ValidatorPerpetualTables`](struct@validator::ValidatorPerpetualTables) is responsible for storing the [`Certificate`] delivered and all the persistent data related to the broadcast.
+//! 
- The [`ValidatorPendingTables`](struct@validator::ValidatorPendingTables) is responsible for storing the pending certificates pool and all the transient and volatile data.
+//! - The [`IndexTables`](struct@index::IndexTables) is responsible for storing indexes about the delivery of [`Certificate`] such as `target subnet stream`.
//!
//! ## Special Considerations
//!
//! When using the storage layer, you need to be aware of the following:
-//! - The storage layer is using [`rocksdb`] as a backend, which means that the data is stored on disk.
-//! - The storage layer is using [`Arc`](struct@std::sync::Arc) to share the stores between threads.
-//! - The storage layer is using [`async_trait`](https://docs.rs/async-trait/0.1.51/async_trait/) to expose methods that need to manage locks. (see [`WriteStore`](trait@store::WriteStore))
-//! - Some functions are using [`DBBatch`](struct@rocks::db_column::DBBatch) to batch multiple writes in one transaction. But not all functions are using it.
+//! - The storage layer is using [rocksdb](https://rocksdb.org/) as a backend, which means that this storage doesn't need an external service, as `rocksdb` is an embeddable kv store.
+//! - The storage layer is using [`Arc`](struct@std::sync::Arc) to share the stores between threads. It also means that a `store` is only instantiated once.
+//! - Some functions are batching multiple writes in one transaction. But not all functions are using it.
//!
//! ## Design Philosophy
//!
-//! The choice of using [`rocksdb`] as a backend was made because it is a well known and battle tested database.
-//! It is also very fast and efficient when it comes to write and read data. However, it is not the best when it comes
-//! to compose or filter data. This is why we have multiple store that are used for different purposes.
+//! The choice of using [rocksdb](https://rocksdb.org/) as a backend was made because it is a well known and battle tested database.
+//! 
It is also very fast and efficient when it comes to write and read data.
+//!
+//! Multiple `stores` and `tables` exist in order to allow admins to deal with backups or
+//! snapshots as they see fit. You can pick and choose which `tables` you want to backup without having to backup the whole database.
+//!
+//! By splitting the data in dedicated tables we define strong separation of concern
+//! directly in our storage.
+//!
+//! `RocksDB` is however not the best fit when it comes to compose or filter data based on the data
+//! itself.
//!
//! For complex queries, another database like [`PostgreSQL`](https://www.postgresql.org/) or [`CockroachDB`](https://www.cockroachlabs.com/) could be used as a Storage for projections.
-//! The source of truth would still be [`rocksdb`] but the projections would be stored in a relational database. Allowing for more complex queries.
+//! The source of truth would still be [rocksdb](https://rocksdb.org/) but the projections would be stored in a relational database, allowing for more complex queries.
//!
//! As mention above, the different stores are using [`Arc`](struct@std::sync::Arc), allowing a single store to be instantiated once
-//! and then shared between threads. This is very useful when it comes to the [`FullNodeStore`](struct@fullnode::FullNodeStore) as it is used in various places.
+//! and then shared between threads. This is very useful when it comes to the [`FullNodeStore`](struct@fullnode::FullNodeStore) as it is used in various places but needs to provide a single entrypoint to the data.
//!
//! It also means that the store is immutable, which is a good thing when it comes to concurrency.
+//!
//! The burden of managing the locks is handled by the [`async_trait`](https://docs.rs/async-trait/0.1.51/async_trait/) crate when using the [`WriteStore`](trait@store::WriteStore).
-//! The rest of the mutation on the data are handled by [`rocksdb`] itself.
//! 
-use errors::InternalStorageError; -use rocks::iterator::ColumnIterator; +//! The locks are responsible for preventing any other query to mutate the data currently in processing. For more information about the locks see [`locking`](module@fullnode::locking) +//! +//! The rest of the mutation on the data are handled by [rocksdb](https://rocksdb.org/) itself. +//! use serde::{Deserialize, Serialize}; use std::collections::HashMap; use topos_core::{ types::stream::{CertificateSourceStreamPosition, CertificateTargetStreamPosition, Position}, - uci::{Certificate, CertificateId, SubnetId}, + uci::{CertificateId, SubnetId}, }; // v2 @@ -72,7 +93,6 @@ pub mod epoch; pub mod fullnode; pub mod index; pub mod types; -/// Everything that is needed to participate to the protocol pub mod validator; // v1 @@ -128,108 +148,3 @@ pub struct SourceHead { /// Position of the Certificate pub position: Position, } - -/// Define possible status of a certificate -#[derive(Debug, Deserialize, Serialize)] -pub enum CertificateStatus { - Pending, - Delivered, -} - -/// The `Storage` trait defines methods to interact and manage with the persistency layer -#[async_trait::async_trait] -pub trait Storage: Sync + Send + 'static { - async fn get_pending_certificate( - &self, - certificate_id: CertificateId, - ) -> Result<(PendingCertificateId, Certificate), InternalStorageError>; - - /// Add a pending certificate to the pool - async fn add_pending_certificate( - &self, - certificate: &Certificate, - ) -> Result; - - /// Persist the certificate with given status - async fn persist( - &self, - certificate: &Certificate, - pending_certificate_id: Option, - ) -> Result; - - /// Update the certificate entry with new status - async fn update( - &self, - certificate_id: &CertificateId, - status: CertificateStatus, - ) -> Result<(), InternalStorageError>; - - /// Returns the source heads of given subnets - async fn get_source_heads( - &self, - subnets: Vec, - ) -> Result, InternalStorageError>; - - /// 
Returns the certificate data given their id - async fn get_certificates( - &self, - certificate_ids: Vec, - ) -> Result, InternalStorageError>; - - /// Returns the certificate data given its id - async fn get_certificate( - &self, - certificate_id: CertificateId, - ) -> Result; - - /// Returns the certificate emitted by given subnet - /// Ranged by position since emitted Certificate are totally ordered - async fn get_certificates_by_source( - &self, - source_subnet_id: SubnetId, - from: Position, - limit: usize, - ) -> Result, InternalStorageError>; - - /// Returns the certificate received by given subnet - /// Ranged by timestamps since received Certificate are not referrable by position - async fn get_certificates_by_target( - &self, - target_subnet_id: SubnetId, - source_subnet_id: SubnetId, - from: Position, - limit: usize, - ) -> Result, InternalStorageError>; - - /// Returns all the known Certificate that are not delivered yet - async fn get_pending_certificates( - &self, - ) -> Result, InternalStorageError>; - - /// Returns the next Certificate that are not delivered yet - async fn get_next_pending_certificate( - &self, - starting_at: Option, - ) -> Result, InternalStorageError>; - - /// Remove a certificate from pending pool - async fn remove_pending_certificate( - &self, - index: PendingCertificateId, - ) -> Result<(), InternalStorageError>; - - async fn get_target_stream_iterator( - &self, - target: SubnetId, - source: SubnetId, - position: Position, - ) -> Result< - ColumnIterator<'_, CertificateTargetStreamPosition, CertificateId>, - InternalStorageError, - >; - - async fn get_source_list_by_target( - &self, - target: SubnetId, - ) -> Result, InternalStorageError>; -} diff --git a/crates/topos-tce-storage/src/rocks.rs b/crates/topos-tce-storage/src/rocks.rs index c451f5f68..51d2c77ab 100644 --- a/crates/topos-tce-storage/src/rocks.rs +++ b/crates/topos-tce-storage/src/rocks.rs @@ -1,20 +1,4 @@ -use std::collections::HashMap; -use std::{ - fmt::Debug, - 
sync::atomic::{AtomicU64, Ordering}, -}; - -use topos_core::types::stream::CertificateSourceStreamPosition; -use topos_core::uci::{Certificate, CertificateId, CERTIFICATE_ID_LENGTH}; -use tracing::warn; - -use crate::{ - errors::InternalStorageError, CertificatePositions, CertificateTargetStreamPosition, - PendingCertificateId, Position, SourceHead, Storage, SubnetId, -}; - -use self::iterator::ColumnIterator; -use self::{db::RocksDB, map::Map}; +use self::db::RocksDB; pub(crate) mod constants; pub(crate) mod db; @@ -24,322 +8,3 @@ pub(crate) mod map; pub(crate) mod types; pub(crate) use types::*; - -pub const EMPTY_PREVIOUS_CERT_ID: [u8; CERTIFICATE_ID_LENGTH] = [0u8; CERTIFICATE_ID_LENGTH]; - -#[derive(Debug)] -pub struct RocksDBStorage { - pending_certificates: PendingCertificatesColumn, - certificates: CertificatesColumn, - source_streams: SourceStreamsColumn, - target_streams: TargetStreamsColumn, - target_source_list: TargetSourceListColumn, - next_pending_id: AtomicU64, -} - -impl RocksDBStorage { - #[cfg(test)] - #[allow(dead_code)] - pub(crate) fn new( - pending_certificates: PendingCertificatesColumn, - certificates: CertificatesColumn, - source_streams: SourceStreamsColumn, - target_streams: TargetStreamsColumn, - target_source_list: TargetSourceListColumn, - next_pending_id: AtomicU64, - ) -> Self { - Self { - pending_certificates, - certificates, - source_streams, - target_streams, - target_source_list, - next_pending_id, - } - } -} - -#[async_trait::async_trait] -impl Storage for RocksDBStorage { - async fn get_pending_certificate( - &self, - certificate_id: CertificateId, - ) -> Result<(PendingCertificateId, Certificate), InternalStorageError> { - self.pending_certificates - .iter()? 
- .filter(|(_pending_id, cert)| cert.id == certificate_id) - .collect::>() - .first() - .cloned() - .ok_or(InternalStorageError::CertificateNotFound(certificate_id)) - } - - async fn add_pending_certificate( - &self, - certificate: &Certificate, - ) -> Result { - let key = self.next_pending_id.fetch_add(1, Ordering::Relaxed); - - self.pending_certificates.insert(&key, certificate)?; - - Ok(key) - } - - async fn persist( - &self, - certificate: &Certificate, - pending_certificate_id: Option, - ) -> Result { - let mut batch = self.certificates.batch(); - - // Inserting the certificate data into the CERTIFICATES cf - batch = batch.insert_batch(&self.certificates, [(&certificate.id, certificate)])?; - - if let Some(pending_id) = pending_certificate_id { - match self.pending_certificates.get(&pending_id) { - Ok(Some(ref pending_certificate)) if pending_certificate == certificate => { - batch = batch.delete(&self.pending_certificates, pending_id)?; - } - Ok(_) => { - warn!( - "PendingCertificateId {} ignored during persist execution: Difference in \ - certificates", - pending_id - ); - } - - _ => { - warn!( - "PendingCertificateId {} ignored during persist execution: Not Found", - pending_id - ); - } - } - } - - let source_subnet_position = if certificate.prev_id.as_array() == &EMPTY_PREVIOUS_CERT_ID { - Position::ZERO - } else if let Some((CertificateSourceStreamPosition { position, .. }, _)) = self - .source_streams - .prefix_iter(&certificate.source_subnet_id)? - .last() - { - position.increment().map_err(|error| { - InternalStorageError::PositionError(error, certificate.source_subnet_id.into()) - })? 
- } else { - // TODO: Need to be fixed when dealing with order of delivery - Position::ZERO - // TODO: Better error to define that we were expecting a previous defined position - // return Err(InternalStorageError::CertificateNotFound( - // certificate.prev_id, - // )); - }; - - // Return from function as info - let source_subnet_stream_position = CertificateSourceStreamPosition { - subnet_id: certificate.source_subnet_id, - position: source_subnet_position, - }; - - // Adding the certificate to the stream - batch = batch.insert_batch( - &self.source_streams, - [( - CertificateSourceStreamPosition { - subnet_id: certificate.source_subnet_id, - position: source_subnet_position, - }, - certificate.id, - )], - )?; - - // Return list of new target stream positions of certificate that will be persisted - // Information is needed by sequencer/subnet contract to know from - // where to continue with streaming on restart - let mut target_subnet_stream_positions: HashMap = - HashMap::new(); - - // Adding certificate to target_streams - // TODO: Add expected position instead of calculating on the go - let mut targets = Vec::new(); - - for target_subnet_id in &certificate.target_subnets { - let target = match self - .target_streams - .prefix_iter(&TargetSourceListKey( - *target_subnet_id, - certificate.source_subnet_id, - ))? 
- .last() - { - Some((mut target_stream_position, _)) => { - target_stream_position.position = target_stream_position - .position - .increment() - .map_err(|error| { - InternalStorageError::PositionError( - error, - certificate.source_subnet_id.into(), - ) - })?; - target_stream_position - } - None => CertificateTargetStreamPosition::new( - *target_subnet_id, - certificate.source_subnet_id, - Position::ZERO, - ), - }; - - target_subnet_stream_positions.insert(*target_subnet_id, target); - - batch = batch.insert_batch( - &self.target_source_list, - [( - TargetSourceListKey(*target_subnet_id, certificate.source_subnet_id), - *target.position, - )], - )?; - - targets.push((target, certificate.id)); - } - - batch = batch.insert_batch(&self.target_streams, targets)?; - - batch.write()?; - - Ok(CertificatePositions { - targets: target_subnet_stream_positions, - source: source_subnet_stream_position, - }) - } - - async fn update( - &self, - _certificate_id: &CertificateId, - _status: crate::CertificateStatus, - ) -> Result<(), InternalStorageError> { - unimplemented!(); - } - - async fn get_source_heads( - &self, - subnets: Vec, - ) -> Result, InternalStorageError> { - let mut result: Vec = Vec::new(); - for source_subnet_id in subnets { - let (position, certificate_id) = self - .source_streams - .prefix_iter(&source_subnet_id)? 
- .last() - .map(|(source_stream_position, cert_id)| (source_stream_position.position, cert_id)) - .ok_or(InternalStorageError::MissingHeadForSubnet(source_subnet_id))?; - result.push(SourceHead { - position, - certificate_id, - subnet_id: source_subnet_id, - }); - } - Ok(result) - } - - async fn get_certificates( - &self, - certificate_ids: Vec, - ) -> Result, InternalStorageError> { - let mut result = Vec::new(); - - for certificate_id in certificate_ids { - result.push(self.get_certificate(certificate_id).await?); - } - - Ok(result) - } - - async fn get_certificate( - &self, - certificate_id: CertificateId, - ) -> Result { - let res = self.certificates.get(&certificate_id)?; - res.ok_or(InternalStorageError::CertificateNotFound(certificate_id)) - } - - async fn get_certificates_by_source( - &self, - source_subnet_id: SubnetId, - from: crate::Position, - limit: usize, - ) -> Result, InternalStorageError> { - Ok(self - .source_streams - .prefix_iter(&source_subnet_id)? - // TODO: Find a better way to convert u64 to usize - .skip(from.try_into().unwrap()) - .take(limit) - .map(|(_, certificate_id)| certificate_id) - .collect()) - } - - async fn get_certificates_by_target( - &self, - target_subnet_id: SubnetId, - source_subnet_id: SubnetId, - from: Position, - limit: usize, - ) -> Result, InternalStorageError> { - Ok(self - .target_streams - .prefix_iter(&(&target_subnet_id, &source_subnet_id))? - // TODO: Find a better way to convert u64 to usize - .skip(from.try_into().unwrap()) - .take(limit) - .map(|(_, certificate_id)| certificate_id) - .collect()) - } - - async fn get_pending_certificates( - &self, - ) -> Result, InternalStorageError> { - Ok(self.pending_certificates.iter()?.collect()) - } - async fn get_next_pending_certificate( - &self, - starting_at: Option, - ) -> Result, InternalStorageError> { - Ok(self - .pending_certificates - .iter()? 
- .nth(starting_at.map(|v| v + 1).unwrap_or(0))) - } - - async fn remove_pending_certificate(&self, index: u64) -> Result<(), InternalStorageError> { - self.pending_certificates.delete(&index) - } - - async fn get_target_stream_iterator( - &self, - target: SubnetId, - source: SubnetId, - position: Position, - ) -> Result< - ColumnIterator<'_, CertificateTargetStreamPosition, CertificateId>, - InternalStorageError, - > { - Ok(self.target_streams.prefix_iter_at( - &(&target, &source), - &CertificateTargetStreamPosition::new(target, source, position), - )?) - } - - async fn get_source_list_by_target( - &self, - target: SubnetId, - ) -> Result, InternalStorageError> { - Ok(self - .target_source_list - .prefix_iter(&target)? - .map(|(TargetSourceListKey(_, k), _)| k) - .collect()) - } -} diff --git a/crates/topos-tce-storage/src/rocks/db_column.rs b/crates/topos-tce-storage/src/rocks/db_column.rs index 0a56ca3d0..837729e97 100644 --- a/crates/topos-tce-storage/src/rocks/db_column.rs +++ b/crates/topos-tce-storage/src/rocks/db_column.rs @@ -176,6 +176,7 @@ impl DBBatch { } } + #[allow(unused)] pub(crate) fn delete( mut self, db: &DBColumn, diff --git a/crates/topos-tce-storage/src/rocks/types.rs b/crates/topos-tce-storage/src/rocks/types.rs index 4dad00dc1..edef13ef5 100644 --- a/crates/topos-tce-storage/src/rocks/types.rs +++ b/crates/topos-tce-storage/src/rocks/types.rs @@ -1,13 +1,7 @@ use serde::{Deserialize, Serialize}; -use topos_core::{ - types::stream::{CertificateSourceStreamPosition, CertificateTargetStreamPosition}, - uci::{Certificate, CertificateId}, -}; use crate::SubnetId; -use super::db_column::DBColumn; - #[derive(Debug, Serialize, Deserialize)] pub(crate) struct TargetSourceListKey( // Target subnet id @@ -15,17 +9,3 @@ pub(crate) struct TargetSourceListKey( // Source subnet id pub(crate) SubnetId, ); - -/// Column that keeps certificates that are not yet delivered -pub(crate) type PendingCertificatesColumn = DBColumn; -/// Column that keeps list of all 
certificates retrievable by their id -pub(crate) type CertificatesColumn = DBColumn; -/// Column that keeps list of certificates received from particular subnet and -/// maps (source subnet id, source certificate position) to certificate id -pub(crate) type SourceStreamsColumn = DBColumn; -/// Column that keeps list of certificates that are delivered to target subnet, -/// and maps their target (target subnet, source subnet and position/count per source subnet) -/// to certificate id -pub(crate) type TargetStreamsColumn = DBColumn; -/// Keeps position for particular target subnet id <- source subnet id column in TargetStreamsColumn -pub(crate) type TargetSourceListColumn = DBColumn; diff --git a/crates/topos-tce-storage/src/tests/db_columns.rs b/crates/topos-tce-storage/src/tests/db_columns.rs index 877e5ac61..8378489ad 100644 --- a/crates/topos-tce-storage/src/tests/db_columns.rs +++ b/crates/topos-tce-storage/src/tests/db_columns.rs @@ -2,13 +2,13 @@ use rstest::rstest; use test_log::test; use topos_core::types::stream::CertificateSourceStreamPosition; use topos_core::uci::Certificate; +use topos_test_sdk::certificates::create_certificate_at_position; use topos_test_sdk::constants::SOURCE_SUBNET_ID_1; +use crate::rocks::map::Map; use crate::tests::{PREV_CERTIFICATE_ID, SOURCE_STORAGE_SUBNET_ID}; -use crate::{ - rocks::{map::Map, CertificatesColumn, PendingCertificatesColumn, SourceStreamsColumn}, - Position, -}; +use crate::types::{CertificatesColumn, PendingCertificatesColumn, StreamsColumn}; +use crate::Position; use super::support::columns::{certificates_column, pending_column, source_streams_column}; @@ -29,11 +29,14 @@ async fn can_persist_a_delivered_certificate(certificates_column: CertificatesCo Certificate::new_with_default_fields(PREV_CERTIFICATE_ID, SOURCE_SUBNET_ID_1, &Vec::new()) .unwrap(); + let certificate = create_certificate_at_position(Position::ZERO, certificate); assert!(certificates_column - .insert(&certificate.id, &certificate) + 
.insert(&certificate.certificate.id, &certificate) .is_ok()); assert_eq!( - certificates_column.get(&certificate.id).unwrap(), + certificates_column + .get(&certificate.certificate.id) + .unwrap(), Some(certificate) ); } @@ -42,25 +45,26 @@ async fn can_persist_a_delivered_certificate(certificates_column: CertificatesCo #[test(tokio::test)] async fn delivered_certificate_position_are_incremented( certificates_column: CertificatesColumn, - source_streams_column: SourceStreamsColumn, + source_streams_column: StreamsColumn, ) { let certificate = Certificate::new_with_default_fields(PREV_CERTIFICATE_ID, SOURCE_SUBNET_ID_1, &[]).unwrap(); + let certificate = create_certificate_at_position(Position::ZERO, certificate); assert!(certificates_column - .insert(&certificate.id, &certificate) + .insert(&certificate.certificate.id, &certificate) .is_ok()); assert!(source_streams_column .insert( &CertificateSourceStreamPosition::new(SOURCE_STORAGE_SUBNET_ID, Position::ZERO), - &certificate.id + &certificate.certificate.id ) .is_ok()); } #[rstest] #[test(tokio::test)] -async fn position_can_be_fetch_for_one_subnet(source_streams_column: SourceStreamsColumn) { +async fn position_can_be_fetch_for_one_subnet(source_streams_column: StreamsColumn) { let certificate = Certificate::new_with_default_fields(PREV_CERTIFICATE_ID, SOURCE_SUBNET_ID_1, &[]).unwrap(); diff --git a/crates/topos-tce-storage/src/tests/support/columns.rs b/crates/topos-tce-storage/src/tests/support/columns.rs index dc1fd02be..e64043f48 100644 --- a/crates/topos-tce-storage/src/tests/support/columns.rs +++ b/crates/topos-tce-storage/src/tests/support/columns.rs @@ -1,9 +1,9 @@ use rstest::fixture; -use crate::rocks::TargetSourceListColumn; -use crate::rocks::{ - constants, db_column::DBColumn, CertificatesColumn, PendingCertificatesColumn, - SourceStreamsColumn, TargetStreamsColumn, +use crate::rocks::{constants, db_column::DBColumn}; +use crate::types::{ + CertificatesColumn, PendingCertificatesColumn, 
StreamsColumn, TargetSourceListColumn,
+    TargetStreamsColumn,
 };
 
 use super::database_name;
@@ -20,7 +20,7 @@ pub(crate) fn certificates_column(database_name: &'static str) -> CertificatesCo
 }
 
 #[fixture]
-pub(crate) fn source_streams_column(database_name: &'static str) -> SourceStreamsColumn {
+pub(crate) fn source_streams_column(database_name: &'static str) -> StreamsColumn {
     DBColumn::reopen(&rocks_db(database_name), constants::SOURCE_STREAMS)
 }
 
diff --git a/crates/topos-tce-storage/src/types.rs b/crates/topos-tce-storage/src/types.rs
index 69c161ed4..631f6db8f 100644
--- a/crates/topos-tce-storage/src/types.rs
+++ b/crates/topos-tce-storage/src/types.rs
@@ -1,9 +1,16 @@
 use topos_core::{
     api::grpc::checkpoints::SourceStreamPosition,
-    types::{CertificateDelivered, Ready, Signature},
+    types::{
+        stream::{CertificateSourceStreamPosition, CertificateTargetStreamPosition, Position},
+        CertificateDelivered, Ready, Signature,
+    },
+    uci::{Certificate, CertificateId},
 };
 
-use crate::{CertificatePositions, PendingCertificateId};
+use crate::{
+    rocks::{db_column::DBColumn, TargetSourceListKey},
+    CertificatePositions, PendingCertificateId,
+};
 
 pub type Echo = String;
 
@@ -11,6 +18,20 @@ pub type CertificateSequenceNumber = u64;
 pub type EpochId = u64;
 pub type Validators = Vec<ValidatorId>;
 
+/// Column that keeps certificates that are not yet delivered
+pub(crate) type PendingCertificatesColumn = DBColumn<PendingCertificateId, Certificate>;
+/// Column that keeps list of all certificates retrievable by their id
+pub(crate) type CertificatesColumn = DBColumn<CertificateId, CertificateDelivered>;
+/// Column that keeps list of certificates received from particular subnet and
+/// maps (source subnet id, source certificate position) to certificate id
+pub(crate) type StreamsColumn = DBColumn<CertificateSourceStreamPosition, CertificateId>;
+/// Column that keeps list of certificates that are delivered to target subnet,
+/// and maps their target (target subnet, source subnet and position/count per source subnet)
+/// to certificate id
+pub(crate) type TargetStreamsColumn = DBColumn<CertificateTargetStreamPosition, CertificateId>;
+/// Keeps position for particular target subnet id <- source subnet id column in TargetStreamsColumn
+pub(crate) type TargetSourceListColumn = DBColumn<TargetSourceListKey, Position>;
+
 #[derive(Debug, Clone)]
 pub enum PendingResult {
     AlreadyDelivered,
diff --git a/crates/topos-tce-storage/src/validator/mod.rs b/crates/topos-tce-storage/src/validator/mod.rs
index 5d5e63b3e..b3c7b9644 100644
--- a/crates/topos-tce-storage/src/validator/mod.rs
+++ b/crates/topos-tce-storage/src/validator/mod.rs
@@ -1,3 +1,19 @@
+//! Validator's context store and storage
+//!
+//! The [`ValidatorStore`] is responsible for managing the different data that are required by the
+//! TCE network in order to broadcast certificates. It is composed of two main parts:
+//!
+//! - a [`FullNodeStore`]
+//! - a [`ValidatorPendingTables`]
+//!
+//! ## Responsibilities
+//!
+//! This store is used in place where the [`FullNodeStore`] is not enough, it allows to access the
+//! different pending pools and to manage them but also to access the [`FullNodeStore`] in order to
+//! persist or update [`Certificate`] or `streams`.
+//!
+//! Pending pools and how they behave is described in the [`ValidatorPendingTables`] documentation.
+//!
 use std::{
     collections::HashMap,
     path::PathBuf,
@@ -33,8 +49,11 @@ mod tables;
 /// The [`ValidatorStore`] is composed of a [`FullNodeStore`] and a [`ValidatorPendingTables`].
 ///
 /// As the [`FullNodeStore`] is responsible of keeping and managing every data that are persistent,
-/// the [`ValidatorStore`] is forwarding everything to it. The crucial point is that the
-/// [`ValidatorStore`] is managing the different pending pool using the [`ValidatorPendingTables`].
+/// the [`ValidatorStore`] is delegating many of the [`WriteStore`] and [`ReadStore`] to it.
+///
+/// The crucial point is that the [`ValidatorStore`] is managing the different pending pools using a [`ValidatorPendingTables`].
+///
+/// Pending pools and how they behave is described in the [`ValidatorPendingTables`] documentation.
 ///
 pub struct ValidatorStore {
     pub(crate) pending_tables: ValidatorPendingTables,
@@ -42,6 +61,7 @@ pub struct ValidatorStore {
 }
 
 impl ValidatorStore {
+    /// Open a [`ValidatorStore`] at the given `path` and using the given [`FullNodeStore`]
     pub fn open(
         path: PathBuf,
         fullnode_store: Arc<FullNodeStore>,
diff --git a/crates/topos-tce-storage/src/validator/tables.rs b/crates/topos-tce-storage/src/validator/tables.rs
index 9b89b98ef..42968acc9 100644
--- a/crates/topos-tce-storage/src/validator/tables.rs
+++ b/crates/topos-tce-storage/src/validator/tables.rs
@@ -1,8 +1,8 @@
-use std::{collections::BTreeSet, fs::create_dir_all, path::PathBuf, sync::atomic::AtomicU64};
+use std::{fs::create_dir_all, path::PathBuf, sync::atomic::AtomicU64};
 
 use rocksdb::ColumnFamilyDescriptor;
 use topos_core::{
-    types::{stream::CertificateSourceStreamPosition, CertificateDelivered, ProofOfDelivery},
+    types::ProofOfDelivery,
     uci::{Certificate, CertificateId},
 };
 use tracing::warn;
@@ -14,23 +14,45 @@ use crate::{
         db::{default_options, init_with_cfs},
         db_column::DBColumn,
     },
-    types::{EpochId, EpochSummary},
+    types::{CertificatesColumn, EpochId, EpochSummary, PendingCertificatesColumn, StreamsColumn},
     PendingCertificateId,
 };
 
-/// Volatile and pending data
+/// Volatile and pending data used by Validator
+///
+/// It contains data that is not yet delivered.
+///
+/// When a [`Certificate`] is received, it can either be added to the pending
+/// pool or to the precedence pool.
+///
+/// ## Pending pool
+///
+/// The pending pool is used to store certificates that are ready to be validated and broadcast.
+/// Meaning that the previous [`Certificate`] has been delivered and the [`Certificate`] is
+/// ready to be broadcast.
+///
+/// The ordering inside the pending pool is a FIFO queue, each [`Certificate`] in the pool gets
+/// assigned to a unique [`PendingCertificateId`](type@crate::PendingCertificateId).
+///
+/// ## Precedence pool
+///
+/// The precedence pool is used to store certificates that are not yet ready to be broadcast,
+/// mostly waiting for the previous certificate to be delivered. However, the [`Certificate`] is
+/// already validated.
+///
+/// When a [`Certificate`] is delivered, the [`ValidatorStore`](struct@super::ValidatorStore) will
+/// check for any [`Certificate`] in the precedence pool and if one is found, it is moved to the
+/// pending pool, ready to be broadcast.
+///
 pub struct ValidatorPendingTables {
     pub(crate) next_pending_id: AtomicU64,
-    #[allow(unused)]
-    fetching_pool: BTreeSet<CertificateId>, // Not sure to keep it
-    pub(crate) pending_pool: DBColumn<PendingCertificateId, Certificate>,
+    pub(crate) pending_pool: PendingCertificatesColumn,
     pub(crate) pending_pool_index: DBColumn<CertificateId, PendingCertificateId>,
     pub(crate) precedence_pool: DBColumn<CertificateId, Certificate>,
-    #[allow(unused)]
-    expiration_tracker: (), // Unknown
 }
 
 impl ValidatorPendingTables {
+    /// Open the [`ValidatorPendingTables`] at the given path.
     pub fn open(mut path: PathBuf) -> Self {
         path.push("pending");
         if !path.exists() {
@@ -49,19 +71,18 @@ impl ValidatorPendingTables {
         Self {
             // TODO: Fetch it from the storage
             next_pending_id: AtomicU64::new(0),
-            fetching_pool: BTreeSet::new(),
             pending_pool: DBColumn::reopen(&db, cfs::PENDING_POOL),
             pending_pool_index: DBColumn::reopen(&db, cfs::PENDING_POOL_INDEX),
             precedence_pool: DBColumn::reopen(&db, cfs::PRECEDENCE_POOL),
-            expiration_tracker: (),
         }
     }
 }
 
 /// Data that shouldn't be purged at all.
+// TODO: TP-774: Rename and move to FullNode domain
 pub struct ValidatorPerpetualTables {
-    pub(crate) certificates: DBColumn<CertificateId, CertificateDelivered>,
-    pub(crate) streams: DBColumn<CertificateSourceStreamPosition, CertificateId>,
+    pub(crate) certificates: CertificatesColumn,
+    pub(crate) streams: StreamsColumn,
     #[allow(unused)]
     epoch_chain: DBColumn<EpochId, EpochSummary>,
     pub(crate) unverified: DBColumn<CertificateId, ProofOfDelivery>,