Skip to content

Commit

Permalink
Add recovery function, read the exist file and construct the write of…
Browse files Browse the repository at this point in the history
…fset
  • Loading branch information
zhihuij committed Nov 15, 2023
1 parent 29cc1a6 commit e373aa8
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 36 deletions.
34 changes: 28 additions & 6 deletions src/commit_log.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,43 @@
use std::fs;
use std::path::PathBuf;
use snafu::{location, Location};
use memmap2::MmapMut;
use snafu::{location, Location, ResultExt};
use crate::error::Error::InvalidInput;
use crate::msg_index::MessageIndexUnit;
use crate::error::Result;
use crate::error::{Result, StdIOSnafu};
use crate::mapped_file_queue::MappedFileQueue;

pub struct CommitLog {
mapped_file_queue: MappedFileQueue,
}

impl CommitLog {
pub fn open(store_path: &str, max_file_size: u64) -> Result<Self> {
pub fn new(store_path: &str, max_file_size: u64) -> Result<Self> {
let base_dir = PathBuf::from(store_path);
let msg_index_dir = base_dir.join("commitlog");
let commit_log_dir = base_dir.join("commitlog");

fs::create_dir_all(&commit_log_dir).context(StdIOSnafu)?;
let read_dir = commit_log_dir.read_dir().unwrap();
let file_num = read_dir.count();

let mut mapped_file_queue = MappedFileQueue::new(
commit_log_dir.as_path().to_str().unwrap(), max_file_size)?;
if file_num != 0 {
mapped_file_queue.recovery(|mmap: &MmapMut, offset: usize| {
let size = std::mem::size_of::<usize>();
if offset + size < mmap.len() {
let mut buffer: Vec<u8> = vec![0; size];
buffer.copy_from_slice(&mmap[offset..offset + size]);

let msg_len = usize::from_le_bytes(buffer.as_slice().try_into().unwrap());
if msg_len > 0 {
return Some(msg_len + size);
}
}
None
});
}

let mapped_file_queue = MappedFileQueue::open(
msg_index_dir.as_path().to_str().unwrap(), max_file_size)?;
Ok(CommitLog { mapped_file_queue })
}

Expand Down
2 changes: 1 addition & 1 deletion src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async fn list_topics(State(topic_mgr_state): State<Arc<TopicMgr>>) -> Response<B
#[async_trait]
impl Server for HttpServer {
async fn start(&self, listening: SocketAddr, config: ConfigOptions) {
let msg_store = MessageStore::open(&config).unwrap();
let msg_store = MessageStore::new(&config).unwrap();
let msg_store_state = Arc::new(msg_store);

let topic_mgr = TopicMgr::new(config.topic_store_path.as_str()).unwrap();
Expand Down
16 changes: 9 additions & 7 deletions src/index_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ use crate::error::Result;
pub struct IndexStore {
config: ConfigOptions,
index_map: HashMap<String, HashMap<u32, MessageIndex>>,
index_store_path: String,
}

impl IndexStore {
// Constructor: Open or create a file for index store.
pub fn open(config: ConfigOptions) -> Result<Self> {
Ok(IndexStore { config, index_map: HashMap::new() })
pub fn new(config: ConfigOptions) -> Result<Self> {
let msg_store_path_clone = &config.msg_store_path;
let base_dir = PathBuf::from(msg_store_path_clone);
let index_store_path = base_dir.join("index").as_path().to_str().unwrap().to_string();

Ok(IndexStore { config, index_map: HashMap::new(), index_store_path })
}

pub fn put_msg_index(&mut self, dispatch_msg: &DispatchMessage) -> Result<usize> {
Expand All @@ -31,11 +36,8 @@ impl IndexStore {
let topic_index_map = self.index_map.entry(topic.to_string()).or_insert_with(|| HashMap::new());

topic_index_map.entry(queue_id).or_insert_with(|| {
let msg_store_path_clone = self.config.msg_store_path.clone();
let base_dir = PathBuf::from(msg_store_path_clone);
let index_store_path = base_dir.join("index");
MessageIndex::open(
index_store_path.as_path().to_str().unwrap(),
MessageIndex::new(
self.index_store_path.as_str(),
topic, queue_id, self.config.index_file_size).unwrap()
})
}
Expand Down
33 changes: 21 additions & 12 deletions src/mapped_file_queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::path::{Path, PathBuf};
use memmap2::MmapMut;
use snafu::{location, Location};
use crate::error::Error::InvalidInput;
use crate::error::Result;
Expand All @@ -11,27 +12,35 @@ pub struct MappedFileQueue {
}

impl MappedFileQueue {
pub fn open(store_path: &str, max_file_size: u64) -> Result<Self> {
// load exist mapped files
let mut mapped_files = Vec::new();
let store_path_dir = Path::new(store_path);
pub fn new(store_path: &str, max_file_size: u64) -> Result<Self> {
Ok(MappedFileQueue { store_path: store_path.to_string(), max_file_size, mapped_files: Vec::new() })
}

/*
* Recovery from restart or fault, load existed files.
*/
pub fn recovery<Func>(&mut self, reader: Func)
where Func: Fn(&MmapMut, usize) -> Option<usize> {
let store_path_dir = Path::new(&self.store_path);
// TODO only read the several newest file
for entry in store_path_dir.read_dir().unwrap() {
if let Ok(entry) = entry {
let entry_path = entry.path();
if entry_path.is_file() {
let mapped_file_name = entry_path.file_name().unwrap().to_str().unwrap();
let mapped_file_path = entry_path.to_str().unwrap();
let start_offset: usize = mapped_file_name.parse().expect("Error while parse file name");
let mapped_file = MemoryMappedFile::open(
mapped_file_path, start_offset, max_file_size).expect("Error while load mapped file");
let mut mapped_file = MemoryMappedFile::new(
mapped_file_path, start_offset, self.max_file_size).expect("Error while load mapped file");

println!("loaded mapped file: {:?}, offset={}", &entry.path(), start_offset);
mapped_files.push(mapped_file);
mapped_file.read_record(&reader);

println!("loaded mapped file: {:?}, offset={}, max_offset={}", &entry.path(),
mapped_file.get_min_offset(), mapped_file.get_max_offset());
self.mapped_files.push(mapped_file);
}
}
}

Ok(MappedFileQueue { store_path: store_path.to_string(), max_file_size, mapped_files })
}

pub fn get_mapped_files(&self) -> &Vec<MemoryMappedFile> {
Expand All @@ -46,7 +55,7 @@ impl MappedFileQueue {

println!("new memory mapped file: {:?}", &file_path);

let mapped_file = MemoryMappedFile::open(
let mapped_file = MemoryMappedFile::new(
file_path.as_path().to_str().unwrap(), start_offset, self.max_file_size).unwrap();

self.mapped_files.push(mapped_file);
Expand Down Expand Up @@ -114,7 +123,7 @@ mod tests {
pub async fn test_write_read() -> Result<()> {
let dir_path = create_temp_dir("topic_mgr_test");
// Create or open the memory-mapped file.
let mut mapped_file_queue = MappedFileQueue::open(
let mut mapped_file_queue = MappedFileQueue::new(
dir_path.path().to_str().unwrap(), 20)?;

let test_str = "hello world".as_bytes();
Expand Down
24 changes: 22 additions & 2 deletions src/mmap_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct MemoryMappedFile {

impl MemoryMappedFile {
// Constructor: Open or create a memory-mapped file.
pub fn open(file_path: &str, start_offset: usize, file_size: u64) -> Result<Self> {
pub fn new(file_path: &str, start_offset: usize, file_size: u64) -> Result<Self> {
let file = OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -34,6 +34,25 @@ impl MemoryMappedFile {
self.max_offset
}

pub fn set_max_offset(&mut self, max_offset: usize) {
self.max_offset = max_offset;
}

pub fn read_record<Func>(&mut self, reader: &Func)
where Func: Fn(&MmapMut, usize) -> Option<usize> {
let mut write_pos = 0;
loop {
let read_result = reader(&self.mmap, write_pos);
match read_result {
None => { break; }
Some(offset) => {
write_pos += offset;
self.max_offset += offset;
}
}
}
}

// Write data to the memory-mapped file.
pub fn append(&mut self, data: &Vec<u8>) -> Result<usize> {
let data_len = data.len();
Expand Down Expand Up @@ -63,6 +82,7 @@ impl MemoryMappedFile {
let mut buffer = vec![0; data_size];
let read_pos = offset - self.min_offset;

// TODO mmap.len should be max_offset?
// Ensure the buffer size matches the mapped region.
if read_pos + data_size < self.mmap.len() {
buffer.copy_from_slice(&self.mmap[read_pos..read_pos + data_size]);
Expand Down Expand Up @@ -93,7 +113,7 @@ mod tests {
let file_size = 1024;
let file_path = dir_path.path().join("temp_mmap_file");
// Create or open the memory-mapped file.
let mut mem_mapped_file = MemoryMappedFile::open(
let mut mem_mapped_file = MemoryMappedFile::new(
file_path.to_str().unwrap(), 0, file_size)?;

// Write data to the memory-mapped file.
Expand Down
27 changes: 25 additions & 2 deletions src/msg_index.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::PathBuf;
use std::{fs, u32, u64, usize};
use memmap2::MmapMut;
use snafu::ResultExt;
use crate::error::{Result, StdIOSnafu};
use crate::mapped_file_queue::MappedFileQueue;
Expand All @@ -17,15 +18,37 @@ pub struct MessageIndexUnit {

impl MessageIndex {
// Constructor: Open or create a file for message index.
pub fn open(store_path: &str, topic: &str, queue_id: u32, max_file_size: u64) -> Result<Self> {
pub fn new(store_path: &str, topic: &str, queue_id: u32, max_file_size: u64) -> Result<Self> {
let base_dir = PathBuf::from(store_path);
let msg_index_dir = base_dir.join(topic).join(queue_id.to_string());

fs::create_dir_all(&msg_index_dir).context(StdIOSnafu)?;

let mapped_file_queue = MappedFileQueue::open(
let read_dir = msg_index_dir.read_dir().unwrap();
let file_num = read_dir.count();

let mut mapped_file_queue = MappedFileQueue::new(
msg_index_dir.as_path().to_str().unwrap(), max_file_size).unwrap();

if file_num != 0 {
mapped_file_queue.recovery(|mmap: &MmapMut, offset: usize| {
// TODO size of struct?
if offset + MSG_INDEX_UNIT_SIZE < mmap.len() {
let mut buffer: Vec<u8> = vec![0; MSG_INDEX_UNIT_SIZE - 8];
buffer.copy_from_slice(&mmap[offset + 8..offset + MSG_INDEX_UNIT_SIZE]);
let msg_unit_slice = buffer.as_slice();

let size_bytes: [u8; 4] = msg_unit_slice[0..4].try_into().unwrap();
let size = u32::from_le_bytes(size_bytes);

if size > 0 {
return Some(offset + MSG_INDEX_UNIT_SIZE);
}
}
None
});
}

Ok(MessageIndex { mapped_file_queue })
}

Expand Down
21 changes: 15 additions & 6 deletions src/msg_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,33 @@ pub struct MessageStore {

impl MessageStore {
// Constructor: Open or create a file for message store.
pub fn open(config: &ConfigOptions) -> Result<Self> {
let commit_log = Arc::new(Mutex::new(CommitLog::open(
pub fn new(config: &ConfigOptions) -> Result<Self> {
let commit_log = Arc::new(Mutex::new(CommitLog::new(
config.msg_store_path.as_str(), config.msg_store_file_size)?));
let config_clone = config.clone();
let index_store = Arc::new(Mutex::new(IndexStore::open(config_clone)?));
let index_store = Arc::new(Mutex::new(IndexStore::new(config_clone)?));

Ok(MessageStore { commit_log, index_store })
}

pub async fn write_msg(&self, msg: Message) -> Result<usize> {
// write the msg
let mut commit_log = self.commit_log.lock().unwrap();

// TODO should write the message content field by field
let encoded_msg = msg.encode()?;
let msg_offset = commit_log.write_records(&encoded_msg)?;
let msg_len = encoded_msg.len();
let mut msg_len_bytes = usize::to_le_bytes(msg_len).to_vec();
msg_len_bytes.extend(encoded_msg);

let msg_offset = commit_log.write_records(&msg_len_bytes)?;

let mut index_store = self.index_store.lock().unwrap();
let dispatch_msg = DispatchMessage {
topic: msg.topic.clone(),
queue_id: msg.queue_id,
msg_offset,
msg_size: encoded_msg.len(),
msg_size: msg_len + msg_len_bytes.len(),
timestamp: msg.timestamp,
};

Expand All @@ -52,8 +58,11 @@ impl MessageStore {
match index_query_result {
Ok(msg_index_unit) => {
let msg_content = commit_log.read_records(&msg_index_unit)?;

let msg_len_size = std::mem::size_of::<usize>();

let mut result_vec = Vec::new();
let msg = Message::decode(msg_content.as_slice())?;
let msg = Message::decode(&msg_content.as_slice()[msg_len_size..])?;

result_vec.push(msg);

Expand Down

0 comments on commit e373aa8

Please sign in to comment.