Skip to content

Commit

Permalink
Update mapped file queue, create new mapped file when needed
Browse files Browse the repository at this point in the history
  • Loading branch information
zhihuij committed Nov 10, 2023
1 parent ba929d4 commit 8b03e0e
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 25 deletions.
18 changes: 7 additions & 11 deletions src/commit_log.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use snafu::{location, Location};
use crate::error::Error::InvalidInput;

Expand All @@ -8,35 +7,32 @@ use crate::mmap_file::MemoryMappedFile;
use crate::error::Result;

pub struct CommitLog {
current_file: Arc<RwLock<MemoryMappedFile>>,
current_file: MemoryMappedFile,
}

impl CommitLog {
pub fn open(base_path: &str, max_file_size: u64) -> Result<Self> {
let base_dir = PathBuf::from(base_path);
let msg_log_path = base_dir.join("test.log");
// Create or open the initial MemoryMappedFile
let current_file = Arc::new(RwLock::new(
MemoryMappedFile::open(msg_log_path.to_str().unwrap(), 0, max_file_size)?));
let current_file = MemoryMappedFile::open(
msg_log_path.to_str().unwrap(), 0, max_file_size)?;

Ok(CommitLog {
current_file
})
}

pub fn write_records(&self, data: &Vec<u8>) -> Result<usize> {
let mut current_file = self.current_file.write().unwrap();

pub fn write_records(&mut self, data: &Vec<u8>) -> Result<usize> {
// Write the record to the current file
current_file.append(data)
self.current_file.append(data)
}

pub fn read_records(&self, msg_index_unit: &MessageIndexUnit) -> Result<Vec<u8>> {
if msg_index_unit.size > 0 {
// Lock the RwLock for reading
let current_file = self.current_file.read().unwrap();
// Read and return records
current_file.read(msg_index_unit.offset as usize, msg_index_unit.size as usize)
self.current_file.read(
msg_index_unit.offset as usize, msg_index_unit.size as usize)
} else {
Err(InvalidInput {
location: location!(),
Expand Down
68 changes: 59 additions & 9 deletions src/mapped_file_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ impl MappedFileQueue {
Ok(MappedFileQueue { store_path: store_path.to_string(), max_file_size, mapped_files: Vec::new() })
}

fn create_mapped_file(&mut self, start_offset: usize) -> Option<&mut MemoryMappedFile> {
pub fn get_mapped_files(&self) -> &Vec<MemoryMappedFile> {
&self.mapped_files
}

fn create_mapped_file(&mut self, start_offset: usize) -> &mut MemoryMappedFile {
let store_path_clone = self.store_path.clone();
let base_dir = PathBuf::from(store_path_clone);
let file_name = format!("{:020}", start_offset);
Expand All @@ -28,21 +32,38 @@ impl MappedFileQueue {
file_path.as_path().to_str().unwrap(), start_offset, self.max_file_size).unwrap();

self.mapped_files.push(mapped_file);
self.mapped_files.last_mut()
self.mapped_files.last_mut().unwrap()
}

fn get_last_mapped_file_mut(&mut self, offset: usize) -> Option<&mut MemoryMappedFile> {
if offset == 0 {
self.create_mapped_file(offset);
fn get_last_mapped_file_mut(&mut self) -> &mut MemoryMappedFile {
if self.mapped_files.len() == 0 {
self.create_mapped_file(0);
}
self.mapped_files.last_mut()
self.mapped_files.last_mut().unwrap()
}

// Write data to the memory-mapped file.
pub fn append(&mut self, data: &Vec<u8>) -> Result<usize> {
// TODO offset?
let mapped_file = self.get_last_mapped_file_mut(0).unwrap();
mapped_file.append(data)
let mapped_file = self.get_last_mapped_file_mut();
let append_result = mapped_file.append(data);

match append_result {
Ok(write_offset) => { Ok(write_offset) }
Err(err) => {
match err {
InvalidInput { .. } => {
// data size exceed the size of current file, create a new one and retry
let max_offset = mapped_file.get_max_offset();
let new_mapped_file = self.create_mapped_file(max_offset);

new_mapped_file.append(data)
}
other => {
Err(other)
}
}
}
}
}

// Read data from the memory-mapped file.
Expand All @@ -62,3 +83,32 @@ impl MappedFileQueue {
}
}
}

#[cfg(test)]
mod tests {
use tempfile::{TempDir};
use crate::error::Result;
use crate::mapped_file_queue::MappedFileQueue;

pub fn create_temp_dir(prefix: &str) -> TempDir {
tempfile::Builder::new().prefix(prefix).tempdir().unwrap()
}

#[tokio::test]
pub async fn test_write_read() -> Result<()> {
let dir_path = create_temp_dir("topic_mgr_test");
// Create or open the memory-mapped file.
let mut mapped_file_queue = MappedFileQueue::open(
dir_path.path().to_str().unwrap(), 20)?;

let test_str = "hello world".as_bytes();
let test_data = Vec::from(test_str);

mapped_file_queue.append(&test_data).expect("Error while write");
mapped_file_queue.append(&test_data).expect("Error while write");

assert_eq!(mapped_file_queue.get_mapped_files().len(), 2);

Ok(())
}
}
10 changes: 6 additions & 4 deletions src/mmap_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ impl MemoryMappedFile {
// Write data to the memory-mapped file.
pub fn append(&mut self, data: &Vec<u8>) -> Result<usize> {
let data_len = data.len();
let write_pos = self.max_offset - self.min_offset;

// Ensure the data fits within the mapped region.
if self.max_offset + data_len <= self.mmap.len() {
self.mmap[self.max_offset..self.max_offset + data_len].copy_from_slice(data.as_slice());
if write_pos + data_len <= self.mmap.len() {
self.mmap[write_pos..write_pos + data_len].copy_from_slice(data.as_slice());

// Flush changes to disk (optional).
self.mmap.flush().context(StdIOSnafu)?;
Expand All @@ -59,10 +60,11 @@ impl MemoryMappedFile {
// Read data from the memory-mapped file.
pub fn read(&self, offset: usize, data_size: usize) -> Result<Vec<u8>> {
let mut buffer = vec![0; data_size];
let read_pos = offset - self.min_offset;

// Ensure the buffer size matches the mapped region.
if offset + data_size < self.mmap.len() {
buffer.copy_from_slice(&self.mmap[offset..offset + data_size]);
if read_pos + data_size < self.mmap.len() {
buffer.copy_from_slice(&self.mmap[read_pos..read_pos + data_size]);

Ok(buffer)
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/msg_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl MessageStore {

pub async fn write_msg(&self, msg: Message) -> Result<usize> {
// write the msg
let commit_log = self.commit_log.lock().unwrap();
let mut commit_log = self.commit_log.lock().unwrap();
let encoded_msg = msg.encode()?;
let msg_offset = commit_log.write_records(&encoded_msg)?;

Expand Down

0 comments on commit 8b03e0e

Please sign in to comment.