diff --git a/fs-storage/examples/cli.rs b/fs-storage/examples/cli.rs index d6817153..7b2ab23d 100644 --- a/fs-storage/examples/cli.rs +++ b/fs-storage/examples/cli.rs @@ -44,18 +44,19 @@ fn read_command(args: &[String], path: &str) -> Result<()> { }; let mut fs: FileStorage = - FileStorage::new("cli".to_string(), Path::new(path)); + FileStorage::new("cli".to_string(), Path::new(path)) + .context("Failed to create FileStorage")?; let map = fs .read_fs() .expect("No Data is present on this path"); if keys.is_empty() { - for (key, value) in &map { + for (key, value) in map { println!("{}: {}", key, value); } } for key in &keys { - if let Some(value) = &map.get(key) { + if let Some(value) = map.get(key) { println!("{}: {}", key, value); } else { eprintln!("Key '{}' not found", key); @@ -78,7 +79,8 @@ fn write_command(args: &[String], path: &str) -> Result<()> { .map_or(false, |ext| ext == "json"); let mut fs: FileStorage = - FileStorage::new("cli".to_string(), Path::new(path)); + FileStorage::new("cli".to_string(), Path::new(path)) + .context("Failed to create FileStorage")?; if content_json { let content = fs::read_to_string(content).context("Failed to read JSON file")?; diff --git a/fs-storage/src/base_storage.rs b/fs-storage/src/base_storage.rs index ea8652af..8a4b1f20 100644 --- a/fs-storage/src/base_storage.rs +++ b/fs-storage/src/base_storage.rs @@ -1,6 +1,40 @@ use data_error::Result; use std::collections::BTreeMap; +#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Clone)] +/// Represents the synchronization status of the storage. +pub enum SyncStatus { + /// No synchronization needed. + InSync, + /// In-memory key-value mapping is stale. + MappingStale, + /// External file system storage is stale. + StorageStale, + /// In-memory key-value mapping and external file system storage diverge. + Diverge, +} + +impl std::fmt::Display for SyncStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SyncStatus::InSync => write!(f, "InSync"), + SyncStatus::MappingStale => write!(f, "MappingStale"), + SyncStatus::StorageStale => write!(f, "StorageStale"), + SyncStatus::Diverge => write!(f, "Diverge"), + } + } +} + +/// The `BaseStorage` trait represents a key-value mapping that is written to the file system. +/// +/// This trait provides methods to create or update entries in the internal mapping, remove entries from the internal mapping, +/// determine if the in-memory model or the underlying storage requires syncing, scan and load the mapping from the filesystem, +/// write the mapping to the filesystem, and remove all stored data. +/// +/// The trait also includes a method to merge values from another key-value mapping. +/// +/// Note: The trait does not write to storage by default. It is up to the implementor to decide when to read or write to storage +/// based on `SyncStatus`. This is to allow for trading off between performance and consistency. pub trait BaseStorage: AsRef> { /// Create or update an entry in the internal mapping. fn set(&mut self, id: K, value: V); @@ -8,30 +42,23 @@ pub trait BaseStorage: AsRef> { /// Remove an entry from the internal mapping. fn remove(&mut self, id: &K) -> Result<()>; - /// Determine if in-memory model - /// or the underlying storage requires syncing. - /// This is a quick method checking timestamps - /// of modification of both model and storage. - /// - /// Returns: - /// - `Ok(true)` if the on-disk data and in-memory data are not in sync. - /// - `Ok(false)` if the on-disk data and in-memory data are in sync. - /// - `Err(ArklibError::Storage)` in case of any error retrieving the file metadata. - fn needs_syncing(&self) -> Result; + /// Get [`SyncStatus`] of the storage + fn sync_status(&self) -> Result; + + /// Sync the in-memory storage with the storage on disk + fn sync(&mut self) -> Result<()>; /// Scan and load the key-value mapping /// from pre-configured location in the filesystem. - fn read_fs(&mut self) -> Result>; + fn read_fs(&mut self) -> Result<&BTreeMap>; - /// Persist the internal key-value mapping + /// Write the internal key-value mapping /// to pre-configured location in the filesystem. fn write_fs(&mut self) -> Result<()>; - /// Remove all persisted data - /// by pre-configured location in the file-system. + /// Erase data stored on the filesystem fn erase(&self) -> Result<()>; - /// Merge two storages instances - /// and write the result to the filesystem. + /// Merge values from another key-value mapping. fn merge_from(&mut self, other: impl AsRef>) -> Result<()>; } diff --git a/fs-storage/src/file_storage.rs b/fs-storage/src/file_storage.rs index f9bbbe62..57fcdd80 100644 --- a/fs-storage/src/file_storage.rs +++ b/fs-storage/src/file_storage.rs @@ -1,13 +1,13 @@ use serde::{Deserialize, Serialize}; use std::fs::{self, File}; -use std::io::{BufWriter, Write}; +use std::io::Write; use std::time::SystemTime; use std::{ collections::BTreeMap, path::{Path, PathBuf}, }; -use crate::base_storage::BaseStorage; +use crate::base_storage::{BaseStorage, SyncStatus}; use crate::monoid::Monoid; use crate::utils::read_version_2_fs; use data_error::{ArklibError, Result}; @@ -29,9 +29,16 @@ pub struct FileStorage where K: Ord, { + /// Label for logging label: String, + /// Path to the underlying file where data is persisted path: PathBuf, + /// Last modified time of internal mapping. This becomes equal to + /// `written_to_disk` only when data is written or read from disk. modified: SystemTime, + /// Last time the data was written to disk. This becomes equal to + /// `modified` only when data is written or read from disk. + written_to_disk: SystemTime, data: FileStorageData, } @@ -48,6 +55,15 @@ where entries: BTreeMap, } +impl AsRef> for FileStorageData +where + K: Ord, +{ + fn as_ref(&self) -> &BTreeMap { + &self.entries + } +} + impl FileStorage where K: Ord @@ -62,75 +78,32 @@ where + Monoid, { /// Create a new file storage with a diagnostic label and file path - pub fn new(label: String, path: &Path) -> Self { - Self { + /// The storage will be initialized using the disk data, if the path exists + /// + /// Note: if the file storage already exists, the data will be read from the file + /// without overwriting it. + pub fn new(label: String, path: &Path) -> Result { + let time = SystemTime::now(); + let mut storage = Self { label, path: PathBuf::from(path), - modified: SystemTime::now(), + modified: time, + written_to_disk: time, data: FileStorageData { version: STORAGE_VERSION, entries: BTreeMap::new(), }, - } - } -} + }; -impl BaseStorage for FileStorage -where - K: Ord - + Clone - + serde::Serialize - + serde::de::DeserializeOwned - + std::str::FromStr, - V: Clone - + serde::Serialize - + serde::de::DeserializeOwned - + std::str::FromStr - + Monoid, -{ - /// Set a key-value pair in the storage - fn set(&mut self, key: K, value: V) { - self.data.entries.insert(key, value); - self.modified = std::time::SystemTime::now(); - } - - /// Remove a key-value pair from the storage given a key - fn remove(&mut self, id: &K) -> Result<()> { - self.data.entries.remove(id).ok_or_else(|| { - ArklibError::Storage(self.label.clone(), "Key not found".to_owned()) - })?; - self.modified = std::time::SystemTime::now(); - self.write_fs() - .expect("Failed to remove data from disk"); - Ok(()) - } - - /// Compare the timestamp of the storage file - /// with the timestamp of the in-memory storage update - /// to determine if either of the two requires syncing. - fn needs_syncing(&self) -> Result { - match fs::metadata(&self.path) { - Ok(metadata) => { - let get_duration_since_epoch = |time: SystemTime| { - time.duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() - }; - - let fs_modified = - get_duration_since_epoch(metadata.modified()?); - let self_modified = get_duration_since_epoch(self.modified); - - Ok(fs_modified != self_modified) - } - Err(e) => { - Err(ArklibError::Storage(self.label.clone(), e.to_string())) - } + if Path::exists(path) { + storage.read_fs()?; } + + Ok(storage) } - /// Read the data from the storage file - fn read_fs(&mut self) -> Result> { + /// Load mapping from file + fn load_fs_data(&self) -> Result> { if !self.path.exists() { return Err(ArklibError::Storage( self.label.clone(), @@ -148,7 +121,10 @@ where "Version 2 storage format detected for {}", self.label ); - self.modified = fs::metadata(&self.path)?.modified()?; + let data = FileStorageData { + version: 2, + entries: data, + }; return Ok(data); } Err(_) => { @@ -176,12 +152,100 @@ where ), )); } + + Ok(data) + } +} + +impl BaseStorage for FileStorage +where + K: Ord + + Clone + + serde::Serialize + + serde::de::DeserializeOwned + + std::str::FromStr, + V: Clone + + serde::Serialize + + serde::de::DeserializeOwned + + std::str::FromStr + + Monoid, +{ + /// Set a key-value pair in the internal mapping + fn set(&mut self, key: K, value: V) { + self.data.entries.insert(key, value); + self.modified = std::time::SystemTime::now(); + } + + /// Remove an entry from the internal mapping given a key + fn remove(&mut self, id: &K) -> Result<()> { + self.data.entries.remove(id).ok_or_else(|| { + ArklibError::Storage(self.label.clone(), "Key not found".to_owned()) + })?; + self.modified = std::time::SystemTime::now(); + Ok(()) + } + + /// Compare the timestamp of the storage file + /// with the timestamp of the in-memory storage and the last written + /// to time to determine if either of the two requires syncing. + fn sync_status(&self) -> Result { + let file_updated = fs::metadata(&self.path)?.modified()?; + + // Determine the synchronization status based on the modification times + // Conditions: + // 1. If both the in-memory storage and the storage on disk have been modified + // since the last write, then the storage is diverged. + // 2. If only the in-memory storage has been modified since the last write, + // then the storage on disk is stale. + // 3. If only the storage on disk has been modified since the last write, + // then the in-memory storage is stale. + // 4. If neither the in-memory storage nor the storage on disk has been modified + // since the last write, then the storage is in sync. + let status = match ( + self.modified > self.written_to_disk, + file_updated > self.written_to_disk, + ) { + (true, true) => SyncStatus::Diverge, + (true, false) => SyncStatus::StorageStale, + (false, true) => SyncStatus::MappingStale, + (false, false) => SyncStatus::InSync, + }; + + log::info!("{} sync status is {}", self.label, status); + Ok(status) + } + + /// Sync the in-memory storage with the storage on disk + fn sync(&mut self) -> Result<()> { + match self.sync_status()? { + SyncStatus::InSync => Ok(()), + SyncStatus::MappingStale => self.read_fs().map(|_| ()), + SyncStatus::StorageStale => self.write_fs().map(|_| ()), + SyncStatus::Diverge => { + let data = self.load_fs_data()?; + self.merge_from(&data)?; + self.write_fs()?; + Ok(()) + } + } + } + + /// Read the data from file + fn read_fs(&mut self) -> Result<&BTreeMap> { + let data = self.load_fs_data()?; + + // Update file storage with loaded data self.modified = fs::metadata(&self.path)?.modified()?; + self.written_to_disk = self.modified; + self.data = data; - Ok(data.entries) + Ok(&self.data.entries) } - /// Write the data to the storage file + /// Write the data to file + /// + /// Update the modified timestamp in file metadata to avoid OS timing issues + /// https://github.com/ARK-Builders/ark-rust/pull/63#issuecomment-2163882227 fn write_fs(&mut self) -> Result<()> { let parent_dir = self.path.parent().ok_or_else(|| { ArklibError::Storage( @@ -190,16 +254,16 @@ where ) })?; fs::create_dir_all(parent_dir)?; - let file = File::create(&self.path)?; - let mut writer = BufWriter::new(file); - let value_data = serde_json::to_string_pretty(&self.data)?; - writer.write_all(value_data.as_bytes())?; - - let new_timestamp = fs::metadata(&self.path)?.modified()?; - if new_timestamp == self.modified { - return Err("Timestamp has not been updated".into()); - } + let mut file = File::create(&self.path)?; + file.write_all(serde_json::to_string_pretty(&self.data)?.as_bytes())?; + file.flush()?; + + let new_timestamp = SystemTime::now(); + file.set_modified(new_timestamp)?; + file.sync_all()?; + self.modified = new_timestamp; + self.written_to_disk = new_timestamp; log::info!( "{} {} entries have been written", @@ -209,7 +273,7 @@ where Ok(()) } - /// Erase the storage file from disk + /// Erase the file from disk fn erase(&self) -> Result<()> { fs::remove_file(&self.path).map_err(|err| { ArklibError::Storage(self.label.clone(), err.to_string()) @@ -246,10 +310,13 @@ where #[cfg(test)] mod tests { - use std::collections::BTreeMap; + use std::{collections::BTreeMap, fs}; use tempdir::TempDir; - use crate::{base_storage::BaseStorage, file_storage::FileStorage}; + use crate::{ + base_storage::{BaseStorage, SyncStatus}, + file_storage::FileStorage, + }; #[test] fn test_file_storage_write_read() { @@ -258,13 +325,16 @@ mod tests { let storage_path = temp_dir.path().join("test_storage.txt"); let mut file_storage = - FileStorage::new("TestStorage".to_string(), &storage_path); + FileStorage::new("TestStorage".to_string(), &storage_path).unwrap(); file_storage.set("key1".to_string(), "value1".to_string()); file_storage.set("key2".to_string(), "value2".to_string()); assert!(file_storage.remove(&"key1".to_string()).is_ok()); - let data_read: BTreeMap<_, _> = file_storage + file_storage + .write_fs() + .expect("Failed to write data to disk"); + let data_read: &BTreeMap<_, _> = file_storage .read_fs() .expect("Failed to read data from disk"); @@ -279,7 +349,7 @@ mod tests { let storage_path = temp_dir.path().join("test_storage.txt"); let mut file_storage = - FileStorage::new("TestStorage".to_string(), &storage_path); + FileStorage::new("TestStorage".to_string(), &storage_path).unwrap(); file_storage.set("key1".to_string(), "value1".to_string()); file_storage.set("key1".to_string(), "value2".to_string()); @@ -293,40 +363,74 @@ mod tests { } #[test] - fn test_file_storage_is_storage_updated() { + fn test_file_metadata_timestamp_updated() { let temp_dir = TempDir::new("tmp").expect("Failed to create temporary directory"); let storage_path = temp_dir.path().join("teststorage.txt"); let mut file_storage = - FileStorage::new("TestStorage".to_string(), &storage_path); + FileStorage::new("TestStorage".to_string(), &storage_path).unwrap(); file_storage.write_fs().unwrap(); - assert_eq!(file_storage.needs_syncing().unwrap(), false); - std::thread::sleep(std::time::Duration::from_secs(1)); + file_storage.set("key1".to_string(), "value1".to_string()); - assert_eq!(file_storage.needs_syncing().unwrap(), true); + let before_write = fs::metadata(&storage_path) + .unwrap() + .modified() + .unwrap(); file_storage.write_fs().unwrap(); - assert_eq!(file_storage.needs_syncing().unwrap(), false); + let after_write = fs::metadata(&storage_path) + .unwrap() + .modified() + .unwrap(); + println!( + "before_write: {:?}, after_write: {:?}", + before_write, after_write + ); + assert!(before_write < after_write); + } - std::thread::sleep(std::time::Duration::from_secs(1)); + #[test] + fn test_file_storage_is_storage_updated() { + let temp_dir = + TempDir::new("tmp").expect("Failed to create temporary directory"); + let storage_path = temp_dir.path().join("teststorage.txt"); + + let mut file_storage = + FileStorage::new("TestStorage".to_string(), &storage_path).unwrap(); + file_storage.write_fs().unwrap(); + assert_eq!(file_storage.sync_status().unwrap(), SyncStatus::InSync); + + file_storage.set("key1".to_string(), "value1".to_string()); + assert_eq!( + file_storage.sync_status().unwrap(), + SyncStatus::StorageStale + ); + file_storage.write_fs().unwrap(); + assert_eq!(file_storage.sync_status().unwrap(), SyncStatus::InSync); // External data manipulation let mut mirror_storage = - FileStorage::new("TestStorage".to_string(), &storage_path); - assert_eq!(mirror_storage.needs_syncing().unwrap(), true); - std::thread::sleep(std::time::Duration::from_secs(1)); - mirror_storage.read_fs().unwrap(); - assert_eq!(mirror_storage.needs_syncing().unwrap(), false); + FileStorage::new("MirrorTestStorage".to_string(), &storage_path) + .unwrap(); + assert_eq!(mirror_storage.sync_status().unwrap(), SyncStatus::InSync); mirror_storage.set("key1".to_string(), "value3".to_string()); - assert_eq!(mirror_storage.needs_syncing().unwrap(), true); + assert_eq!( + mirror_storage.sync_status().unwrap(), + SyncStatus::StorageStale + ); + mirror_storage.write_fs().unwrap(); - assert_eq!(mirror_storage.needs_syncing().unwrap(), false); + assert_eq!(mirror_storage.sync_status().unwrap(), SyncStatus::InSync); - assert_eq!(file_storage.needs_syncing().unwrap(), true); + // receive updates from external data manipulation + assert_eq!( + file_storage.sync_status().unwrap(), + SyncStatus::MappingStale + ); file_storage.read_fs().unwrap(); - assert_eq!(file_storage.needs_syncing().unwrap(), false); - assert_eq!(mirror_storage.needs_syncing().unwrap(), false); + assert_eq!(file_storage.sync_status().unwrap(), SyncStatus::InSync); + assert_eq!(mirror_storage.sync_status().unwrap(), SyncStatus::InSync); } #[test] @@ -337,10 +441,12 @@ mod tests { let storage_path2 = temp_dir.path().join("teststorage2.txt"); let mut file_storage_1 = - FileStorage::new("TestStorage1".to_string(), &storage_path1); + FileStorage::new("TestStorage1".to_string(), &storage_path1) + .unwrap(); let mut file_storage_2 = - FileStorage::new("TestStorage2".to_string(), &storage_path2); + FileStorage::new("TestStorage2".to_string(), &storage_path2) + .unwrap(); file_storage_1.set("key1".to_string(), 2); file_storage_1.set("key2".to_string(), 6);