From 4b8e97ff13d2505dda03115bb94d833d611f538b Mon Sep 17 00:00:00 2001 From: Zhihui Jiao Date: Thu, 16 Nov 2023 23:28:00 +0800 Subject: [PATCH] Client can consume multiple messages in a request --- src/http_server.rs | 11 ++++------- src/index_store.rs | 13 +++++++++++-- src/mapped_file_queue.rs | 4 ---- src/message.rs | 10 ++++++++-- src/mmap_file.rs | 4 ---- src/msg_store.rs | 29 ++++++++++++----------------- 6 files changed, 35 insertions(+), 36 deletions(-) diff --git a/src/http_server.rs b/src/http_server.rs index 915d129..cba354b 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -8,7 +8,7 @@ use serde_json::Value; use crate::config::ConfigOptions; use crate::server::Server; -use crate::message::Message; +use crate::message::{ConsumeMessageRequest, Message}; use crate::msg_store::MessageStore; use crate::topic_mgr::{Topic, TopicMgr}; @@ -33,17 +33,14 @@ async fn produce_message(State(msg_store_state): State>, #[debug_handler] async fn consume_message(State(msg_store_state): State>, - Json(consume_msg): Json) -> Response { + Json(consume_msg): Json) -> Response { println!("consume message: {:?}", &consume_msg); let read_result = msg_store_state.read_msg(consume_msg).await; match read_result { Ok(msg_list) => { - let msg0 = msg_list.get(0).unwrap(); - let msg_json = serde_json::to_string(&msg0).unwrap(); - - let consumed_msg = format!("{:?}", msg_json); - Response::new(Body::from(consumed_msg)) + let msg_json = serde_json::to_string(&msg_list).unwrap(); + Response::new(Body::from(msg_json)) } Err(error) => { let err_msg = format!("consume message error: {:?}", error); diff --git a/src/index_store.rs b/src/index_store.rs index cd99229..79e1ace 100644 --- a/src/index_store.rs +++ b/src/index_store.rs @@ -27,9 +27,18 @@ impl IndexStore { msg_index.put_msg_index(dispatch_msg.msg_offset, dispatch_msg.msg_size) } - pub fn read_msg_index(&mut self, topic: &str, queue_id: u32, index_offset: usize) -> Result { + pub fn read_msg_index(&mut self, topic: &str, queue_id: u32, + index_offset: usize, max_msg_count: usize) -> Vec { let msg_index = self.find_or_create_index(topic, queue_id); - msg_index.read_msg_index(index_offset) + + let mut index_list = Vec::new(); + for index in index_offset..index_offset + max_msg_count { + if let Ok(index_unit) = msg_index.read_msg_index(index) { + index_list.push(index_unit); + } + } + + index_list } fn find_or_create_index(&mut self, topic: &str, queue_id: u32) -> &mut MessageIndex { diff --git a/src/mapped_file_queue.rs b/src/mapped_file_queue.rs index ed4e63c..8803349 100644 --- a/src/mapped_file_queue.rs +++ b/src/mapped_file_queue.rs @@ -43,10 +43,6 @@ impl MappedFileQueue { } } - pub fn get_mapped_files(&self) -> &Vec { - &self.mapped_files - } - pub fn create_mapped_file(&mut self, start_offset: usize) -> &mut MemoryMappedFile { let store_path_clone = self.store_path.clone(); let base_dir = PathBuf::from(store_path_clone); diff --git a/src/message.rs b/src/message.rs index aafdbef..edf37a4 100644 --- a/src/message.rs +++ b/src/message.rs @@ -9,11 +9,18 @@ pub struct Message { pub queue_id: u32, pub timestamp: u64, pub payload: Option, - pub offset: Option, pub key: Option, pub header: Option>, } +#[derive(Debug, Serialize, Deserialize)] +pub struct ConsumeMessageRequest { + pub topic: String, + pub queue_id: u32, + pub offset: usize, + pub max_msg_count: usize, +} + #[derive(Debug, Serialize, Deserialize)] pub struct DispatchMessage { pub topic: String, @@ -50,7 +57,6 @@ mod tests { key: Some("message_key".to_string()), timestamp: 1631894400, payload: Some(payload), - offset: None, header: None, }; diff --git a/src/mmap_file.rs b/src/mmap_file.rs index 921dd71..352e6dc 100644 --- a/src/mmap_file.rs +++ b/src/mmap_file.rs @@ -34,10 +34,6 @@ impl MemoryMappedFile { self.max_offset } - pub fn set_max_offset(&mut self, max_offset: usize) { - self.max_offset = max_offset; - } - pub fn read_record(&mut self, reader: &Func) where Func: Fn(&MmapMut, usize) -> Option { let mut write_pos = 0; diff --git a/src/msg_store.rs b/src/msg_store.rs index 598994f..0702295 100644 --- a/src/msg_store.rs +++ b/src/msg_store.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex}; use crate::commit_log::CommitLog; use crate::config::ConfigOptions; use crate::index_store::IndexStore; -use crate::message::{DispatchMessage, Message}; +use crate::message::{ConsumeMessageRequest, DispatchMessage, Message}; use crate::error::Result; pub struct MessageStore { @@ -46,31 +46,26 @@ impl MessageStore { index_store.put_msg_index(&dispatch_msg) } - pub async fn read_msg(&self, consume_msg: Message) -> Result> { + pub async fn read_msg(&self, consume_msg: ConsumeMessageRequest) -> Result> { let commit_log = self.commit_log.lock().unwrap(); let mut index_store = self.index_store.lock().unwrap(); let index_query_result = index_store.read_msg_index( consume_msg.topic.as_str(), consume_msg.queue_id, - consume_msg.offset.unwrap()); + consume_msg.offset, + consume_msg.max_msg_count); - match index_query_result { - Ok(msg_index_unit) => { - let msg_content = commit_log.read_records(&msg_index_unit)?; + let mut result_msg_list = Vec::new(); - let msg_len_size = std::mem::size_of::(); + for msg_index_unit in index_query_result { + let msg_content = commit_log.read_records(&msg_index_unit)?; + let msg_len_size = std::mem::size_of::(); + let msg = Message::decode(&msg_content.as_slice()[msg_len_size..])?; - let mut result_vec = Vec::new(); - let msg = Message::decode(&msg_content.as_slice()[msg_len_size..])?; - - result_vec.push(msg); - - Ok(result_vec) - } - Err(err) => { - Err(err) - } + result_msg_list.push(msg); } + + Ok(result_msg_list) } } \ No newline at end of file