diff --git a/crates/commitlog/src/commit.rs b/crates/commitlog/src/commit.rs index 243ac729972..b9b26546669 100644 --- a/crates/commitlog/src/commit.rs +++ b/crates/commitlog/src/commit.rs @@ -84,7 +84,9 @@ impl Commit { } /// Serialize and write `self` to `out`. - pub fn write(&self, out: W) -> io::Result<()> { + /// + /// Returns the crc32 checksum of the commit on success. + pub fn write(&self, out: W) -> io::Result { let mut out = Crc32cWriter::new(out); let min_tx_offset = self.min_tx_offset.to_le_bytes(); @@ -100,7 +102,7 @@ impl Commit { let mut out = out.into_inner(); out.write_all(&crc.to_le_bytes())?; - Ok(()) + Ok(crc) } /// Attempt to read one [`Commit`] from the given [`Read`]er. diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index d6d03aa5dbd..4a1c6be2bde 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -12,6 +12,8 @@ use crate::{ Commit, Encode, Options, }; +pub use crate::segment::Committed; + /// A commitlog generic over the storage backend as well as the type of records /// its [`Commit`]s contain. #[derive(Debug)] @@ -87,7 +89,7 @@ impl Generic { /// fail with [`io::ErrorKind::AlreadyExists`]. Encountering this error kind /// this means that something is seriously wrong underlying storage, and the /// caller should stop writing to the log. - pub fn commit(&mut self) -> io::Result { + pub fn commit(&mut self) -> io::Result> { self.panicked = true; let writer = &mut self.head; let sz = writer.commit.encoded_len(); @@ -102,15 +104,13 @@ impl Generic { writer }; - let ret = if let Err(e) = writer.commit() { + let ret = writer.commit().or_else(|e| { warn!("Commit failed: {e}"); // Nb.: Don't risk a panic by calling `self.sync()`. // We already gave up on the last commit, and will retry it next time. self.start_new_segment()?; Err(e) - } else { - Ok(sz) - }; + }); self.panicked = false; ret } diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 9ad3923ac52..c143983d86d 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -77,6 +77,15 @@ impl Default for Header { } } +/// Metadata about a [`Commit`] which was successfully written via [`Writer::commit`]. +pub struct Committed { + /// The range of transaction offsets included in the commit. + pub tx_range: Range, + /// The crc32 checksum of the commit's serialized form, + /// as written to the commitlog. + pub checksum: u32, +} + #[derive(Debug)] pub struct Writer { pub(crate) commit: Commit, @@ -114,11 +123,15 @@ impl Writer { /// Write the current [`Commit`] to the underlying [`io::Write`]. /// /// Will do nothing if the current commit is empty (i.e. `Commit::n` is zero). - pub fn commit(&mut self) -> io::Result<()> { + /// In this case, `None` is returned. + /// + /// Otherwise `Some` [`Committed`] is returned, providing some metadata about + /// the commit. + pub fn commit(&mut self) -> io::Result> { if self.commit.n == 0 { - return Ok(()); + return Ok(None); } - self.commit.write(&mut self.inner)?; + let checksum = self.commit.write(&mut self.inner)?; self.inner.flush()?; let commit_len = self.commit.encoded_len() as u64; @@ -130,12 +143,17 @@ impl Writer { }) }); + let tx_range_start = self.commit.min_tx_offset; + self.bytes_written += commit_len; self.commit.min_tx_offset += self.commit.n as u64; self.commit.n = 0; self.commit.records.clear(); - Ok(()) + Ok(Some(Committed { + tx_range: tx_range_start..self.commit.min_tx_offset, + checksum, + })) } /// The smallest transaction offset in this segment.