Skip to content

Commit

Permalink
Merge branch 'main' into matias-interactive-grafana
Browse files Browse the repository at this point in the history
  • Loading branch information
matias-gonz authored Aug 6, 2024
2 parents 01402d2 + 1810b78 commit 2d969dd
Show file tree
Hide file tree
Showing 34 changed files with 738 additions and 527 deletions.
42 changes: 22 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,16 @@ zk_evm_1_4_1 = { package = "zk_evm", version = "0.141.0" }
zk_evm_1_5_0 = { package = "zk_evm", version = "=0.150.0" }

# Consensus dependencies.
zksync_concurrency = "=0.1.0-rc.4"
zksync_consensus_bft = "=0.1.0-rc.4"
zksync_consensus_crypto = "=0.1.0-rc.4"
zksync_consensus_executor = "=0.1.0-rc.4"
zksync_consensus_network = "=0.1.0-rc.4"
zksync_consensus_roles = "=0.1.0-rc.4"
zksync_consensus_storage = "=0.1.0-rc.4"
zksync_consensus_utils = "=0.1.0-rc.4"
zksync_protobuf = "=0.1.0-rc.4"
zksync_protobuf_build = "=0.1.0-rc.4"
zksync_concurrency = "=0.1.0-rc.8"
zksync_consensus_bft = "=0.1.0-rc.8"
zksync_consensus_crypto = "=0.1.0-rc.8"
zksync_consensus_executor = "=0.1.0-rc.8"
zksync_consensus_network = "=0.1.0-rc.8"
zksync_consensus_roles = "=0.1.0-rc.8"
zksync_consensus_storage = "=0.1.0-rc.8"
zksync_consensus_utils = "=0.1.0-rc.8"
zksync_protobuf = "=0.1.0-rc.8"
zksync_protobuf_build = "=0.1.0-rc.8"

