diff --git a/core/lib/config/src/configs/object_store.rs b/core/lib/config/src/configs/object_store.rs index e5c709fbf545..b9bbb5f7a542 100644 --- a/core/lib/config/src/configs/object_store.rs +++ b/core/lib/config/src/configs/object_store.rs @@ -7,6 +7,15 @@ pub struct ObjectStoreConfig { pub mode: ObjectStoreMode, #[serde(default = "ObjectStoreConfig::default_max_retries")] pub max_retries: u16, + /// Path to local directory that will be used to mirror store objects locally. If not specified, no mirroring will be used. + /// The directory layout is identical to [`ObjectStoreMode::FileBacked`]. + /// + /// Mirroring is primarily useful for local development and testing; it might not provide substantial performance benefits + /// if the Internet connection used by the app is fast enough. + /// + /// **Important.** Mirroring logic assumes that objects in the underlying store are immutable. If this is not the case, + /// the mirrored objects may become stale. + pub local_mirror_path: Option, } impl ObjectStoreConfig { diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 55e4d1c82767..aba67acab484 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -615,6 +615,7 @@ impl Distribution for EncodeDist { configs::ObjectStoreConfig { mode: self.sample(rng), max_retries: self.sample(rng), + local_mirror_path: self.sample(rng), } } } diff --git a/core/lib/env_config/src/fri_prover.rs b/core/lib/env_config/src/fri_prover.rs index b9cb25ef3c4a..65d35a05d3ee 100644 --- a/core/lib/env_config/src/fri_prover.rs +++ b/core/lib/env_config/src/fri_prover.rs @@ -41,6 +41,7 @@ mod tests { gcs_credential_file_path: "/path/to/credentials.json".to_owned(), }, max_retries: 5, + local_mirror_path: None, }), availability_check_interval_in_secs: Some(1_800), } @@ -65,7 +66,6 @@ mod tests { OBJECT_STORE_MODE="GCSWithCredentialFile" OBJECT_STORE_GCS_CREDENTIAL_FILE_PATH="/path/to/credentials.json" OBJECT_STORE_MAX_RETRIES="5" - "#; lock.set_env(config); diff --git a/core/lib/env_config/src/object_store.rs b/core/lib/env_config/src/object_store.rs index e9d31093c68c..a5881473b351 100644 --- a/core/lib/env_config/src/object_store.rs +++ b/core/lib/env_config/src/object_store.rs @@ -56,6 +56,7 @@ mod tests { gcs_credential_file_path: "/path/to/credentials.json".to_owned(), }, max_retries: 5, + local_mirror_path: Some("/var/cache".to_owned()), } } @@ -67,6 +68,7 @@ mod tests { OBJECT_STORE_MODE="GCSWithCredentialFile" OBJECT_STORE_GCS_CREDENTIAL_FILE_PATH="/path/to/credentials.json" OBJECT_STORE_MAX_RETRIES="5" + OBJECT_STORE_LOCAL_MIRROR_PATH="/var/cache" "#; lock.set_env(config); let actual = ObjectStoreConfig::from_env().unwrap(); @@ -117,6 +119,7 @@ mod tests { PROVER_OBJECT_STORE_MODE="GCSWithCredentialFile" PROVER_OBJECT_STORE_GCS_CREDENTIAL_FILE_PATH="/path/to/credentials.json" PROVER_OBJECT_STORE_MAX_RETRIES="5" + PROVER_OBJECT_STORE_LOCAL_MIRROR_PATH="/var/cache" "#; lock.set_env(config); let actual = ProverObjectStoreConfig::from_env().unwrap().0; diff --git a/core/lib/object_store/src/factory.rs b/core/lib/object_store/src/factory.rs index 4859b4c2860e..0fa1329ad72c 100644 --- a/core/lib/object_store/src/factory.rs +++ b/core/lib/object_store/src/factory.rs @@ -7,6 +7,7 @@ use zksync_config::configs::object_store::{ObjectStoreConfig, ObjectStoreMode}; use crate::{ file::FileBackedObjectStore, gcs::{GoogleCloudStore, GoogleCloudStoreAuthMode}, + mirror::MirroringObjectStore, raw::{ObjectStore, ObjectStoreError}, retries::StoreWithRetries, }; @@ -54,11 +55,9 @@ impl ObjectStoreFactory { async fn create_from_config( config: &ObjectStoreConfig, ) -> Result, ObjectStoreError> { + tracing::trace!("Initializing object store with configuration {config:?}"); match &config.mode { ObjectStoreMode::GCS { bucket_base_url } => { - tracing::trace!( - "Initialized GoogleCloudStorage Object store without credential file" - ); let store = StoreWithRetries::try_new(config.max_retries, || { GoogleCloudStore::new( GoogleCloudStoreAuthMode::Authenticated, @@ -66,13 +65,12 @@ impl ObjectStoreFactory { ) }) .await?; - Ok(Arc::new(store)) + Self::wrap_mirroring(store, config.local_mirror_path.as_ref()).await } ObjectStoreMode::GCSWithCredentialFile { bucket_base_url, gcs_credential_file_path, } => { - tracing::trace!("Initialized GoogleCloudStorage Object store with credential file"); let store = StoreWithRetries::try_new(config.max_retries, || { GoogleCloudStore::new( GoogleCloudStoreAuthMode::AuthenticatedWithCredentialFile( @@ -82,20 +80,9 @@ impl ObjectStoreFactory { ) }) .await?; - Ok(Arc::new(store)) - } - ObjectStoreMode::FileBacked { - file_backed_base_path, - } => { - tracing::trace!("Initialized FileBacked Object store"); - let store = StoreWithRetries::try_new(config.max_retries, || { - FileBackedObjectStore::new(file_backed_base_path.clone()) - }) - .await?; - Ok(Arc::new(store)) + Self::wrap_mirroring(store, config.local_mirror_path.as_ref()).await } ObjectStoreMode::GCSAnonymousReadOnly { bucket_base_url } => { - tracing::trace!("Initialized GoogleCloudStoragePublicReadOnly store"); let store = StoreWithRetries::try_new(config.max_retries, || { GoogleCloudStore::new( GoogleCloudStoreAuthMode::Anonymous, @@ -103,8 +90,33 @@ impl ObjectStoreFactory { ) }) .await?; + Self::wrap_mirroring(store, config.local_mirror_path.as_ref()).await + } + + ObjectStoreMode::FileBacked { + file_backed_base_path, + } => { + let store = StoreWithRetries::try_new(config.max_retries, || { + FileBackedObjectStore::new(file_backed_base_path.clone()) + }) + .await?; + + if let Some(mirror_path) = &config.local_mirror_path { + tracing::warn!("Mirroring doesn't make sense with file-backed object store; ignoring mirror path `{mirror_path}`"); + } Ok(Arc::new(store)) } } } + + async fn wrap_mirroring( + store: impl ObjectStore, + mirror_path: Option<&String>, + ) -> Result, ObjectStoreError> { + Ok(if let Some(mirror_path) = mirror_path { + Arc::new(MirroringObjectStore::new(store, mirror_path.clone()).await?) + } else { + Arc::new(store) + }) + } } diff --git a/core/lib/object_store/src/lib.rs b/core/lib/object_store/src/lib.rs index bccc139336b8..bd1e2e7c11ed 100644 --- a/core/lib/object_store/src/lib.rs +++ b/core/lib/object_store/src/lib.rs @@ -27,6 +27,7 @@ mod factory; mod file; mod gcs; mod metrics; +mod mirror; mod mock; mod objects; mod raw; diff --git a/core/lib/object_store/src/mirror.rs b/core/lib/object_store/src/mirror.rs new file mode 100644 index 000000000000..948770e7b39c --- /dev/null +++ b/core/lib/object_store/src/mirror.rs @@ -0,0 +1,150 @@ +//! Mirroring object store. + +use async_trait::async_trait; + +use crate::{file::FileBackedObjectStore, raw::ObjectStore, Bucket, ObjectStoreError}; + +#[derive(Debug)] +pub(crate) struct MirroringObjectStore { + inner: S, + mirror_store: FileBackedObjectStore, +} + +impl MirroringObjectStore { + pub async fn new(inner: S, mirror_path: String) -> Result { + tracing::info!("Initializing mirroring for store {inner:?} at `{mirror_path}`"); + let mirror_store = FileBackedObjectStore::new(mirror_path).await?; + Ok(Self { + inner, + mirror_store, + }) + } +} + +#[async_trait] +impl ObjectStore for MirroringObjectStore { + #[tracing::instrument(skip(self))] + async fn get_raw(&self, bucket: Bucket, key: &str) -> Result, ObjectStoreError> { + match self.mirror_store.get_raw(bucket, key).await { + Ok(object) => { + tracing::trace!("obtained object from mirror"); + return Ok(object); + } + Err(err) => { + if !matches!(err, ObjectStoreError::KeyNotFound(_)) { + tracing::warn!( + "unexpected error calling local mirror store: {:#}", + anyhow::Error::from(err) + ); + } + let object = self.inner.get_raw(bucket, key).await?; + tracing::trace!("obtained object from underlying store"); + if let Err(err) = self.mirror_store.put_raw(bucket, key, object.clone()).await { + tracing::warn!("failed mirroring object: {:#}", anyhow::Error::from(err)); + } else { + tracing::trace!("mirrored object"); + } + Ok(object) + } + } + } + + #[tracing::instrument(skip(self, value), fields(value.len = value.len()))] + async fn put_raw( + &self, + bucket: Bucket, + key: &str, + value: Vec, + ) -> Result<(), ObjectStoreError> { + self.inner.put_raw(bucket, key, value.clone()).await?; + // Only put the value into the mirror once it has been put in the underlying store + if let Err(err) = self.mirror_store.put_raw(bucket, key, value).await { + tracing::warn!("failed mirroring object: {:#}", anyhow::Error::from(err)); + } else { + tracing::trace!("mirrored object"); + } + Ok(()) + } + + #[tracing::instrument(skip(self))] + async fn remove_raw(&self, bucket: Bucket, key: &str) -> Result<(), ObjectStoreError> { + self.inner.remove_raw(bucket, key).await?; + // Only remove the value from the mirror once it has been removed in the underlying store + if let Err(err) = self.mirror_store.remove_raw(bucket, key).await { + tracing::warn!( + "failed removing object from mirror: {:#}", + anyhow::Error::from(err) + ); + } else { + tracing::trace!("removed object from mirror"); + } + Ok(()) + } + + fn storage_prefix_raw(&self, bucket: Bucket) -> String { + self.inner.storage_prefix_raw(bucket) + } +} + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use tempfile::TempDir; + + use super::*; + use crate::MockObjectStore; + + #[tokio::test] + async fn mirroring_basics() { + let dir = TempDir::new().unwrap(); + let path = dir.into_path().into_os_string().into_string().unwrap(); + + let mock_store = MockObjectStore::default(); + mock_store + .put_raw(Bucket::StorageSnapshot, "test", vec![1, 2, 3]) + .await + .unwrap(); + let mirroring_store = MirroringObjectStore::new(mock_store, path).await.unwrap(); + + let object = mirroring_store + .get_raw(Bucket::StorageSnapshot, "test") + .await + .unwrap(); + assert_eq!(object, [1, 2, 3]); + // Check that the object got mirrored. + let object_in_mirror = mirroring_store + .mirror_store + .get_raw(Bucket::StorageSnapshot, "test") + .await + .unwrap(); + assert_eq!(object_in_mirror, [1, 2, 3]); + let object = mirroring_store + .get_raw(Bucket::StorageSnapshot, "test") + .await + .unwrap(); + assert_eq!(object, [1, 2, 3]); + + let err = mirroring_store + .get_raw(Bucket::StorageSnapshot, "missing") + .await + .unwrap_err(); + assert_matches!(err, ObjectStoreError::KeyNotFound(_)); + + mirroring_store + .put_raw(Bucket::StorageSnapshot, "other", vec![3, 2, 1]) + .await + .unwrap(); + // Check that the object got mirrored. + let object_in_mirror = mirroring_store + .mirror_store + .get_raw(Bucket::StorageSnapshot, "other") + .await + .unwrap(); + assert_eq!(object_in_mirror, [3, 2, 1]); + let object = mirroring_store + .get_raw(Bucket::StorageSnapshot, "other") + .await + .unwrap(); + assert_eq!(object, [3, 2, 1]); + } +} diff --git a/core/lib/protobuf_config/src/object_store.rs b/core/lib/protobuf_config/src/object_store.rs index a668cea991ae..eb8349321ab4 100644 --- a/core/lib/protobuf_config/src/object_store.rs +++ b/core/lib/protobuf_config/src/object_store.rs @@ -44,6 +44,7 @@ impl ProtoRepr for proto::ObjectStore { max_retries: required(&self.max_retries) .and_then(|x| Ok((*x).try_into()?)) .context("max_retries")?, + local_mirror_path: self.local_mirror_path.clone(), }) } @@ -80,6 +81,7 @@ impl ProtoRepr for proto::ObjectStore { Self { mode: Some(mode), max_retries: Some(this.max_retries.into()), + local_mirror_path: this.local_mirror_path.clone(), } } } diff --git a/core/lib/protobuf_config/src/proto/config/object_store.proto b/core/lib/protobuf_config/src/proto/config/object_store.proto index 1c5a7b5ecdf6..a023f7fa8be5 100644 --- a/core/lib/protobuf_config/src/proto/config/object_store.proto +++ b/core/lib/protobuf_config/src/proto/config/object_store.proto @@ -27,4 +27,5 @@ message ObjectStore { FileBacked file_backed = 4; } optional uint32 max_retries = 5; // required + optional string local_mirror_path = 6; // optional; fs path } diff --git a/prover/prover_fri/tests/basic_test.rs b/prover/prover_fri/tests/basic_test.rs index fa5e5ca9cc63..b6d6226e6967 100644 --- a/prover/prover_fri/tests/basic_test.rs +++ b/prover/prover_fri/tests/basic_test.rs @@ -31,6 +31,7 @@ async fn prover_and_assert_base_layer( file_backed_base_path: "./tests/data/".to_owned(), }, max_retries: 5, + local_mirror_path: None, }; let object_store = ObjectStoreFactory::new(object_store_config) .create_store() diff --git a/prover/witness_generator/tests/basic_test.rs b/prover/witness_generator/tests/basic_test.rs index 8b94224f20c0..7f5356858902 100644 --- a/prover/witness_generator/tests/basic_test.rs +++ b/prover/witness_generator/tests/basic_test.rs @@ -30,6 +30,7 @@ async fn test_leaf_witness_gen() { file_backed_base_path: "./tests/data/leaf/".to_owned(), }, max_retries: 5, + local_mirror_path: None, }; let object_store = ObjectStoreFactory::new(object_store_config) .create_store() @@ -71,6 +72,7 @@ async fn test_node_witness_gen() { file_backed_base_path: "./tests/data/node/".to_owned(), }, max_retries: 5, + local_mirror_path: None, }; let object_store = ObjectStoreFactory::new(object_store_config) .create_store()