Skip to content

Commit

Permalink
115-offloadable-bloom-filter (#121)
Browse files Browse the repository at this point in the history
* Restructuring: separate index with its disk state and index subtree restructuring

* Fix minor issues

* B+ tree work in progress

* Now it works with trees with height higher than 1

* Add one node case processing

* Fix bug with pick of first elem in node

* Remove panic in bptree module

* Write test, restructure and find bug

* Fix bug and rewrite bad tests

* Fix namings and warning

* Fix last recs bug

* Fix review issues

* Implement builder pattern for bptree serialize process

* Change serializer's dynamic check to a static one

* IndexStruct doesn't use IndexHeader now

* Fix duplicated code

* Make file index independent from filter

* Start generalization (serializer is generalized)

* Remove redundant bound

* Revert "Remove redundant bound"

This reverts commit f4fa7e2.

* Revert "Start generalization (serializer is generalized)" (there
is a performance hit in this version, so I decided to postpone it.
This generic implementation may be found and is discussed here:
#101)

This reverts commit e156056.

* Fix review issues

* Add benchmarks for indices

* Change benchmarks params

* Fix error when last node gets only 1 key

* Rewrite elems distribution per layer logic

* Add root node in serialized form in RAM

* Add search in serialized node (seems like deserialization is expensive)

* Remove deserialization from leaf nodes (that's also expensive)

* Remove vector creation operation and change distribution strategy a bit again

* Change keys distribution in leaf node

* Ordered headers are used as leaves

* Remove redundant read in file on the left side of leaf node and push record headers from newest to oldest (reversed order)

* Make get_any return the latest header instead of first one (to enable update option)

* Reverse tree in file and move headers after tree (now during search, blocks are read from left to right only, instead of from right to left)

* Revert "Make get_any return the latest header instead of first one (to enable update option)"

This reverts commit bdecc68.

* Remove leaves stage because now it's redundant (headers are used as leaves)

* Fix description of b+-tree indices

* Bloom filter offload

* Fix

* Shorter default impl

* Platform-agnostic bloom filter buffer

* Add BloomDataProvider trait

* Add method to get allocated memory

* Fix errors and add unit test

* Update CHANGELOG.md

* Fix review issues

* Fix review issues

* Fix review issues

Co-authored-by: Perestoronin Pavel <[email protected]>
Co-authored-by: Pavel Iakushin <[email protected]>
  • Loading branch information
3 people authored Nov 10, 2021
1 parent f10d727 commit a4e99f2
Show file tree
Hide file tree
Showing 8 changed files with 342 additions and 63 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Pearl changelog

## [Unreleased]
#### Added

- Add method to offload bloom filters (#121)

#### Changed
- Dump blob indices in separate thread on active blob close (#136) [https://github.com/qoollo/pearl/pull/137]
Expand Down
27 changes: 22 additions & 5 deletions src/blob/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl Blob {
check_filters: bool,
) -> Result<Option<Entry>> {
debug!("blob get any entry {:?}, {:?}", key, meta);
if check_filters && !self.check_filters(key) {
if check_filters && !self.check_filters(key).await? {
debug!("Key was filtered out by filters");
Ok(None)
} else if let Some(meta) = meta {
Expand Down Expand Up @@ -336,22 +336,39 @@ impl Blob {
self.name.id
}

pub(crate) fn check_filters(&self, key: &[u8]) -> bool {
/// Drops the blob's in-memory bloom-filter buffer by delegating to the index.
/// NOTE(review): presumably later lookups then read filter bits from disk — confirm against `Bloom::contains_in_file`.
pub(crate) fn offload_filter(&mut self) {
    self.index.offload_filter()
}

pub(crate) async fn check_filters(&self, key: &[u8]) -> Result<bool> {
trace!("check filters (range and bloom)");
if let FilterResult::NotContains = self.index.check_filters_key(key) {
if let FilterResult::NotContains = self.index.check_filters_key(key).await? {
Ok(false)
} else {
Ok(true)
}
}

/// Runs the purely in-memory filters (range and bloom) for `key`.
/// Returns `false` only when the filters prove the key is absent.
pub(crate) fn check_filters_in_memory(&self, key: &[u8]) -> bool {
    trace!("check filters (range and bloom)");
    let verdict = self.index.check_filters_in_memory(key);
    !matches!(verdict, FilterResult::NotContains)
}

pub(crate) async fn check_filters_non_blocking(&self, key: &[u8]) -> bool {
self.check_filters(key)
/// Reports whether the index's bloom-filter buffer is currently offloaded (not resident in memory).
pub(crate) fn is_filter_offloaded(&self) -> bool {
    self.index.is_filter_offloaded()
}

/// Bytes of memory used by the index, as reported by the index itself.
pub(crate) fn index_memory(&self) -> usize {
    self.index.memory_used()
}

/// Bytes currently allocated for the bloom filter's bit buffer (0 when offloaded).
pub(crate) fn filter_memory_allocated(&self) -> usize {
    self.index.bloom_memory_allocated()
}
}

#[derive(Debug, Clone)]
Expand Down
168 changes: 132 additions & 36 deletions src/blob/index/bloom.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
use super::prelude::*;
use bitvec::order::Lsb0;

// All usizes in structures are serialized as u64 in binary
#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
pub(crate) struct Bloom {
inner: BitVec,
inner: Option<BitVec<Lsb0, u64>>,
offset_in_file: Option<u64>,
bits_count: usize,
hashers: Vec<AHasher>,
config: Config,
}

impl Default for Bloom {
fn default() -> Self {
Self {
inner: Some(Default::default()),
bits_count: 0,
hashers: vec![],
config: Default::default(),
offset_in_file: None,
}
}
}

/// Bloom filter configuration parameters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
Expand All @@ -30,7 +45,7 @@ pub struct Config {
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Save {
config: Config,
buf: Vec<usize>,
buf: Vec<u64>,
bits_count: usize,
}

Expand Down Expand Up @@ -92,14 +107,30 @@ impl Bloom {
pub fn new(config: Config) -> Self {
let bits_count = bits_count_from_formula(&config);
Self {
inner: bitvec![0; bits_count],
inner: Some(bitvec![Lsb0, u64; 0; bits_count]),
hashers: Self::hashers(config.hashers_count),
config,
bits_count,
offset_in_file: None,
}
}

pub fn clear(&mut self) {
self.inner = bitvec![0; self.inner.len()];
self.inner = Some(bitvec![Lsb0, u64; 0; self.bits_count]);
self.offset_in_file = None;
}

/// `true` when the bit buffer has been dropped from memory (filter lives only on disk).
pub fn is_offloaded(&self) -> bool {
    self.inner.is_none()
}

/// Frees the in-memory bit buffer. After this, `add` fails and membership
/// checks must go through `contains_in_file` with a data provider.
pub fn offload_from_memory(&mut self) {
    self.inner = None;
}

/// Records where the serialized filter starts in the file; the stored offset
/// is adjusted past the serialized config and length prefix so it points at
/// the raw bit buffer.
/// NOTE(review): `buffer_start_position` only fails if bincode cannot size
/// the config, hence the `expect` — confirm this invariant holds for all configs.
pub fn set_offset_in_file(&mut self, offset: u64) {
    self.offset_in_file =
        Some(offset + self.buffer_start_position().expect("Should not fail") as u64);
}

pub fn hashers(k: usize) -> Vec<AHasher> {
Expand All @@ -109,62 +140,127 @@ impl Bloom {
.collect()
}

fn save(&self) -> Save {
Save {
config: self.config.clone(),
buf: self.inner.as_raw_slice().to_vec(),
bits_count: self.inner.len(),
/// Snapshots the filter into its serializable form.
/// Returns `None` when the bit buffer is offloaded and there is nothing to save.
fn save(&self) -> Option<Save> {
    let inner = self.inner.as_ref()?;
    Some(Save {
        config: self.config.clone(),
        buf: inner.as_raw_slice().to_vec(),
        bits_count: inner.len(),
    })
}

fn from(save: Save) -> Self {
fn from(save: Save, offset_in_file: Option<u64>) -> Self {
let mut inner = BitVec::from_vec(save.buf);
inner.truncate(save.bits_count);
Self {
hashers: Self::hashers(save.config.hashers_count),
config: save.config,
inner,
inner: Some(inner),
bits_count: save.bits_count,
offset_in_file,
}
}

pub fn to_raw(&self) -> Result<Vec<u8>> {
let save = self.save();
let save = self
.save()
.ok_or_else(|| anyhow::anyhow!("Filter buffer offloaded, can't serialize"))?;
bincode::serialize(&save).map_err(Into::into)
}

pub fn from_raw(buf: &[u8]) -> Result<Self> {
pub fn from_raw(buf: &[u8], offset_in_file: Option<u64>) -> Result<Self> {
let save: Save = bincode::deserialize(buf)?;
Ok(Self::from(save))
Ok(Self::from(save, offset_in_file))
}

pub fn add(&mut self, item: impl AsRef<[u8]>) {
/// Sets the filter bits for `item` in the in-memory buffer.
///
/// Errors when the buffer has been offloaded to disk — the on-disk
/// representation cannot be mutated through this method.
pub fn add(&mut self, item: impl AsRef<[u8]>) -> Result<()> {
    if let Some(inner) = &mut self.inner {
        let len = inner.len() as u64;
        // Each index is produced modulo `len`, so `get_mut` is in range.
        for h in Self::iter_indices_for_key(&self.hashers, len, item.as_ref()) {
            *inner
                .get_mut(h as usize)
                .expect("impossible due to mod by len") = true;
        }
        Ok(())
    } else {
        Err(anyhow::anyhow!("Can't add to in-file filter"))
    }
}

/// Membership test against the in-memory bit buffer.
/// Returns `None` when the buffer is offloaded, `Some(false)` for an empty
/// filter, otherwise whether every hashed bit for `item` is set.
pub fn contains_in_memory(&self, item: impl AsRef<[u8]>) -> Option<bool> {
    let inner = self.inner.as_ref()?;
    let len = inner.len() as u64;
    // Guard the empty filter: `.all` over an empty iterator is `true`,
    // which would wrongly report membership.
    if len == 0 {
        return Some(false);
    }
    let all_bits_set = Self::iter_indices_for_key(&self.hashers, len, item.as_ref())
        .all(|i| *inner.get(i as usize).expect("unreachable"));
    Some(all_bits_set)
}

/// Maps `item` to one bit index per hasher, each reduced modulo `len`.
///
/// Yields nothing when `len == 0` (`checked_rem` returns `None`), so callers
/// must special-case the empty filter if `.all()` on an empty iterator
/// (which is `true`) would be wrong for them.
///
/// Generic over the hasher type (`&[H]` instead of `&Vec<AHasher>`, per
/// clippy `ptr_arg`); existing call sites passing `&self.hashers` coerce
/// unchanged.
fn iter_indices_for_key<'a, H: Hasher + Clone>(
    hashers: &'a [H],
    len: u64,
    item: &'a [u8],
) -> impl Iterator<Item = u64> + 'a {
    hashers.iter().cloned().filter_map(move |mut hasher| {
        hasher.write(item);
        hasher.finish().checked_rem(len)
    })
}

pub async fn contains_in_file<P: BloomDataProvider>(
&self,
provider: &P,
item: impl AsRef<[u8]>,
) -> Result<bool> {
if self.bits_count == 0 {
return Ok(false);
}
let mut hashers = self.hashers.clone();
let len = self.inner.len() as u64;
for h in hashers.iter_mut().map(|hasher| {
let start_pos = self
.offset_in_file
.ok_or_else(|| anyhow::anyhow!("Offset should be set for in-file operations"))?;
for index in hashers.iter_mut().map(|hasher| {
hasher.write(item.as_ref());
hasher.finish() % len
hasher.finish() % self.bits_count as u64
}) {
*self
.inner
.get_mut(h as usize)
.expect("impossible due to mod by len") = true;
let pos = start_pos + (index / 8);
let byte = provider.read_byte(pos).await?;

if !byte
.view_bits::<Lsb0>()
.get(index as usize % 8)
.expect("unreachable")
{
return Ok(false);
}
}
Ok(true)
}

pub fn contains(&self, item: impl AsRef<[u8]>) -> bool {
let mut hashers = self.hashers.clone();
let len = self.inner.len() as u64;
if len == 0 {
return false;
}
hashers
.iter_mut()
.map(|hasher| {
hasher.write(item.as_ref());
hasher.finish() % len
})
.all(|i| *self.inner.get(i as usize).expect("unreachable"))
// bincode writes the Vec length as a u64 before its elements, so the raw
// bit buffer begins at sizeof(serialized config) + sizeof(u64).
fn buffer_start_position(&self) -> Result<u64> {
    Ok(bincode::serialized_size(&self.config)? + std::mem::size_of::<u64>() as u64)
}

/// Bytes allocated for the bit buffer (capacity in bits / 8); 0 when offloaded.
pub fn memory_allocated(&self) -> usize {
    match &self.inner {
        Some(buf) => buf.capacity() / 8,
        None => 0,
    }
}
}

/// Source of the filter's persisted bytes, letting an offloaded bloom filter
/// answer queries by reading single bytes from backing storage on demand.
#[async_trait::async_trait]
pub(crate) trait BloomDataProvider {
    /// Reads one byte at absolute file offset `index`.
    async fn read_byte(&self, index: u64) -> Result<u8>;
}

mod tests {
Expand Down
12 changes: 12 additions & 0 deletions src/blob/index/bptree/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ impl FileIndexTrait for BPTreeFileIndex {
Ok(buf)
}

/// Reads one byte from the index meta section at offset `i`, where `i` is
/// relative to the end of the serialized header.
///
/// # Errors
/// Fails when `i` is outside the recorded meta size, or on I/O / header
/// serialization errors.
async fn read_meta_at(&self, i: u64) -> Result<u8> {
    trace!("load byte from meta");
    // Reject out-of-range reads before touching the file.
    if i >= self.header.meta_size as u64 {
        return Err(anyhow::anyhow!("read meta out of range"));
    }
    let mut buf = [0; 1];
    // Meta starts right after the serialized header.
    self.file
        .read_at(&mut buf, self.header.serialized_size()? + i)
        .await?;
    Ok(buf[0])
}

async fn find_by_key(&self, key: &[u8]) -> Result<Option<Vec<RecordHeader>>> {
let root_offset = self.metadata.tree_offset;
let mut buf = [0u8; BLOCK_SIZE];
Expand Down
Loading

0 comments on commit a4e99f2

Please sign in to comment.