Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

offset index read integration #1779

Merged
merged 6 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ impl<R: Repo, T> Generic<R, T> {

pub fn commits_from(&self, offset: u64) -> Commits<R> {
let offsets = self.segment_offsets_from(offset);
let next_offset = offsets.first().cloned().unwrap_or(offset);
let segments = Segments {
offs: offsets.into_iter(),
repo: self.repo.clone(),
Expand All @@ -178,7 +177,7 @@ impl<R: Repo, T> Generic<R, T> {
Commits {
inner: None,
segments,
last_commit: CommitInfo::Initial { next_offset },
last_commit: CommitInfo::Initial { next_offset: offset },
last_error: None,
}
}
Expand Down Expand Up @@ -468,6 +467,20 @@ impl CommitInfo {
Self::LastSeen { tx_range, .. } => &tx_range.end,
}
}

// If initial offset falls within a commit, adjust it to the commit boundary
Shubham8287 marked this conversation as resolved.
Show resolved Hide resolved
fn adjust_initial_offset(&mut self, commit: &StoredCommit) -> bool {
if let Self::Initial { next_offset } = self {
let last_tx_offset = commit.min_tx_offset + commit.n as u64 - 1;
if *next_offset > last_tx_offset {
return true;
} else {
*next_offset = commit.min_tx_offset;
}
}

false
}
}

pub struct Commits<R: Repo> {
Expand Down Expand Up @@ -496,8 +509,11 @@ impl<R: Repo> Commits<R> {
// interesting.
let prev_error = self.last_error.take();

// Skip entries before the initial commit.
if self.last_commit.adjust_initial_offset(&commit) {
self.next()
// Same offset: ignore if duplicate (same crc), else report a "fork".
if self.last_commit.same_offset_as(&commit) {
} else if self.last_commit.same_offset_as(&commit) {
if !self.last_commit.same_checksum_as(&commit) {
warn!(
"forked: commit={:?} last-error={:?} last-crc={:?}",
Expand Down Expand Up @@ -579,7 +595,20 @@ impl<R: Repo> Iterator for Commits<R> {
None => self.last_error.take().map(Err),
Some(segment) => segment.map_or_else(
|e| Some(Err(e.into())),
|segment| {
|mut segment| {
// Try to use offset index to advance segment to Intial commit
if let CommitInfo::Initial { next_offset } = self.last_commit {
let _ = self
.segments
.repo
.get_offset_index(segment.min_tx_offset)
.map_err(Into::into)
.and_then(|index_file| segment.seek_to_segment(&index_file, next_offset))
.inspect_err(|e| {
warn!("commitlog offset index is not used: {e}");
});
}

self.inner = Some(segment.commits());
self.next()
},
Expand Down
134 changes: 80 additions & 54 deletions crates/commitlog/src/index/indexfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,54 @@ pub struct IndexFileMut<Key: Into<u64> + From<u64>> {
}

impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
pub fn create_index_file(path: &Path, offset: u64, cap: u64) -> io::Result<Self> {
Shubham8287 marked this conversation as resolved.
Show resolved Hide resolved
File::options()
.write(true)
.read(true)
.create_new(true)
.open(offset_index_file_path(path, offset))
.and_then(|file| {
file.set_len(cap * ENTRY_SIZE as u64)?;
let mmap = unsafe { MmapMut::map_mut(&file) }?;

Ok(IndexFileMut {
inner: mmap,
num_entries: 0,
_marker: PhantomData,
})
})
.or_else(|e| {
if e.kind() == io::ErrorKind::AlreadyExists {
debug!("Index file {} already exists", path.display());
Self::open_index_file(path, offset, cap)
} else {
Err(e)
}
})
}

pub fn open_index_file(path: &Path, offset: u64, cap: u64) -> io::Result<Self> {
let file = File::options()
.read(true)
.write(true)
.open(offset_index_file_path(path, offset))?;
file.set_len(cap * ENTRY_SIZE as u64)?;
let mmap = unsafe { MmapMut::map_mut(&file)? };

let mut me = IndexFileMut {
inner: mmap,
num_entries: 0,
_marker: PhantomData,
};
me.num_entries = me.num_entries().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

Ok(me)
}

pub fn delete_index_file(path: &Path, offset: u64) -> io::Result<()> {
fs::remove_file(offset_index_file_path(path, offset)).map_err(Into::into)
}

// Searches for first 0-key, to count number of entries
fn num_entries(&self) -> Result<usize, IndexError> {
for index in 0.. {
Expand Down Expand Up @@ -189,60 +237,38 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
}
}

pub fn create_index_file<Key: Into<u64> + From<u64>>(
path: &Path,
offset: u64,
cap: u64,
) -> io::Result<IndexFileMut<Key>> {
File::options()
.write(true)
.read(true)
.create_new(true)
.open(offset_index_file_path(path, offset))
.and_then(|file| {
file.set_len(cap * ENTRY_SIZE as u64)?;
let mmap = unsafe { MmapMut::map_mut(&file) }?;

Ok(IndexFileMut {
inner: mmap,
num_entries: 0,
_marker: PhantomData,
})
})
.or_else(|e| {
if e.kind() == io::ErrorKind::AlreadyExists {
debug!("Index file {} already exists", path.display());
open_index_file(path, offset, cap)
} else {
Err(e)
}
})
/// A wrapper over [`IndexFileMut`] to provide read-only access to the index file.
pub struct IndexFile<Key: Into<u64> + From<u64>> {
inner: IndexFileMut<Key>,
_marker: PhantomData<Key>,
}

pub fn open_index_file<Key: Into<u64> + From<u64>>(
path: &Path,
offset: u64,
cap: u64,
) -> io::Result<IndexFileMut<Key>> {
let file = File::options()
.read(true)
.write(true)
.open(offset_index_file_path(path, offset))?;
file.set_len(cap * ENTRY_SIZE as u64)?;
let mmap = unsafe { MmapMut::map_mut(&file)? };

let mut me = IndexFileMut {
inner: mmap,
num_entries: 0,
_marker: PhantomData,
};

me.num_entries = me.num_entries().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(me)
}
impl<Key: Into<u64> + From<u64>> IndexFile<Key> {
pub fn open_index_file(path: &Path, offset: u64) -> io::Result<Self> {
let file = File::options()
.read(true)
.append(true)
.open(offset_index_file_path(path, offset))?;
let mmap = unsafe { MmapMut::map_mut(&file)? };

let mut inner = IndexFileMut {
inner: mmap,
num_entries: 0,
_marker: PhantomData,
};
inner.num_entries = inner
.num_entries()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

Ok(Self {
inner,
_marker: PhantomData,
})
}

pub fn delete_index_file(path: &Path, offset: u64) -> io::Result<()> {
fs::remove_file(offset_index_file_path(path, offset)).map_err(Into::into)
pub fn key_lookup(&self, key: Key) -> Result<(Key, u64), IndexError> {
self.inner.key_lookup(key)
}
}

#[cfg(test)]
Expand All @@ -257,7 +283,7 @@ mod tests {
let path = temp_dir.path().to_path_buf();

// Create an index file
let mut index_file: IndexFileMut<u64> = create_index_file(&path, 0, cap)?;
let mut index_file: IndexFileMut<u64> = IndexFileMut::create_index_file(&path, 0, cap)?;

// Enter even number keys from 2
for i in 1..fill_till {
Expand Down Expand Up @@ -341,7 +367,7 @@ mod tests {
let path = temp_dir.path().to_path_buf();

// Create an index file
let mut index_file: IndexFileMut<u64> = create_index_file(&path, 0, 100)?;
let mut index_file: IndexFileMut<u64> = IndexFileMut::create_index_file(&path, 0, 100)?;

for i in 1..10 {
index_file.append(i * 2, i * 2 * 100)?;
Expand All @@ -350,7 +376,7 @@ mod tests {
assert_eq!(index_file.num_entries, 9);
drop(index_file);

let open_index_file: IndexFileMut<u64> = open_index_file(&path, 0, 100)?;
let open_index_file: IndexFileMut<u64> = IndexFileMut::open_index_file(&path, 0, 100)?;
assert_eq!(open_index_file.num_entries, 9);
assert_eq!(open_index_file.key_lookup(6)?, (6, 600));

Expand Down
4 changes: 1 addition & 3 deletions crates/commitlog/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ use std::io;
use thiserror::Error;
mod indexfile;

pub use indexfile::create_index_file;
pub use indexfile::delete_index_file;
pub use indexfile::offset_index_file_path;
pub use indexfile::IndexFileMut;
pub use indexfile::{IndexFile, IndexFileMut};

#[derive(Error, Debug)]
pub enum IndexError {
Expand Down
14 changes: 9 additions & 5 deletions crates/commitlog/src/repo/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::{

use log::{debug, warn};

use crate::index::{create_index_file, delete_index_file, offset_index_file_path};
use crate::index::offset_index_file_path;

use super::{Repo, TxOffset, TxOffsetIndex};
use super::{Repo, TxOffset, TxOffsetIndex, TxOffsetIndexMut};

const SEGMENT_FILE_EXT: &str = ".stdb.log";

Expand Down Expand Up @@ -128,11 +128,15 @@ impl Repo for Fs {
Ok(segments)
}

fn get_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result<TxOffsetIndex> {
create_index_file(&self.root, offset, cap)
fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result<TxOffsetIndexMut> {
TxOffsetIndexMut::create_index_file(&self.root, offset, cap)
}

fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> {
delete_index_file(&self.root, offset)
TxOffsetIndexMut::delete_index_file(&self.root, offset)
}

fn get_offset_index(&self, offset: TxOffset) -> io::Result<TxOffsetIndex> {
TxOffsetIndex::open_index_file(&self.root, offset)
}
}
26 changes: 16 additions & 10 deletions crates/commitlog/src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use log::{debug, warn};
use crate::{
commit::Commit,
error,
index::IndexFileMut,
index::{IndexFile, IndexFileMut},
segment::{FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer},
Options,
};
Expand All @@ -19,15 +19,16 @@ pub use fs::Fs;
pub use mem::Memory;

pub type TxOffset = u64;
pub type TxOffsetIndex = IndexFileMut<TxOffset>;
pub type TxOffsetIndexMut = IndexFileMut<TxOffset>;
pub type TxOffsetIndex = IndexFile<TxOffset>;

/// A repository of log segments.
///
/// This is mainly an internal trait to allow testing against an in-memory
/// representation.
pub trait Repo: Clone {
/// The type of log segments managed by this repo, which must behave like a file.
type Segment: io::Read + io::Write + FileLike;
type Segment: io::Read + io::Write + FileLike + io::Seek;

/// Create a new segment with the minimum transaction offset `offset`.
///
Expand Down Expand Up @@ -58,24 +59,29 @@ pub trait Repo: Clone {
/// offsets, sorted in ascending order.
fn existing_offsets(&self) -> io::Result<Vec<u64>>;

/// Create or get an existing `TxOffsetIndex` for the given `offset`.
/// Create [`TxOffsetIndexMut`] for the given `offset` or open it if already exist.
/// The `cap` parameter is the maximum number of entries in the index.
fn get_offset_index(&self, _offset: TxOffset, _cap: u64) -> io::Result<TxOffsetIndex> {
fn create_offset_index(&self, _offset: TxOffset, _cap: u64) -> io::Result<TxOffsetIndexMut> {
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
}

/// Remove `TxOffsetIndex` named with `offset`.
/// Remove [`TxOffsetIndexMut`] named with `offset`.
fn remove_offset_index(&self, _offset: TxOffset) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
}

/// Get [`TxOffsetIndex`] for the given `offset`.
fn get_offset_index(&self, _offset: TxOffset) -> io::Result<TxOffsetIndex> {
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
}
}

fn offset_index_len(opts: Options) -> u64 {
opts.max_segment_size / opts.offset_index_interval_bytes
}

fn get_offset_index_writer<R: Repo>(repo: &R, offset: u64, opts: Options) -> Option<OffsetIndexWriter> {
repo.get_offset_index(offset, offset_index_len(opts))
fn create_offset_index_writer<R: Repo>(repo: &R, offset: u64, opts: Options) -> Option<OffsetIndexWriter> {
repo.create_offset_index(offset, offset_index_len(opts))
.map(|index| OffsetIndexWriter::new(index, opts))
.map_err(|e| {
warn!("failed to get offset index for segment {offset}: {e}");
Expand Down Expand Up @@ -111,7 +117,7 @@ pub fn create_segment_writer<R: Repo>(repo: &R, opts: Options, offset: u64) -> i

max_records_in_commit: opts.max_records_in_commit,

offset_index_head: get_offset_index_writer(repo, offset, opts),
offset_index_head: create_offset_index_writer(repo, offset, opts),
})
}

Expand Down Expand Up @@ -166,7 +172,7 @@ pub fn resume_segment_writer<R: Repo>(

max_records_in_commit: opts.max_records_in_commit,

offset_index_head: get_offset_index_writer(repo, offset, opts),
offset_index_head: create_offset_index_writer(repo, offset, opts),
}))
}

Expand Down
Loading
Loading