diff --git a/crates/core/src/buf/mod.rs b/crates/core/src/buf/mod.rs index a9c8a0a..cb09476 100644 --- a/crates/core/src/buf/mod.rs +++ b/crates/core/src/buf/mod.rs @@ -58,12 +58,12 @@ impl BufferRepr { len, segments, } => { - let range = { - let seg_id = seg_id as u64; - (seg_id * crate::SEG_SIZE)..((seg_id + 1) * crate::SEG_SIZE).min(*len) - }; + let range = Segment::data_range_of_id(seg_id); + let range = range.start..range.end.min(*len); segments - .get_or_insert(seg_id, || Arc::new(Segment::map_file(seg_id, range, file))) + .get_or_insert(seg_id, || { + Arc::new(Segment::map_file(seg_id, range, file).unwrap()) + }) .clone() } BufferRepr::Stream { @@ -228,17 +228,17 @@ where } /// Retrieves a line of text from the buffer based on the given line number. - /// - /// + /// + /// /// # Arguments - /// + /// /// * `line_number` - The line number to retrieve. - /// + /// /// /// # Arguments - /// + /// /// * `line_number` - The line number to retrieve. - /// + /// /// # Panics /// /// This function will panic if the `line_number` is greater than the total number @@ -252,14 +252,14 @@ where let data_start = self.index.data_of_line(line_number).unwrap(); let data_end = self.index.data_of_line(line_number + 1).unwrap(); - let seg_start = (data_start / crate::SEG_SIZE) as usize; - let seg_end = (data_end / crate::SEG_SIZE) as usize; + let seg_start = Segment::id_of_data(data_start); + let seg_end = Segment::id_of_data(data_end); if seg_start == seg_end { // The data is in a single segment let seg = self.repr.fetch(seg_start); - let (start, end) = seg.translate_inner_data_range(data_start, data_end); - seg.get_line(start, end) + let range = seg.translate_inner_data_range(data_start, data_end); + seg.get_line(range) } else { debug_assert!(seg_start < seg_end); // The data may cross several segments, so we must piece together @@ -356,8 +356,8 @@ where let curr_line_data_start = self.index.data_of_line(curr_line).unwrap(); let curr_line_data_end = self.index.data_of_line(curr_line + 1).unwrap(); - let curr_line_seg_start = (curr_line_data_start / crate::SEG_SIZE) as usize; - let curr_line_seg_end = (curr_line_data_end / crate::SEG_SIZE) as usize; + let curr_line_seg_start = Segment::id_of_data(curr_line_data_start); + let curr_line_seg_end = Segment::id_of_data(curr_line_data_end); if curr_line_seg_end != curr_line_seg_start { self.imm_buf.clear(); @@ -380,8 +380,8 @@ where self.line_range.start += 1; return Some((&self.index, curr_line_data_start, &self.imm_buf)); } else { - let curr_seg_data_start = curr_line_seg_start as u64 * crate::SEG_SIZE; - let curr_seg_data_end = curr_seg_data_start + crate::SEG_SIZE; + let curr_seg_data_start = curr_line_seg_start as u64 * Segment::MAX_SIZE; + let curr_seg_data_end = curr_seg_data_start + Segment::MAX_SIZE; let line_end = self .index @@ -391,10 +391,10 @@ where // this line should not cross multiple segments, else we would have caught in the first case let segment = self.repr.fetch(curr_line_seg_start); - let (start, end) = + let range = segment.translate_inner_data_range(curr_line_data_start, line_end_data_start); - assert!(line_end_data_start - curr_seg_data_start <= crate::SEG_SIZE); - assert!(end <= crate::SEG_SIZE); + assert!(line_end_data_start - curr_seg_data_start <= Segment::MAX_SIZE); + assert!(range.end <= Segment::MAX_SIZE); self.line_range.start = line_end; let segment = self.imm_seg.insert(segment); @@ -403,7 +403,7 @@ where return Some(( &self.index, curr_line_data_start, - &segment[start as usize..end as usize], + &segment[range.start as usize..range.end as usize], )); } } diff --git a/crates/core/src/buf/segment.rs b/crates/core/src/buf/segment.rs index d7eb7ed..abfa30e 100644 --- a/crates/core/src/buf/segment.rs +++ b/crates/core/src/buf/segment.rs @@ -1,59 +1,62 @@ +use crate::Result; +use memmap2::{Mmap, MmapMut}; use std::{borrow::Cow, ops::Range, ptr::NonNull, sync::Arc}; -use crate::Mmappable; -pub struct Segment { +#[cfg(unix)] +pub(crate) use std::os::fd::AsRawFd as Mmappable; +#[cfg(windows)] +pub(crate) use std::os::windows::io::AsRawHandle as Mmappable; + +pub struct SegmentRaw { id: usize, - start: u64, - data: memmap2::Mmap, + range: Range, + data: Buf, } -impl Segment { - pub(crate) fn map_file(id: usize, range: Range, file: &F) -> Self { - let data = unsafe { - memmap2::MmapOptions::new() - .offset(range.start) - .len((range.end - range.start) as usize) - .map(file).expect("mmap should succeed") - }; - #[cfg(unix)] - data.advise(memmap2::Advice::WillNeed).ok(); - Self::new(id, range.start, data) - } +pub type SegmentMut = SegmentRaw; +pub type Segment = SegmentRaw; - pub(crate) fn new(id: usize, start: u64, data: memmap2::Mmap) -> Self { - Self { - id, - data, - start, - } - } +impl SegmentRaw +where + Buf: AsRef<[u8]>, +{ + pub const MAX_SIZE: u64 = 1 << 20; pub fn id(&self) -> usize { self.id } - pub fn as_slice(&self) -> &[u8] { - &self + pub fn start(&self) -> u64 { + self.range.start } pub fn translate_inner_data_index(&self, start: u64) -> u64 { - start - self.start + debug_assert!(self.range.start <= start); + // TODO: make this better... i don't like that its <= + // but technically its fine as long as start + // is the end of the buffer + debug_assert!(start <= self.range.end); + start - self.range.start } - pub fn translate_inner_data_range(&self, start: u64, end: u64) -> (u64, u64) { - (self.translate_inner_data_index(start), self.translate_inner_data_index(end)) + pub fn translate_inner_data_range(&self, start: u64, end: u64) -> Range { + self.translate_inner_data_index(start)..self.translate_inner_data_index(end) } - pub fn get_line(self: &Arc, start: u64, end: u64) -> SegStr { - let data = &self.data[start as usize..end as usize]; - // Safety: The length is computed by a (assumed to be correct) - // index. It is undefined behavior if the file changes - // in a non-appending way after the index is created. - SegStr::new(self.clone(), data) + pub fn id_of_data(start: u64) -> usize { + (start / Self::MAX_SIZE) as usize + } + + pub fn data_range_of_id(id: usize) -> Range { + let start = id as u64 * Self::MAX_SIZE; + start..start + Self::MAX_SIZE } } -impl std::ops::Deref for Segment { +impl std::ops::Deref for SegmentRaw +where + Buf: std::ops::Deref, +{ type Target = [u8]; fn deref(&self) -> &Self::Target { @@ -61,6 +64,62 @@ impl std::ops::Deref for Segment { } } +impl std::ops::DerefMut for SegmentRaw +where + Buf: std::ops::DerefMut, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } +} + +impl SegmentMut { + pub(crate) fn new(id: usize, start: u64) -> Result { + let data = memmap2::MmapOptions::new() + .len(Self::MAX_SIZE as usize) + .map_anon()?; + #[cfg(unix)] + data.advise(memmap2::Advice::Sequential)?; + Ok(Self { id, data, range: start..start + Self::MAX_SIZE }) + } + + pub fn into_read_only(self) -> Result { + Ok(Segment { + id: self.id, + data: self.data.make_read_only()?, + range: self.range, + }) + } +} + +impl Segment { + pub(crate) fn map_file(id: usize, range: Range, file: &F) -> Result { + let size = range.end - range.start; + debug_assert!(size <= Self::MAX_SIZE); + let data = unsafe { + memmap2::MmapOptions::new() + .offset(range.start) + .len(size as usize) + .map(file)? + }; + #[cfg(unix)] + data.advise(memmap2::Advice::WillNeed)?; + Ok(Self { + id, + data, + range + }) + } + + pub fn get_line(self: &Arc, range: Range) -> SegStr { + let data = &self.data[range.start as usize..range.end as usize]; + // Safety: The length is computed by a (assumed to be correct) + // index. It is undefined behavior if the file changes + // in a non-appending way after the index is created. + SegStr::new(self.clone(), data) + } +} + /// Line string that comes from a [Segment]. /// /// If the [SegStr] borrows from the segment, the segment will not be dropped until @@ -88,7 +147,7 @@ impl SegStr { /// is invalid utf-8, it will be converted into an owned [String] using `String::from_utf8_lossy`. /// /// # Safety - /// + /// /// 1. The provided slice must point to data that lives inside the ref-counted [Segment]. /// 2. The length must encompass a valid range of data inside the [Segment]. fn new<'origin>(origin: Arc, data: &'origin [u8]) -> Self { @@ -113,9 +172,11 @@ impl SegStr { pub fn as_bytes(&self) -> &[u8] { // Safety: We have already checked in the constructor. match &self.0 { - SegStrRepr::Borrowed { _ref: _pin, ptr, len } => unsafe { - std::slice::from_raw_parts(ptr.as_ptr(), *len) - }, + SegStrRepr::Borrowed { + _ref: _pin, + ptr, + len, + } => unsafe { std::slice::from_raw_parts(ptr.as_ptr(), *len) }, SegStrRepr::Owned(s) => s.as_bytes(), } } diff --git a/crates/core/src/index/inflight.rs b/crates/core/src/index/inflight.rs index cd0db1e..39b8390 100644 --- a/crates/core/src/index/inflight.rs +++ b/crates/core/src/index/inflight.rs @@ -2,11 +2,11 @@ //! that allow the use of [IncompleteIndex] functionalities while it is "inflight" //! or in the middle of the indexing operation. -use crate::buf::segment::Segment; - use super::{BufferIndex, CompleteIndex, IncompleteIndex}; - -use crate::err::{Error, Result}; +use crate::{ + buf::segment::{Segment, SegmentMut}, + err::{Error, Result}, +}; use std::sync::mpsc::{Receiver, Sender}; use std::thread::JoinHandle; use std::{ @@ -18,35 +18,20 @@ use std::{ struct IndexingTask { /// This is the sender side of the channel that receives byte indexes of `\n`. sx: Sender, - /// Memmap buffer. - data: memmap2::Mmap, - /// Indicates where the buffer starts within the file. - start: u64, + segment: Segment, } impl IndexingTask { - fn map(file: &T, start: u64, end: u64) -> Result { - let data = unsafe { - memmap2::MmapOptions::new() - .offset(start) - .len((end - start) as usize) - .map(file)? - }; - #[cfg(unix)] - data.advise(memmap2::Advice::Sequential)?; - Ok(data) - } - - fn new(file: &T, start: u64, end: u64) -> Result<(Self, Receiver)> { - let data = Self::map(file, start, end)?; + fn new(file: &File, start: u64, end: u64) -> Result<(Self, Receiver)> { + let segment = Segment::map_file(0, start..end, file)?; let (sx, rx) = std::sync::mpsc::channel(); - Ok((Self { sx, data, start }, rx)) + Ok((Self { sx, segment }, rx)) } fn compute(self) -> Result<()> { - for i in memchr::memchr_iter(b'\n', &self.data) { + for i in memchr::memchr_iter(b'\n', &self.segment) { self.sx - .send(self.start + i as u64 + 1) + .send(self.segment.start() + i as u64 + 1) .map_err(|_| Error::Internal)?; } @@ -117,7 +102,7 @@ impl InflightIndexImpl { let mut curr = 0; while curr < len { - let end = (curr + crate::SEG_SIZE).min(len); + let end = (curr + Segment::MAX_SIZE).min(len); let (task, task_rx) = IndexingTask::new(&file, curr, end)?; sx.send(task_rx).unwrap(); @@ -151,28 +136,24 @@ impl InflightIndexImpl { let mut seg_id = 0; loop { - let mut data = memmap2::MmapOptions::new() - .len(crate::SEG_SIZE as usize) - .map_anon()?; - #[cfg(unix)] - data.advise(memmap2::Advice::Sequential)?; + let mut segment = SegmentMut::new(seg_id, len)?; let mut buf_len = 0; loop { - match stream.read(&mut data[buf_len..crate::SEG_SIZE as usize])? { + match stream.read(&mut segment[buf_len..])? { 0 => break, l => buf_len += l, } } let mut inner = self.inner.lock().unwrap(); - for i in memchr::memchr_iter(b'\n', &data) { + for i in memchr::memchr_iter(b'\n', &segment) { let line_data = len + i as u64; inner.push_line_data(line_data + 1); } outgoing - .send(Segment::new(seg_id, len, data.make_read_only()?)) + .send(segment.into_read_only()?) .map_err(|_| Error::Internal)?; if buf_len == 0 { @@ -268,7 +249,7 @@ impl InflightIndex { /// /// let inflight_index = InflightIndex::new(InflightIndexMode::File); /// ``` - /// + /// /// # Returns /// /// A tuple containing the newly created `InflightIndex` and the associated `InflightIndexRemote`. @@ -277,7 +258,6 @@ impl InflightIndex { (Self::Incomplete(inner.clone()), InflightIndexRemote(inner)) } - /// Creates a new complete index from a file. /// /// This function creates a new complete index from the provided file. It internally diff --git a/crates/core/src/index/mod.rs b/crates/core/src/index/mod.rs index f111a7e..0ad8053 100644 --- a/crates/core/src/index/mod.rs +++ b/crates/core/src/index/mod.rs @@ -4,6 +4,7 @@ pub mod inflight; +use crate::buf::segment::Segment; use crate::cowvec::CowVec; use crate::err::Result; use std::fs::File; @@ -93,11 +94,11 @@ pub struct IncompleteIndex { impl IncompleteIndex { /// Create a new [IncompleteIndex]. - /// - /// This can be used to build a [CompleteIndex] by using the + /// + /// This can be used to build a [CompleteIndex] by using the /// [`IncompleteIndex::index_file()`] method or manually /// using `push_line_data(u64)`, `finalize(u64)` and `finish()`. - /// + /// /// # Example /// ``` /// use bvr_core::index::IncompleteIndex; @@ -111,7 +112,7 @@ impl IncompleteIndex { } /// Index a [File] and return a [CompleteIndex]. - /// + /// /// # Examples /// ``` /// # fn main() -> Result<(), Box> { @@ -128,18 +129,11 @@ impl IncompleteIndex { let mut start = 0; while start < len { - let end = (start + crate::SEG_SIZE).min(len); - - let data = unsafe { - memmap2::MmapOptions::new() - .offset(start) - .len((end - start) as usize) - .map(file)? - }; - #[cfg(unix)] - data.advise(memmap2::Advice::Sequential)?; - - for i in memchr::memchr_iter(b'\n', &data) { + let end = (start + Segment::MAX_SIZE).min(len); + + let segment = Segment::map_file(0, start..end, file)?; + + for i in memchr::memchr_iter(b'\n', &segment) { let line_data = start + i as u64; self.push_line_data(line_data + 1); } @@ -219,4 +213,4 @@ impl BufferIndex for CompleteIndex { None } -} \ No newline at end of file +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index a2ba98c..d8e5476 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -4,13 +4,6 @@ pub mod search; pub mod err; mod cowvec; -#[cfg(unix)] -use std::os::fd::AsRawFd as Mmappable; -#[cfg(windows)] -use std::os::windows::io::AsRawHandle as Mmappable; - -const SEG_SIZE: u64 = 1 << 20; - pub use err::Result; pub use index::inflight::InflightIndex; pub use buf::SegBuffer;