Skip to content

Commit

Permalink
iter
Browse files Browse the repository at this point in the history
  • Loading branch information
HaoranYi committed Nov 8, 2022
1 parent 0770b0f commit 0f8a0fa
Showing 1 changed file with 129 additions and 11 deletions.
140 changes: 129 additions & 11 deletions runtime/src/cache_hash_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,49 @@ impl<const CELL_SIZE: usize, const HEADER_SIZE: usize> CacheHashDataFile<CELL_SI
stats: &mut CacheHashDataStats,
) {
let mut m2 = Measure::start("decode");
let slices = self.get_cache_hash_data();
for d in slices {
let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
assert!(
pubkey_to_bin_index >= start_bin_index,
"{}, {}",
pubkey_to_bin_index,
start_bin_index
); // this would indicate we put a pubkey in too high of a bin
pubkey_to_bin_index -= start_bin_index;
accumulator[pubkey_to_bin_index].push(d.clone()); // may want to avoid clone here

let mut iter = self.iter();
loop {
if let Some(d) = iter.next() {
let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
assert!(
pubkey_to_bin_index >= start_bin_index,
"{}, {}",
pubkey_to_bin_index,
start_bin_index
); // this would indicate we put a pubkey in too high of a bin
pubkey_to_bin_index -= start_bin_index;
accumulator[pubkey_to_bin_index].push(d.clone());
} else {
break;
}
}

// let slices = self.get_cache_hash_data();
// for d in slices {
// let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
// assert!(
// pubkey_to_bin_index >= start_bin_index,
// "{}, {}",
// pubkey_to_bin_index,
// start_bin_index
// ); // this would indicate we put a pubkey in too high of a bin
// pubkey_to_bin_index -= start_bin_index;
// accumulator[pubkey_to_bin_index].push(d.clone()); // may want to avoid clone here
// }

m2.stop();
stats.decode_us += m2.as_us();
}

fn get(&self, ix: u64) -> &EntryType {
let item_slice = self.get_slice_internal(ix);
unsafe {
let item = item_slice.as_ptr() as *const EntryType;
&*item
}
}

/// get '&mut EntryType' from cache file [ix]
fn get_mut(&mut self, ix: u64) -> &mut EntryType {
let item_slice = self.get_slice_internal(ix);
Expand Down Expand Up @@ -118,6 +144,20 @@ impl<const CELL_SIZE: usize, const HEADER_SIZE: usize> CacheHashDataFile<CELL_SI
}
}

fn get_header(&self) -> &Header {
let start = 0_usize;
let end = start + HEADER_SIZE;
let item_slice: &[u8] = &self.mmap[start..end];
unsafe {
let item = item_slice.as_ptr() as *const Header;
&*item
}
}

fn len(&self) -> usize {
self.get_header().count
}

fn new_map(file: &Path, capacity: u64) -> Result<MmapMut, std::io::Error> {
let mut data = OpenOptions::new()
.read(true)
Expand All @@ -144,6 +184,84 @@ impl<const CELL_SIZE: usize, const HEADER_SIZE: usize> CacheHashDataFile<CELL_SI

Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
}

fn iter<'a>(&'a self) -> CacheHashDataIter<'a, CELL_SIZE, HEADER_SIZE> {
CacheHashDataIter::<'a, CELL_SIZE, HEADER_SIZE>::new(self)
}

fn iter_mut<'a>(&'a mut self) -> CacheHashDataIterMut<'a, CELL_SIZE, HEADER_SIZE> {
CacheHashDataIterMut::<'a, CELL_SIZE, HEADER_SIZE>::new(self)
}
}

struct CacheHashDataIter<'a, const CELL_SIZE: usize, const HEADER_SIZE: usize> {
data: &'a CacheHashDataFile<CELL_SIZE, HEADER_SIZE>,
current: usize,
len: usize,
}

impl<'a, const CELL_SIZE: usize, const HEADER_SIZE: usize>
CacheHashDataIter<'a, CELL_SIZE, HEADER_SIZE>
{
pub fn new(data: &'a CacheHashDataFile<CELL_SIZE, HEADER_SIZE>) -> Self {
Self {
data,
current: 0,
len: data.len(),
}
}
}

impl<'a, const CELL_SIZE: usize, const HEADER_SIZE: usize> Iterator
for CacheHashDataIter<'a, CELL_SIZE, HEADER_SIZE>
{
type Item = &'a EntryType;

fn next(&mut self) -> Option<Self::Item> {
if self.current < self.len {
let hash_data = self.data.get(self.current as u64);
self.current += 1;
Some(hash_data)
} else {
None
}
}
}

struct CacheHashDataIterMut<'a, const CELL_SIZE: usize, const HEADER_SIZE: usize> {
data: &'a mut CacheHashDataFile<CELL_SIZE, HEADER_SIZE>,
current: usize,
len: usize,
}

impl<'a, const CELL_SIZE: usize, const HEADER_SIZE: usize>
CacheHashDataIterMut<'a, CELL_SIZE, HEADER_SIZE>
{
pub fn new(data: &'a mut CacheHashDataFile<CELL_SIZE, HEADER_SIZE>) -> Self {
let len = data.len();
Self {
data,
current: 0,
len,
}
}
}

impl<'a> Iterator for CacheHashDataIterMut<'a, CELL_SIZE, HEADER_SIZE> {
type Item = &'a mut EntryType;

fn next(&mut self) -> Option<Self::Item> {
if self.current < self.len {
let item_slice = self.data.get_slice_internal(self.current as u64);
self.current += 1;
unsafe {
let item = item_slice.as_ptr() as *mut EntryType;
Some(&mut *item)
}
} else {
None
}
}
}

pub type PreExistingCacheFiles = HashSet<String>;
Expand Down

0 comments on commit 0f8a0fa

Please sign in to comment.