Skip to content

Commit

Permalink
Client can consume multiple messages in a request
Browse files Browse the repository at this point in the history
  • Loading branch information
zhihuij committed Nov 16, 2023
1 parent 1039264 commit 4b8e97f
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 36 deletions.
11 changes: 4 additions & 7 deletions src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde_json::Value;
use crate::config::ConfigOptions;

use crate::server::Server;
use crate::message::Message;
use crate::message::{ConsumeMessageRequest, Message};
use crate::msg_store::MessageStore;
use crate::topic_mgr::{Topic, TopicMgr};

Expand All @@ -33,17 +33,14 @@ async fn produce_message(State(msg_store_state): State<Arc<MessageStore>>,

#[debug_handler]
async fn consume_message(State(msg_store_state): State<Arc<MessageStore>>,
Json(consume_msg): Json<Message>) -> Response<Body> {
Json(consume_msg): Json<ConsumeMessageRequest>) -> Response<Body> {
println!("consume message: {:?}", &consume_msg);

let read_result = msg_store_state.read_msg(consume_msg).await;
match read_result {
Ok(msg_list) => {
let msg0 = msg_list.get(0).unwrap();
let msg_json = serde_json::to_string(&msg0).unwrap();

let consumed_msg = format!("{:?}", msg_json);
Response::new(Body::from(consumed_msg))
let msg_json = serde_json::to_string(&msg_list).unwrap();
Response::new(Body::from(msg_json))
}
Err(error) => {
let err_msg = format!("consume message error: {:?}", error);
Expand Down
13 changes: 11 additions & 2 deletions src/index_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,18 @@ impl IndexStore {
msg_index.put_msg_index(dispatch_msg.msg_offset, dispatch_msg.msg_size)
}

pub fn read_msg_index(&mut self, topic: &str, queue_id: u32, index_offset: usize) -> Result<MessageIndexUnit> {
pub fn read_msg_index(&mut self, topic: &str, queue_id: u32,
index_offset: usize, max_msg_count: usize) -> Vec<MessageIndexUnit> {
let msg_index = self.find_or_create_index(topic, queue_id);
msg_index.read_msg_index(index_offset)

let mut index_list = Vec::new();
for index in index_offset..index_offset + max_msg_count {
if let Ok(index_unit) = msg_index.read_msg_index(index) {
index_list.push(index_unit);
}
}

index_list
}

fn find_or_create_index(&mut self, topic: &str, queue_id: u32) -> &mut MessageIndex {
Expand Down
4 changes: 0 additions & 4 deletions src/mapped_file_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ impl MappedFileQueue {
}
}

pub fn get_mapped_files(&self) -> &Vec<MemoryMappedFile> {
&self.mapped_files
}

pub fn create_mapped_file(&mut self, start_offset: usize) -> &mut MemoryMappedFile {
let store_path_clone = self.store_path.clone();
let base_dir = PathBuf::from(store_path_clone);
Expand Down
10 changes: 8 additions & 2 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@ pub struct Message {
pub queue_id: u32,
pub timestamp: u64,
pub payload: Option<String>,
pub offset: Option<usize>,
pub key: Option<String>,
pub header: Option<HashMap<String, String>>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ConsumeMessageRequest {
pub topic: String,
pub queue_id: u32,
pub offset: usize,
pub max_msg_count: usize,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct DispatchMessage {
pub topic: String,
Expand Down Expand Up @@ -50,7 +57,6 @@ mod tests {
key: Some("message_key".to_string()),
timestamp: 1631894400,
payload: Some(payload),
offset: None,
header: None,
};

Expand Down
4 changes: 0 additions & 4 deletions src/mmap_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ impl MemoryMappedFile {
self.max_offset
}

pub fn set_max_offset(&mut self, max_offset: usize) {
self.max_offset = max_offset;
}

pub fn read_record<Func>(&mut self, reader: &Func)
where Func: Fn(&MmapMut, usize) -> Option<usize> {
let mut write_pos = 0;
Expand Down
29 changes: 12 additions & 17 deletions src/msg_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
use crate::commit_log::CommitLog;
use crate::config::ConfigOptions;
use crate::index_store::IndexStore;
use crate::message::{DispatchMessage, Message};
use crate::message::{ConsumeMessageRequest, DispatchMessage, Message};
use crate::error::Result;

pub struct MessageStore {
Expand Down Expand Up @@ -46,31 +46,26 @@ impl MessageStore {
index_store.put_msg_index(&dispatch_msg)
}

pub async fn read_msg(&self, consume_msg: Message) -> Result<Vec<Message>> {
pub async fn read_msg(&self, consume_msg: ConsumeMessageRequest) -> Result<Vec<Message>> {
let commit_log = self.commit_log.lock().unwrap();
let mut index_store = self.index_store.lock().unwrap();

let index_query_result = index_store.read_msg_index(
consume_msg.topic.as_str(),
consume_msg.queue_id,
consume_msg.offset.unwrap());
consume_msg.offset,
consume_msg.max_msg_count);

match index_query_result {
Ok(msg_index_unit) => {
let msg_content = commit_log.read_records(&msg_index_unit)?;
let mut result_msg_list = Vec::new();

let msg_len_size = std::mem::size_of::<usize>();
for msg_index_unit in index_query_result {
let msg_content = commit_log.read_records(&msg_index_unit)?;
let msg_len_size = std::mem::size_of::<usize>();
let msg = Message::decode(&msg_content.as_slice()[msg_len_size..])?;

let mut result_vec = Vec::new();
let msg = Message::decode(&msg_content.as_slice()[msg_len_size..])?;

result_vec.push(msg);

Ok(result_vec)
}
Err(err) => {
Err(err)
}
result_msg_list.push(msg);
}

Ok(result_msg_list)
}
}

0 comments on commit 4b8e97f

Please sign in to comment.