Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

115-offloadable-bloom-filter #121

Merged
merged 57 commits into from
Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
224f401
Restructuring: separate index with it's disk state and index subtree …
Justarone Mar 28, 2021
07fad73
Merge master into 'b+-tree-on-disk-index'
Justarone Mar 28, 2021
2dec254
Fix minor issues
Justarone Mar 28, 2021
aaf13c7
B+ tree work in progress
Justarone Mar 31, 2021
5bc2daa
Now it works with trees with height higher than 1
Justarone Apr 4, 2021
f3c3767
Add one node case processing
Justarone Apr 4, 2021
06764c5
Fix bug with pick of first elem in node
Justarone Apr 4, 2021
8784e33
Remove panic in bptree module
Justarone Apr 4, 2021
c18683a
Write test, restructure and find bug
Justarone Apr 4, 2021
7d4de97
Fix bug and rewrite bad tests
Justarone Apr 4, 2021
ddc86fe
Fix namings and warning
Justarone Apr 4, 2021
3c6906b
Fix last recs bug
Justarone Apr 4, 2021
c846b0d
Fix review issues
Justarone Apr 5, 2021
ea0844a
Merge master
Justarone Apr 5, 2021
6fde0e4
Implement builder pattern for bptree serialize process
Justarone Apr 6, 2021
96f6626
Change serializer's dynamic check on static one
Justarone Apr 10, 2021
5cd2199
IndexStruct doesn't use IndexHeader now
Justarone Apr 14, 2021
2a517bb
Fix duplicated code
Justarone Apr 14, 2021
c637870
Make file index independent from filter
Justarone Apr 14, 2021
e156056
Start generalization (serializer is generalized)
Justarone Apr 18, 2021
f4fa7e2
Remove redundant bound
Justarone Apr 18, 2021
47443b4
Revert "Remove redundant bound"
Justarone Apr 28, 2021
3487f15
Revert "Start generalization (serializer is generalized)" (there
Justarone Apr 28, 2021
b907134
Fix review issues
Justarone Apr 28, 2021
4645abc
Merge branch 'master' into b+-tree-on-disk-index
Justarone Apr 28, 2021
9cc305c
Add benchmarks for indices
Justarone May 5, 2021
ae6304f
Change benchmarks params
Justarone May 6, 2021
7b8c7ce
Fix error when last node gets only 1 key
Justarone May 8, 2021
c2007dc
Rewrite elems distribution per layer logic
Justarone May 9, 2021
27158db
Add root node in serialized form in RAM
Justarone May 11, 2021
8c82bab
Add search in serialized node (seems like deserialization is expensive)
Justarone May 11, 2021
2bd77b0
Remove deserialization from leaf nodes (that's also expensive)
Justarone May 12, 2021
e47b9d9
Remove vector creation operation and change distribution strategy a b…
Justarone May 15, 2021
24e9211
Change keys distribution in leaf node
Justarone May 15, 2021
83dc3de
Ordered headers are used as leaves
Justarone May 19, 2021
3c4e5d5
Remove redundant read in file on the left side of leaf node and push …
Justarone May 19, 2021
bdecc68
Make get_any return the latest header instead of first one (to enable…
Justarone May 19, 2021
1c2b1cb
Reverse tree in file and move headers after tree (now during search b…
Justarone May 23, 2021
c4cc6d9
Revert "Make get_any return the latest header instead of first one (t…
Justarone May 25, 2021
80c3a0a
Merge branch 'b+-tree-headers-as-leaves' into b+-tree-on-disk-index
Justarone May 25, 2021
ea2c3df
Remove leaves stage because now it's redundant (headers are used as l…
Justarone May 25, 2021
18d61c2
Fix description of b+-tree indices
Justarone May 25, 2021
f0aff14
Bloom filter offload
vovac12 Aug 12, 2021
98f339d
Fix
vovac12 Aug 13, 2021
43fd3fa
Shorter default impl
vovac12 Aug 15, 2021
da1481c
Platform agnosting bloom filter buffer
vovac12 Aug 16, 2021
c12d590
Add BloomDataProvider trait
vovac12 Aug 17, 2021
1267830
Add method to get allocated memory
vovac12 Aug 17, 2021
02b852e
Merge commit 'ca21f33604bb852861d0aeb75f0203bba481761a' of github.com…
vovac12 Nov 1, 2021
c83ca1a
Merge branch 'master' of github.com:qoollo/pearl into 115-offloadable…
vovac12 Nov 1, 2021
0745db3
Fix errors and add unit test
vovac12 Nov 2, 2021
996ad02
Update CHANGELOG.md
vovac12 Nov 2, 2021
d864aef
Fix review issues
vovac12 Nov 3, 2021
0af2ff6
Fix review issues
vovac12 Nov 8, 2021
84f2c20
Fix review issues
vovac12 Nov 9, 2021
a032ed6
Merge branch 'master' into 115-offloadable-bloom-filter
piakushin Nov 9, 2021
9c2bca7
Merge branch '115-offloadable-bloom-filter' of github.com:qoollo/pear…
vovac12 Nov 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Pearl changelog

## [Unreleased]
#### Added

- Add method to offload bloom filters (#121)

#### Changed
- Dump blob indices in separate thread on active blob close (#136) [https://github.com/qoollo/pearl/pull/137]
Expand Down
27 changes: 22 additions & 5 deletions src/blob/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl Blob {
check_filters: bool,
) -> Result<Option<Entry>> {
debug!("blob get any entry {:?}, {:?}", key, meta);
if check_filters && !self.check_filters(key) {
if check_filters && !self.check_filters(key).await? {
debug!("Key was filtered out by filters");
Ok(None)
} else if let Some(meta) = meta {
Expand Down Expand Up @@ -336,22 +336,39 @@ impl Blob {
self.name.id
}

pub(crate) fn check_filters(&self, key: &[u8]) -> bool {
pub(crate) fn offload_filter(&mut self) {
self.index.offload_filter()
}

pub(crate) async fn check_filters(&self, key: &[u8]) -> Result<bool> {
trace!("check filters (range and bloom)");
if let FilterResult::NotContains = self.index.check_filters_key(key) {
if let FilterResult::NotContains = self.index.check_filters_key(key).await? {
Ok(false)
} else {
Ok(true)
}
}

pub(crate) fn check_filters_in_memory(&self, key: &[u8]) -> bool {
trace!("check filters (range and bloom)");
if let FilterResult::NotContains = self.index.check_filters_in_memory(key) {
false
} else {
true
}
}

pub(crate) async fn check_filters_non_blocking(&self, key: &[u8]) -> bool {
self.check_filters(key)
pub(crate) fn is_filter_offloaded(&self) -> bool {
self.index.is_filter_offloaded()
}

pub(crate) fn index_memory(&self) -> usize {
self.index.memory_used()
}

pub(crate) fn filter_memory_allocated(&self) -> usize {
self.index.bloom_memory_allocated()
}
}

#[derive(Debug, Clone)]
Expand Down
168 changes: 132 additions & 36 deletions src/blob/index/bloom.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
use super::prelude::*;
use bitvec::order::Lsb0;

// All usizes in structures are serialized as u64 in binary
#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
pub(crate) struct Bloom {
inner: BitVec,
inner: Option<BitVec<Lsb0, u64>>,
offset_in_file: Option<u64>,
bits_count: usize,
hashers: Vec<AHasher>,
config: Config,
}

impl Default for Bloom {
fn default() -> Self {
Self {
inner: Some(Default::default()),
bits_count: 0,
hashers: vec![],
config: Default::default(),
offset_in_file: None,
}
}
}

/// Bloom filter configuration parameters.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
Expand All @@ -30,7 +45,7 @@ pub struct Config {
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Save {
config: Config,
buf: Vec<usize>,
buf: Vec<u64>,
bits_count: usize,
}

Expand Down Expand Up @@ -92,14 +107,30 @@ impl Bloom {
pub fn new(config: Config) -> Self {
let bits_count = bits_count_from_formula(&config);
Self {
inner: bitvec![0; bits_count],
inner: Some(bitvec![Lsb0, u64; 0; bits_count]),
hashers: Self::hashers(config.hashers_count),
config,
bits_count,
offset_in_file: None,
}
}

pub fn clear(&mut self) {
self.inner = bitvec![0; self.inner.len()];
self.inner = Some(bitvec![Lsb0, u64; 0; self.bits_count]);
self.offset_in_file = None;
}

pub fn is_offloaded(&self) -> bool {
self.inner.is_none()
}

pub fn offload_from_memory(&mut self) {
self.inner = None;
}

pub fn set_offset_in_file(&mut self, offset: u64) {
self.offset_in_file =
Some(offset + self.buffer_start_position().expect("Should not fail") as u64);
}

pub fn hashers(k: usize) -> Vec<AHasher> {
Expand All @@ -109,62 +140,127 @@ impl Bloom {
.collect()
}

fn save(&self) -> Save {
Save {
config: self.config.clone(),
buf: self.inner.as_raw_slice().to_vec(),
bits_count: self.inner.len(),
fn save(&self) -> Option<Save> {
if let Some(inner) = &self.inner {
Some(Save {
config: self.config.clone(),
buf: inner.as_raw_slice().to_vec(),
bits_count: inner.len(),
})
} else {
None
}
}

fn from(save: Save) -> Self {
fn from(save: Save, offset_in_file: Option<u64>) -> Self {
let mut inner = BitVec::from_vec(save.buf);
inner.truncate(save.bits_count);
Self {
hashers: Self::hashers(save.config.hashers_count),
config: save.config,
inner,
inner: Some(inner),
bits_count: save.bits_count,
offset_in_file,
}
}

pub fn to_raw(&self) -> Result<Vec<u8>> {
let save = self.save();
let save = self
.save()
.ok_or_else(|| anyhow::anyhow!("Filter buffer offloaded, can't serialize"))?;
bincode::serialize(&save).map_err(Into::into)
}

pub fn from_raw(buf: &[u8]) -> Result<Self> {
pub fn from_raw(buf: &[u8], offset_in_file: Option<u64>) -> Result<Self> {
let save: Save = bincode::deserialize(buf)?;
Ok(Self::from(save))
Ok(Self::from(save, offset_in_file))
}

pub fn add(&mut self, item: impl AsRef<[u8]>) {
pub fn add(&mut self, item: impl AsRef<[u8]>) -> Result<()> {
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
if let Some(inner) = &mut self.inner {
let len = inner.len() as u64;
for h in Self::iter_indices_for_key(&self.hashers, len, item.as_ref()) {
*inner
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
.get_mut(h as usize)
.expect("impossible due to mod by len") = true;
}
Ok(())
} else {
Err(anyhow::anyhow!("Can't add to in-file filter"))
}
}

pub fn contains_in_memory(&self, item: impl AsRef<[u8]>) -> Option<bool> {
if let Some(inner) = &self.inner {
let len = inner.len() as u64;
// Check because .all on empty iterator returns true
if len == 0 {
return Some(false);
}
Some(
Self::iter_indices_for_key(&self.hashers, len, item.as_ref())
.all(|i| *inner.get(i as usize).expect("unreachable")),
)
} else {
None
}
}

// Returns empty iterator on len == 0
fn iter_indices_for_key<'a>(
hashers: &'a Vec<AHasher>,
len: u64,
item: &'a [u8],
) -> impl Iterator<Item = u64> + 'a {
hashers.iter().cloned().filter_map(move |mut hasher| {
hasher.write(item.as_ref());
hasher.finish().checked_rem(len)
})
}

pub async fn contains_in_file<P: BloomDataProvider>(
&self,
provider: &P,
item: impl AsRef<[u8]>,
) -> Result<bool> {
if self.bits_count == 0 {
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
return Ok(false);
}
let mut hashers = self.hashers.clone();
let len = self.inner.len() as u64;
for h in hashers.iter_mut().map(|hasher| {
let start_pos = self
.offset_in_file
.ok_or_else(|| anyhow::anyhow!("Offset should be set for in-file operations"))?;
for index in hashers.iter_mut().map(|hasher| {
hasher.write(item.as_ref());
hasher.finish() % len
hasher.finish() % self.bits_count as u64
}) {
*self
.inner
.get_mut(h as usize)
.expect("impossible due to mod by len") = true;
let pos = start_pos + (index / 8);
let byte = provider.read_byte(pos).await?;

if !byte
.view_bits::<Lsb0>()
.get(index as usize % 8)
.expect("unreachable")
{
return Ok(false);
}
}
Ok(true)
}

pub fn contains(&self, item: impl AsRef<[u8]>) -> bool {
let mut hashers = self.hashers.clone();
let len = self.inner.len() as u64;
if len == 0 {
return false;
}
hashers
.iter_mut()
.map(|hasher| {
hasher.write(item.as_ref());
hasher.finish() % len
})
.all(|i| *self.inner.get(i as usize).expect("unreachable"))
// bincode write len as u64 before Vec elements. sizeof(config) + sizeof(u64)
fn buffer_start_position(&self) -> Result<u64> {
Ok(bincode::serialized_size(&self.config)? + std::mem::size_of::<u64>() as u64)
}

pub fn memory_allocated(&self) -> usize {
self.inner.as_ref().map_or(0, |buf| buf.capacity() / 8)
}
}

#[async_trait::async_trait]
pub(crate) trait BloomDataProvider {
async fn read_byte(&self, index: u64) -> Result<u8>;
}

mod tests {
Expand Down
12 changes: 12 additions & 0 deletions src/blob/index/bptree/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ impl FileIndexTrait for BPTreeFileIndex {
Ok(buf)
}

async fn read_meta_at(&self, i: u64) -> Result<u8> {
trace!("load byte from meta");
if i >= self.header.meta_size as u64 {
return Err(anyhow::anyhow!("read meta out of range"));
}
let mut buf = [0; 1];
self.file
.read_at(&mut buf, self.header.serialized_size()? + i)
.await?;
Ok(buf[0])
}

async fn find_by_key(&self, key: &[u8]) -> Result<Option<Vec<RecordHeader>>> {
let root_offset = self.metadata.tree_offset;
let mut buf = [0u8; BLOCK_SIZE];
Expand Down
Loading