# "Local" dependencies
zksync_multivm = { version = "0.1.0", path = "core/lib/multivm" }
Expand Down
1 change: 1 addition & 0 deletions core/lib/dal/src/consensus_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ impl ConsensusDal<'_, '_> {
}
.await?
else {
tracing::info!(%genesis.first_block, "genesis block not found");
return Ok(None);
};
Ok(Some(AttestationStatus {
Expand Down
5 changes: 4 additions & 1 deletion core/lib/state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ pub use self::{
RocksdbStorage, RocksdbStorageBuilder, RocksdbStorageOptions, StateKeeperColumnFamily,
},
shadow_storage::ShadowStorage,
storage_factory::{BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbWithMemory},
storage_factory::{
BatchDiff, OwnedPostgresStorage, OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory,
RocksdbWithMemory,
},
storage_view::{StorageView, StorageViewCache, StorageViewMetrics},
witness::WitnessStorage,
};
Expand Down
6 changes: 6 additions & 0 deletions core/lib/state/src/rocksdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ impl RocksdbStorageBuilder {
) -> anyhow::Result<()> {
self.0.revert(storage, last_l1_batch_to_keep).await
}

/// Returns the underlying storage without any checks. Should only be used in test code.
#[doc(hidden)]
pub fn build_unchecked(self) -> RocksdbStorage {
self.0
}
}

impl RocksdbStorage {
Expand Down
100 changes: 75 additions & 25 deletions core/lib/state/src/storage_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,33 @@ use crate::{
PostgresStorage, ReadStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily,
};

/// Factory that can produce a [`ReadStorage`] implementation on demand.
/// Factory that can produce [`OwnedStorage`] instances on demand.
#[async_trait]
pub trait ReadStorageFactory: Debug + Send + Sync + 'static {
/// Creates a [`PgOrRocksdbStorage`] entity over either a Postgres connection or RocksDB
/// Creates an [`OwnedStorage`] entity over either a Postgres connection or RocksDB
/// instance. The specific criteria on which one are left up to the implementation.
///
/// The idea is that in either case this provides a valid [`ReadStorage`] implementation
/// that can be used by the caller.
/// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives
/// a stop signal; this is the only case in which `Ok(None)` should be returned.
async fn access_storage(
&self,
stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<PgOrRocksdbStorage<'_>>>;
) -> anyhow::Result<Option<OwnedStorage>>;
}

/// [`ReadStorageFactory`] producing Postgres-backed storage instances. Hence, it is slower than more advanced
/// alternatives with RocksDB caches and should be used sparingly (e.g., for testing).
#[async_trait]
impl ReadStorageFactory for ConnectionPool<Core> {
async fn access_storage(
&self,
_stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<OwnedStorage>> {
let storage = OwnedPostgresStorage::new(self.clone(), l1_batch_number);
Ok(Some(storage.into()))
}
}

/// DB difference introduced by one batch.
Expand All @@ -47,29 +61,31 @@ pub struct RocksdbWithMemory {
pub batch_diffs: Vec<BatchDiff>,
}

/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`]
/// underneath.
/// Owned Postgres-backed VM storage for a certain L1 batch.
#[derive(Debug)]
pub enum PgOrRocksdbStorage<'a> {
/// Implementation over a Postgres connection.
Postgres(PostgresStorage<'a>),
/// Implementation over a RocksDB cache instance.
Rocksdb(RocksdbStorage),
/// Implementation over a RocksDB cache instance with in-memory DB diffs.
RocksdbWithMemory(RocksdbWithMemory),
pub struct OwnedPostgresStorage {
connection_pool: ConnectionPool<Core>,
l1_batch_number: L1BatchNumber,
}

impl<'a> PgOrRocksdbStorage<'a> {
impl OwnedPostgresStorage {
/// Creates a VM storage for the specified batch number.
pub fn new(connection_pool: ConnectionPool<Core>, l1_batch_number: L1BatchNumber) -> Self {
Self {
connection_pool,
l1_batch_number,
}
}

/// Returns a [`ReadStorage`] implementation backed by Postgres
///
/// # Errors
///
/// Propagates Postgres errors.
pub async fn access_storage_pg(
pool: &'a ConnectionPool<Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<PgOrRocksdbStorage<'a>> {
let mut connection = pool.connection().await?;
pub async fn borrow(&self) -> anyhow::Result<PgOrRocksdbStorage<'_>> {
let l1_batch_number = self.l1_batch_number;
let mut connection = self.connection_pool.connection().await?;

let l2_block_number = if let Some((_, l2_block_number)) = connection
.blocks_dal()
.get_l2_block_range_of_l1_batch(l1_batch_number)
Expand All @@ -85,9 +101,8 @@ impl<'a> PgOrRocksdbStorage<'a> {
.context("Could not find snapshot, no state available")?;
if snapshot_recovery.l1_batch_number != l1_batch_number {
anyhow::bail!(
"Snapshot contains L1 batch #{} while #{} was expected",
snapshot_recovery.l1_batch_number,
l1_batch_number
"Snapshot contains L1 batch #{} while #{l1_batch_number} was expected",
snapshot_recovery.l1_batch_number
);
}
snapshot_recovery.l2_block_number
Expand All @@ -99,19 +114,54 @@ impl<'a> PgOrRocksdbStorage<'a> {
.into(),
)
}
}

/// Owned version of [`PgOrRocksdbStorage`]. It is thus possible to send to blocking tasks for VM execution.
#[derive(Debug)]
pub enum OwnedStorage {
/// Readily initialized storage with a static lifetime.
Static(PgOrRocksdbStorage<'static>),
/// Storage that must be `borrow()`ed from.
Lending(OwnedPostgresStorage),
}

impl From<OwnedPostgresStorage> for OwnedStorage {
fn from(storage: OwnedPostgresStorage) -> Self {
Self::Lending(storage)
}
}

impl From<PgOrRocksdbStorage<'static>> for OwnedStorage {
fn from(storage: PgOrRocksdbStorage<'static>) -> Self {
Self::Static(storage)
}
}

/// A [`ReadStorage`] implementation that uses either [`PostgresStorage`] or [`RocksdbStorage`]
/// underneath.
#[derive(Debug)]
pub enum PgOrRocksdbStorage<'a> {
/// Implementation over a Postgres connection.
Postgres(PostgresStorage<'a>),
/// Implementation over a RocksDB cache instance.
Rocksdb(RocksdbStorage),
/// Implementation over a RocksDB cache instance with in-memory DB diffs.
RocksdbWithMemory(RocksdbWithMemory),
}

impl PgOrRocksdbStorage<'static> {
/// Catches up RocksDB synchronously (i.e. assumes the gap is small) and
/// returns a [`ReadStorage`] implementation backed by caught-up RocksDB.
///
/// # Errors
///
/// Propagates RocksDB and Postgres errors.
pub async fn access_storage_rocksdb(
pub async fn rocksdb(
connection: &mut Connection<'_, Core>,
rocksdb: RocksDB<StateKeeperColumnFamily>,
stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<PgOrRocksdbStorage<'a>>> {
) -> anyhow::Result<Option<Self>> {
tracing::debug!("Catching up RocksDB synchronously");
let rocksdb_builder = RocksdbStorageBuilder::from_rocksdb(rocksdb);
let rocksdb = rocksdb_builder
Expand Down
1 change: 1 addition & 0 deletions core/node/consensus/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,6 @@ pub(super) fn executor(
rpc,
// TODO: Add to configuration
debug_page: None,
batch_poll_interval: time::Duration::seconds(1),
})
}
Loading

0 comments on commit 2d969dd

Please sign in to comment.