diff --git a/src/commit_log.rs b/src/commit_log.rs index c777e86..d130f8e 100644 --- a/src/commit_log.rs +++ b/src/commit_log.rs @@ -1,5 +1,4 @@ use std::path::PathBuf; -use std::sync::{Arc, RwLock}; use snafu::{location, Location}; use crate::error::Error::InvalidInput; @@ -8,7 +7,7 @@ use crate::mmap_file::MemoryMappedFile; use crate::error::Result; pub struct CommitLog { - current_file: Arc>, + current_file: MemoryMappedFile, } impl CommitLog { @@ -16,27 +15,24 @@ impl CommitLog { let base_dir = PathBuf::from(base_path); let msg_log_path = base_dir.join("test.log"); // Create or open the initial MemoryMappedFile - let current_file = Arc::new(RwLock::new( - MemoryMappedFile::open(msg_log_path.to_str().unwrap(), 0, max_file_size)?)); + let current_file = MemoryMappedFile::open( + msg_log_path.to_str().unwrap(), 0, max_file_size)?; Ok(CommitLog { current_file }) } - pub fn write_records(&self, data: &Vec) -> Result { - let mut current_file = self.current_file.write().unwrap(); - + pub fn write_records(&mut self, data: &Vec) -> Result { // Write the record to the current file - current_file.append(data) + self.current_file.append(data) } pub fn read_records(&self, msg_index_unit: &MessageIndexUnit) -> Result> { if msg_index_unit.size > 0 { - // Lock the RwLock for reading - let current_file = self.current_file.read().unwrap(); // Read and return records - current_file.read(msg_index_unit.offset as usize, msg_index_unit.size as usize) + self.current_file.read( + msg_index_unit.offset as usize, msg_index_unit.size as usize) } else { Err(InvalidInput { location: location!(), diff --git a/src/mapped_file_queue.rs b/src/mapped_file_queue.rs index d7ad523..6913823 100644 --- a/src/mapped_file_queue.rs +++ b/src/mapped_file_queue.rs @@ -16,7 +16,11 @@ impl MappedFileQueue { Ok(MappedFileQueue { store_path: store_path.to_string(), max_file_size, mapped_files: Vec::new() }) } - fn create_mapped_file(&mut self, start_offset: usize) -> Option<&mut MemoryMappedFile> { + pub fn get_mapped_files(&self) -> &Vec { + &self.mapped_files + } + + fn create_mapped_file(&mut self, start_offset: usize) -> &mut MemoryMappedFile { let store_path_clone = self.store_path.clone(); let base_dir = PathBuf::from(store_path_clone); let file_name = format!("{:020}", start_offset); @@ -28,21 +32,38 @@ impl MappedFileQueue { file_path.as_path().to_str().unwrap(), start_offset, self.max_file_size).unwrap(); self.mapped_files.push(mapped_file); - self.mapped_files.last_mut() + self.mapped_files.last_mut().unwrap() } - fn get_last_mapped_file_mut(&mut self, offset: usize) -> Option<&mut MemoryMappedFile> { - if offset == 0 { - self.create_mapped_file(offset); + fn get_last_mapped_file_mut(&mut self) -> &mut MemoryMappedFile { + if self.mapped_files.len() == 0 { + self.create_mapped_file(0); } - self.mapped_files.last_mut() + self.mapped_files.last_mut().unwrap() } // Write data to the memory-mapped file. pub fn append(&mut self, data: &Vec) -> Result { - // TODO offset? - let mapped_file = self.get_last_mapped_file_mut(0).unwrap(); - mapped_file.append(data) + let mapped_file = self.get_last_mapped_file_mut(); + let append_result = mapped_file.append(data); + + match append_result { + Ok(write_offset) => { Ok(write_offset) } + Err(err) => { + match err { + InvalidInput { .. } => { + // data size exceed the size of current file, create a new one and retry + let max_offset = mapped_file.get_max_offset(); + let new_mapped_file = self.create_mapped_file(max_offset); + + new_mapped_file.append(data) + } + other => { + Err(other) + } + } + } + } } // Read data from the memory-mapped file. @@ -62,3 +83,32 @@ impl MappedFileQueue { } } } + +#[cfg(test)] +mod tests { + use tempfile::{TempDir}; + use crate::error::Result; + use crate::mapped_file_queue::MappedFileQueue; + + pub fn create_temp_dir(prefix: &str) -> TempDir { + tempfile::Builder::new().prefix(prefix).tempdir().unwrap() + } + + #[tokio::test] + pub async fn test_write_read() -> Result<()> { + let dir_path = create_temp_dir("topic_mgr_test"); + // Create or open the memory-mapped file. + let mut mapped_file_queue = MappedFileQueue::open( + dir_path.path().to_str().unwrap(), 20)?; + + let test_str = "hello world".as_bytes(); + let test_data = Vec::from(test_str); + + mapped_file_queue.append(&test_data).expect("Error while write"); + mapped_file_queue.append(&test_data).expect("Error while write"); + + assert_eq!(mapped_file_queue.get_mapped_files().len(), 2); + + Ok(()) + } +} diff --git a/src/mmap_file.rs b/src/mmap_file.rs index ca4015b..1b402ed 100644 --- a/src/mmap_file.rs +++ b/src/mmap_file.rs @@ -36,10 +36,11 @@ impl MemoryMappedFile { // Write data to the memory-mapped file. pub fn append(&mut self, data: &Vec) -> Result { let data_len = data.len(); + let write_pos = self.max_offset - self.min_offset; // Ensure the data fits within the mapped region. - if self.max_offset + data_len <= self.mmap.len() { - self.mmap[self.max_offset..self.max_offset + data_len].copy_from_slice(data.as_slice()); + if write_pos + data_len <= self.mmap.len() { + self.mmap[write_pos..write_pos + data_len].copy_from_slice(data.as_slice()); // Flush changes to disk (optional). self.mmap.flush().context(StdIOSnafu)?; @@ -59,10 +60,11 @@ impl MemoryMappedFile { // Read data from the memory-mapped file. pub fn read(&self, offset: usize, data_size: usize) -> Result> { let mut buffer = vec![0; data_size]; + let read_pos = offset - self.min_offset; // Ensure the buffer size matches the mapped region. - if offset + data_size < self.mmap.len() { - buffer.copy_from_slice(&self.mmap[offset..offset + data_size]); + if read_pos + data_size < self.mmap.len() { + buffer.copy_from_slice(&self.mmap[read_pos..read_pos + data_size]); Ok(buffer) } else { diff --git a/src/msg_store.rs b/src/msg_store.rs index bea7d47..b08d0c9 100644 --- a/src/msg_store.rs +++ b/src/msg_store.rs @@ -23,7 +23,7 @@ impl MessageStore { pub async fn write_msg(&self, msg: Message) -> Result { // write the msg - let commit_log = self.commit_log.lock().unwrap(); + let mut commit_log = self.commit_log.lock().unwrap(); let encoded_msg = msg.encode()?; let msg_offset = commit_log.write_records(&encoded_msg)?;