diff --git a/src/commit_log.rs b/src/commit_log.rs index 5b1207b..aa401ef 100644 --- a/src/commit_log.rs +++ b/src/commit_log.rs @@ -1,8 +1,10 @@ +use std::fs; use std::path::PathBuf; -use snafu::{location, Location}; +use memmap2::MmapMut; +use snafu::{location, Location, ResultExt}; use crate::error::Error::InvalidInput; use crate::msg_index::MessageIndexUnit; -use crate::error::Result; +use crate::error::{Result, StdIOSnafu}; use crate::mapped_file_queue::MappedFileQueue; pub struct CommitLog { @@ -10,12 +12,32 @@ pub struct CommitLog { } impl CommitLog { - pub fn open(store_path: &str, max_file_size: u64) -> Result { + pub fn new(store_path: &str, max_file_size: u64) -> Result { let base_dir = PathBuf::from(store_path); - let msg_index_dir = base_dir.join("commitlog"); + let commit_log_dir = base_dir.join("commitlog"); + + fs::create_dir_all(&commit_log_dir).context(StdIOSnafu)?; + let read_dir = commit_log_dir.read_dir().unwrap(); + let file_num = read_dir.count(); + + let mut mapped_file_queue = MappedFileQueue::new( + commit_log_dir.as_path().to_str().unwrap(), max_file_size)?; + if file_num != 0 { + mapped_file_queue.recovery(|mmap: &MmapMut, offset: usize| { + let size = std::mem::size_of::(); + if offset + size < mmap.len() { + let mut buffer: Vec = vec![0; size]; + buffer.copy_from_slice(&mmap[offset..offset + size]); + + let msg_len = usize::from_le_bytes(buffer.as_slice().try_into().unwrap()); + if msg_len > 0 { + return Some(msg_len + size); + } + } + None + }); + } - let mapped_file_queue = MappedFileQueue::open( - msg_index_dir.as_path().to_str().unwrap(), max_file_size)?; Ok(CommitLog { mapped_file_queue }) } diff --git a/src/http_server.rs b/src/http_server.rs index 0139a43..915d129 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -93,7 +93,7 @@ async fn list_topics(State(topic_mgr_state): State>) -> Response>, + index_store_path: String, } impl IndexStore { // Constructor: Open or create a file for index store. - pub fn open(config: ConfigOptions) -> Result { - Ok(IndexStore { config, index_map: HashMap::new() }) + pub fn new(config: ConfigOptions) -> Result { + let msg_store_path_clone = &config.msg_store_path; + let base_dir = PathBuf::from(msg_store_path_clone); + let index_store_path = base_dir.join("index").as_path().to_str().unwrap().to_string(); + + Ok(IndexStore { config, index_map: HashMap::new(), index_store_path }) } pub fn put_msg_index(&mut self, dispatch_msg: &DispatchMessage) -> Result { @@ -31,11 +36,8 @@ impl IndexStore { let topic_index_map = self.index_map.entry(topic.to_string()).or_insert_with(|| HashMap::new()); topic_index_map.entry(queue_id).or_insert_with(|| { - let msg_store_path_clone = self.config.msg_store_path.clone(); - let base_dir = PathBuf::from(msg_store_path_clone); - let index_store_path = base_dir.join("index"); - MessageIndex::open( - index_store_path.as_path().to_str().unwrap(), + MessageIndex::new( + self.index_store_path.as_str(), topic, queue_id, self.config.index_file_size).unwrap() }) } diff --git a/src/mapped_file_queue.rs b/src/mapped_file_queue.rs index 8a3c541..ed4e63c 100644 --- a/src/mapped_file_queue.rs +++ b/src/mapped_file_queue.rs @@ -1,4 +1,5 @@ use std::path::{Path, PathBuf}; +use memmap2::MmapMut; use snafu::{location, Location}; use crate::error::Error::InvalidInput; use crate::error::Result; @@ -11,10 +12,17 @@ pub struct MappedFileQueue { } impl MappedFileQueue { - pub fn open(store_path: &str, max_file_size: u64) -> Result { - // load exist mapped files - let mut mapped_files = Vec::new(); - let store_path_dir = Path::new(store_path); + pub fn new(store_path: &str, max_file_size: u64) -> Result { + Ok(MappedFileQueue { store_path: store_path.to_string(), max_file_size, mapped_files: Vec::new() }) + } + + /* + * Recovery from restart or fault, load existed files. + */ + pub fn recovery(&mut self, reader: Func) + where Func: Fn(&MmapMut, usize) -> Option { + let store_path_dir = Path::new(&self.store_path); + // TODO only read the several newest file for entry in store_path_dir.read_dir().unwrap() { if let Ok(entry) = entry { let entry_path = entry.path(); @@ -22,16 +30,17 @@ impl MappedFileQueue { let mapped_file_name = entry_path.file_name().unwrap().to_str().unwrap(); let mapped_file_path = entry_path.to_str().unwrap(); let start_offset: usize = mapped_file_name.parse().expect("Error while parse file name"); - let mapped_file = MemoryMappedFile::open( - mapped_file_path, start_offset, max_file_size).expect("Error while load mapped file"); + let mut mapped_file = MemoryMappedFile::new( + mapped_file_path, start_offset, self.max_file_size).expect("Error while load mapped file"); - println!("loaded mapped file: {:?}, offset={}", &entry.path(), start_offset); - mapped_files.push(mapped_file); + mapped_file.read_record(&reader); + + println!("loaded mapped file: {:?}, offset={}, max_offset={}", &entry.path(), + mapped_file.get_min_offset(), mapped_file.get_max_offset()); + self.mapped_files.push(mapped_file); } } } - - Ok(MappedFileQueue { store_path: store_path.to_string(), max_file_size, mapped_files }) } pub fn get_mapped_files(&self) -> &Vec { @@ -46,7 +55,7 @@ impl MappedFileQueue { println!("new memory mapped file: {:?}", &file_path); - let mapped_file = MemoryMappedFile::open( + let mapped_file = MemoryMappedFile::new( file_path.as_path().to_str().unwrap(), start_offset, self.max_file_size).unwrap(); self.mapped_files.push(mapped_file); @@ -114,7 +123,7 @@ mod tests { pub async fn test_write_read() -> Result<()> { let dir_path = create_temp_dir("topic_mgr_test"); // Create or open the memory-mapped file. - let mut mapped_file_queue = MappedFileQueue::open( + let mut mapped_file_queue = MappedFileQueue::new( dir_path.path().to_str().unwrap(), 20)?; let test_str = "hello world".as_bytes(); diff --git a/src/mmap_file.rs b/src/mmap_file.rs index 83f356f..921dd71 100644 --- a/src/mmap_file.rs +++ b/src/mmap_file.rs @@ -12,7 +12,7 @@ pub struct MemoryMappedFile { impl MemoryMappedFile { // Constructor: Open or create a memory-mapped file. - pub fn open(file_path: &str, start_offset: usize, file_size: u64) -> Result { + pub fn new(file_path: &str, start_offset: usize, file_size: u64) -> Result { let file = OpenOptions::new() .read(true) .write(true) @@ -34,6 +34,25 @@ impl MemoryMappedFile { self.max_offset } + pub fn set_max_offset(&mut self, max_offset: usize) { + self.max_offset = max_offset; + } + + pub fn read_record(&mut self, reader: &Func) + where Func: Fn(&MmapMut, usize) -> Option { + let mut write_pos = 0; + loop { + let read_result = reader(&self.mmap, write_pos); + match read_result { + None => { break; } + Some(offset) => { + write_pos += offset; + self.max_offset += offset; + } + } + } + } + // Write data to the memory-mapped file. pub fn append(&mut self, data: &Vec) -> Result { let data_len = data.len(); @@ -63,6 +82,7 @@ impl MemoryMappedFile { let mut buffer = vec![0; data_size]; let read_pos = offset - self.min_offset; + // TODO mmap.len should be max_offset? // Ensure the buffer size matches the mapped region. if read_pos + data_size < self.mmap.len() { buffer.copy_from_slice(&self.mmap[read_pos..read_pos + data_size]); @@ -93,7 +113,7 @@ mod tests { let file_size = 1024; let file_path = dir_path.path().join("temp_mmap_file"); // Create or open the memory-mapped file. - let mut mem_mapped_file = MemoryMappedFile::open( + let mut mem_mapped_file = MemoryMappedFile::new( file_path.to_str().unwrap(), 0, file_size)?; // Write data to the memory-mapped file. diff --git a/src/msg_index.rs b/src/msg_index.rs index fd6f10c..cc92dc5 100644 --- a/src/msg_index.rs +++ b/src/msg_index.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; use std::{fs, u32, u64, usize}; +use memmap2::MmapMut; use snafu::ResultExt; use crate::error::{Result, StdIOSnafu}; use crate::mapped_file_queue::MappedFileQueue; @@ -17,15 +18,37 @@ pub struct MessageIndexUnit { impl MessageIndex { // Constructor: Open or create a file for message index. - pub fn open(store_path: &str, topic: &str, queue_id: u32, max_file_size: u64) -> Result { + pub fn new(store_path: &str, topic: &str, queue_id: u32, max_file_size: u64) -> Result { let base_dir = PathBuf::from(store_path); let msg_index_dir = base_dir.join(topic).join(queue_id.to_string()); fs::create_dir_all(&msg_index_dir).context(StdIOSnafu)?; - let mapped_file_queue = MappedFileQueue::open( + let read_dir = msg_index_dir.read_dir().unwrap(); + let file_num = read_dir.count(); + + let mut mapped_file_queue = MappedFileQueue::new( msg_index_dir.as_path().to_str().unwrap(), max_file_size).unwrap(); + if file_num != 0 { + mapped_file_queue.recovery(|mmap: &MmapMut, offset: usize| { + // TODO size of struct? + if offset + MSG_INDEX_UNIT_SIZE < mmap.len() { + let mut buffer: Vec = vec![0; MSG_INDEX_UNIT_SIZE - 8]; + buffer.copy_from_slice(&mmap[offset + 8..offset + MSG_INDEX_UNIT_SIZE]); + let msg_unit_slice = buffer.as_slice(); + + let size_bytes: [u8; 4] = msg_unit_slice[0..4].try_into().unwrap(); + let size = u32::from_le_bytes(size_bytes); + + if size > 0 { + return Some(offset + MSG_INDEX_UNIT_SIZE); + } + } + None + }); + } + Ok(MessageIndex { mapped_file_queue }) } diff --git a/src/msg_store.rs b/src/msg_store.rs index b08d0c9..8c09691 100644 --- a/src/msg_store.rs +++ b/src/msg_store.rs @@ -12,11 +12,11 @@ pub struct MessageStore { impl MessageStore { // Constructor: Open or create a file for message store. - pub fn open(config: &ConfigOptions) -> Result { - let commit_log = Arc::new(Mutex::new(CommitLog::open( + pub fn new(config: &ConfigOptions) -> Result { + let commit_log = Arc::new(Mutex::new(CommitLog::new( config.msg_store_path.as_str(), config.msg_store_file_size)?)); let config_clone = config.clone(); - let index_store = Arc::new(Mutex::new(IndexStore::open(config_clone)?)); + let index_store = Arc::new(Mutex::new(IndexStore::new(config_clone)?)); Ok(MessageStore { commit_log, index_store }) } @@ -24,15 +24,21 @@ impl MessageStore { pub async fn write_msg(&self, msg: Message) -> Result { // write the msg let mut commit_log = self.commit_log.lock().unwrap(); + + // TODO should write the message content field by field let encoded_msg = msg.encode()?; - let msg_offset = commit_log.write_records(&encoded_msg)?; + let msg_len = encoded_msg.len(); + let mut msg_len_bytes = usize::to_le_bytes(msg_len).to_vec(); + msg_len_bytes.extend(encoded_msg); + + let msg_offset = commit_log.write_records(&msg_len_bytes)?; let mut index_store = self.index_store.lock().unwrap(); let dispatch_msg = DispatchMessage { topic: msg.topic.clone(), queue_id: msg.queue_id, msg_offset, - msg_size: encoded_msg.len(), + msg_size: msg_len + msg_len_bytes.len(), timestamp: msg.timestamp, }; @@ -52,8 +58,11 @@ impl MessageStore { match index_query_result { Ok(msg_index_unit) => { let msg_content = commit_log.read_records(&msg_index_unit)?; + + let msg_len_size = std::mem::size_of::(); + let mut result_vec = Vec::new(); - let msg = Message::decode(msg_content.as_slice())?; + let msg = Message::decode(&msg_content.as_slice()[msg_len_size..])?; result_vec.push(msg);