Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into prefer-live
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Oct 13, 2024
2 parents 17280e4 + e657210 commit 8252720
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 23 deletions.
34 changes: 23 additions & 11 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub enum Error {
CorruptedFile,
#[error("Empty write buffer")]
NoWrites,
#[error("All backups have been consumed")]
Done,
}

pub struct Storage {
Expand Down Expand Up @@ -129,17 +131,21 @@ impl Storage {
/// the file after loading. If all the disk data is caught up,
/// swaps current write buffer to current read buffer if there
/// is pending data in memory write buffer.
/// Returns true if all the messages are caught up
pub fn reload_on_eof(&mut self) -> Result<bool, Error> {
/// Returns Error::Done if all the messages are caught up
pub fn reload_on_eof(&mut self) -> Result<(), Error> {
// Don't reload if there is data in current read file
if self.current_read_file.has_remaining() {
return Ok(false);
return Ok(());
}

let Some(persistence) = &mut self.persistence else {
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// If read buffer is 0 after swapping, all the data is caught up
return Ok(self.current_read_file.is_empty());
if self.current_read_file.is_empty() {
return Err(Error::Done);
}

return Ok(());
};

// Remove read file on completion in destructive-read mode
Expand All @@ -155,7 +161,11 @@ impl Storage {
if persistence.backlog_files.is_empty() {
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// If read buffer is 0 after swapping, all the data is caught up
return Ok(self.current_read_file.is_empty());
if self.current_read_file.is_empty() {
return Err(Error::Done);
}

return Ok(());
}

if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) {
Expand All @@ -164,7 +174,7 @@ impl Storage {
return Err(e);
}

Ok(false)
Ok(())
}
}

Expand Down Expand Up @@ -379,16 +389,18 @@ impl Persistence {
Ok(NextFile { file: PersistenceFile::new(&self.path, file_name)?, deleted })
}

/// Load the next persistence file to be read into memory
/// Load the next persistence file to be read into memory, returns Error::Done if there is none left.
fn load_next_read_file(&mut self, current_read_file: &mut BytesMut) -> Result<(), Error> {
// Len always > 0 because of above if. Doesn't panic
let id = self.backlog_files.pop_front().unwrap();
let Some(id) = self.backlog_files.pop_front() else {
self.current_read_file_id.take();
return Err(Error::Done);
};
let file_name = format!("backup@{id}");
let mut file = PersistenceFile::new(&self.path, file_name)?;

// Load file into memory and store its id for deleting in the future
file.read(current_read_file)?;
self.current_read_file_id = Some(id);
self.current_read_file_id.replace(id);

Ok(())
}
Expand Down Expand Up @@ -423,7 +435,7 @@ mod test {
let mut publishes = vec![];
for _ in 0..n {
// Done reading all the pending files
if storage.reload_on_eof().unwrap() {
if let Err(super::Error::Done) = storage.reload_on_eof() {
break;
}

Expand Down
21 changes: 9 additions & 12 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,7 @@ impl Storage {
}

// Ensures read-buffer is ready to be read from, exchanges buffers if required, returns true if empty.
pub fn reload_on_eof(&mut self) -> Result<bool, storage::Error> {
if self.live_data_first && self.latest_data.is_some() {
return Ok(false);
}

pub fn reload_on_eof(&mut self) -> Result<(), storage::Error> {
self.inner.reload_on_eof()
}

Expand Down Expand Up @@ -291,13 +287,8 @@ impl StorageHandler {

for (stream, storage) in storages {
match storage.reload_on_eof() {
Ok(true) => {
if self.read_stream.take_if(|s| s == stream).is_some() {
debug!("Done reading from: {}", stream.topic);
}
}
// Reading from a non-empty persisted stream
Ok(false) => {
Ok(_) => {
if self.read_stream.is_none() {
self.read_stream.replace(stream.to_owned());
debug!("Started reading from: {}", stream.topic);
Expand All @@ -312,6 +303,12 @@ impl StorageHandler {

return Some((stream.to_owned(), publish));
}
// All packets read from storage
Err(storage::Error::Done) => {
if self.read_stream.take_if(|s| s == stream).is_some() {
debug!("Done reading from: {}", stream.topic);
}
}
// Reload again on encountering a corrupted file
Err(e) => {
metrics.increment_errors();
Expand Down Expand Up @@ -875,7 +872,7 @@ pub mod tests {
use super::*;

fn read_from_storage(storage: &mut Storage, max_packet_size: usize) -> Publish {
if storage.reload_on_eof().unwrap() {
if let Err(storage::Error::Done) = storage.reload_on_eof() {
panic!("No publishes found in storage");
}

Expand Down

0 comments on commit 8252720

Please sign in to comment.