231 pass and store timestamp within record #232

Merged
28 commits merged on Sep 22, 2023
Changes from all commits
Commits
dfc1ee4
[231] Pass and store timestamp within record
ikopylov Jan 14, 2023
1824f70
[231] Update tests
ikopylov Jan 14, 2023
57d2569
[231] Update tests
ikopylov Jan 14, 2023
626d5f2
[231] Fix docs
ikopylov Jan 14, 2023
84c58bd
[231] Keeps ordering by timestamp in storage::read_all
ikopylov Jan 14, 2023
b809b97
[231] Check data equality in tests
ikopylov Jan 14, 2023
d4aeec7
[231] Add ability to attach metadata to deletion record
ikopylov Jan 14, 2023
461ba4a
[231] Fix build
ikopylov Jan 14, 2023
c05616d
[231] Update changelog
ikopylov Jan 14, 2023
a5b9b47
[231] Update comment
ikopylov Jan 14, 2023
b1738a9
Merge branch 'master' into 231-pass-and-store-timestamp-within-record
ikopylov Jan 14, 2023
8e6f04f
Merge branch 'master' into 231-pass-and-store-timestamp-within-record
ikopylov May 17, 2023
0ff13dc
[231] Fix build after merge
ikopylov May 17, 2023
afceb19
[231] Insert position search improvement
ikopylov May 17, 2023
156149c
[231] Add test for ordering by timestamp
ikopylov May 17, 2023
834b974
[231] Extend ordering test with deletion
ikopylov May 17, 2023
50062ce
[231] Update `contains` to preserve ordering
ikopylov May 17, 2023
4a06ab1
[231] get_any -> get_latest
ikopylov May 18, 2023
b1a9a94
[231] Synchronize read and contains implementation on Storage level. …
ikopylov May 18, 2023
727e5f5
[231] Include BLOB path into errors returned from Entry functions
ikopylov May 18, 2023
4b15126
[231] Update Changelog.md
ikopylov May 18, 2023
1479cbb
Merge branch 'master' into 231-pass-and-store-timestamp-within-record
ikopylov Jun 24, 2023
e3cb259
[231] Fix build errors after merge
ikopylov Jun 24, 2023
c8b1e5c
[231] Updates according to the code review
ikopylov Jun 24, 2023
4730a8a
[231] Remove dead code according to code review
ikopylov Aug 14, 2023
adf7566
[231] Remove unused import
ikopylov Aug 14, 2023
f6071b6
Merge branch 'master' into 231-pass-and-store-timestamp-within-record
ikopylov Aug 15, 2023
1291d4f
[231] Remove dead code
ikopylov Sep 21, 2023
8 changes: 6 additions & 2 deletions CHANGELOG.md
@@ -4,12 +4,16 @@ Pearl changelog

## [Unreleased]
#### Added
- Add ability to attach metadata to the deletion record (#229)

#### Changed
- Receive timestamp as parameter, store it within record and return it with BlobRecordTimestamp (#231)
- `Storage::read` now propagates the error further instead of hiding it (#241)
- `Storage::read` now reads Entries first, selects the latest one and only then loads the data (#276)
- Check blob validity on index regeneration (#289)
- Checksum validation in `Entry::load_data` (#274)
- Add fsync to header writing in blob (#243)
- Add periodic fsync (#234)

#### Changed
- Reduced internal structure size in memory: bloom filter `Config` and `FileName` (#260)
- Serialize/Deserialize implementation removed from `Record` struct (#282)

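The changelog entries above center on the new `BlobRecordTimestamp` type: writes and deletions now receive a timestamp from the caller, the timestamp is stored inside the record, and reads hand it back. The sketch below shows only the constructors and the raw `u64` conversion that appear in the diffs that follow; the public re-export from the crate root and the exact conversion trait are assumptions, not something this page confirms.

```rust
// Minimal sketch of BlobRecordTimestamp usage, based only on calls visible in this PR's
// diff (`BlobRecordTimestamp::now()`, `BlobRecordTimestamp::new(u64)`, and `.into()` to the
// raw u64 stored in a RecordHeader). Re-export as `pearl::BlobRecordTimestamp` is assumed.
use pearl::BlobRecordTimestamp;

fn main() {
    // Timestamp chosen by the caller at write/delete time.
    let now = BlobRecordTimestamp::now();

    // Timestamp reconstructed from the raw value kept inside a record header.
    let restored = BlobRecordTimestamp::new(1_694_000_000);

    // The diff passes timestamps into Record/RecordHeader constructors via `.into()`,
    // so a conversion to u64 is assumed to exist.
    let raw_now: u64 = now.into();
    let raw_restored: u64 = restored.into();
    assert!(raw_now >= raw_restored);
}
```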
95 changes: 21 additions & 74 deletions src/blob/core.rs
@@ -1,6 +1,6 @@
use std::time::SystemTime;

use bytes::{BufMut, Bytes, BytesMut};
use bytes::{BufMut, BytesMut};
use tokio::time::Instant;

use crate::error::ValidationErrorKind;
@@ -24,7 +24,7 @@ where
{
header: Header,
index: Index<K>,
name: FileName,
name: Arc<FileName>,
file: File,
created_at: SystemTime,
validate_data_during_index_regen: bool,
@@ -65,7 +65,7 @@ where
let mut blob = Self {
header,
index,
name,
name: Arc::new(name),
file,
created_at: SystemTime::now(),
validate_data_during_index_regen,
@@ -179,7 +179,7 @@ where
let mut blob = Self {
header,
file,
name,
name: Arc::new(name),
index,
created_at,
validate_data_during_index_regen,
@@ -270,48 +270,6 @@ where
Ok(WriteResult { dirty_bytes: self.file.dirty_bytes() })
}

pub(crate) async fn read_last(
&self,
key: &K,
meta: Option<&Meta>,
check_filters: bool,
) -> Result<ReadResult<Bytes>> {
debug!("blob read any");
let entry = self.get_entry(key, meta, check_filters).await?;
match entry {
ReadResult::Found(entry) => {
debug!("blob read any entry found");
let buf = entry
.load()
.await
.with_context(|| {
format!(
"failed to read key {:?} with meta {:?} from blob {:?}",
key,
meta,
self.name.as_path()
)
})?
.into_data();
debug!("blob read any entry loaded bytes: {}", buf.len());
Ok(ReadResult::Found(buf))
}
ReadResult::Deleted(ts) => Ok(ReadResult::Deleted(ts)),
ReadResult::NotFound => Ok(ReadResult::NotFound),
}
}

#[allow(dead_code)]
#[inline]
pub(crate) async fn read_all_entries(&self, key: &K) -> Result<Vec<Entry>> {
let headers = self.index.get_all(key).await?;
debug_assert!(headers
.iter()
.zip(headers.iter().skip(1))
.all(|(x, y)| x.created() >= y.created()));
Ok(Self::headers_to_entries(headers, &self.file))
}

#[inline]
pub(crate) async fn read_all_entries_with_deletion_marker(
&self,
@@ -321,40 +279,43 @@ where
debug_assert!(headers
.iter()
.zip(headers.iter().skip(1))
.all(|(x, y)| x.created() >= y.created()));
Ok(Self::headers_to_entries(headers, &self.file))
.all(|(x, y)| x.timestamp() >= y.timestamp()));
Ok(Self::headers_to_entries(headers, &self.file, &self.name))
}

pub(crate) async fn mark_all_as_deleted(
pub(crate) async fn delete(
&mut self,
key: &K,
timestamp: BlobRecordTimestamp,
meta: Option<Meta>,
only_if_presented: bool,
) -> Result<DeleteResult> {
if !only_if_presented || self.index.get_any(key).await?.is_found() {
self.push_deletion_record(key).await
if !only_if_presented || self.index.get_latest(key).await?.is_found() {
let record = Record::deleted(key, timestamp.into(), meta)?;
self.push_deletion_record(key, record).await
} else {
Ok(DeleteResult { dirty_bytes: self.file.dirty_bytes(), deleted: false })
}
}

async fn push_deletion_record(&mut self, key: &K) -> Result<DeleteResult> {
async fn push_deletion_record(&mut self, key: &K, record: Record) -> Result<DeleteResult> {
let on_disk = self.index.on_disk();
if on_disk {
self.load_index().await?;
}
let record = Record::deleted(key)?;
let result = self.write_mut(key, record).await?;
Ok(DeleteResult { dirty_bytes: result.dirty_bytes, deleted: true })
}

fn headers_to_entries(headers: Vec<RecordHeader>, file: &File) -> Vec<Entry> {
fn headers_to_entries(headers: Vec<RecordHeader>, file: &File, file_name: &Arc<FileName>) -> Vec<Entry> {
headers
.into_iter()
.map(|header| Entry::new(header, file.clone()))
.map(|header| Entry::new(header, file.clone(), file_name.clone()))
.collect()
}

async fn get_entry(
/// Returns latest Entry from Blob for specified key and meta
pub(crate) async fn get_latest_entry(
&self,
key: &K,
meta: Option<&Meta>,
@@ -371,13 +332,13 @@ where
debug!("blob get any entry bloom true no meta");
Ok(self
.index
.get_any(key)
.get_latest(key)
.await
.with_context(|| {
format!("index get any failed for blob: {:?}", self.name.as_path())
})?
.map(|header| {
let entry = Entry::new(header, self.file.clone());
let entry = Entry::new(header, self.file.clone(), self.name.clone());
debug!("blob, get any entry, bloom true no meta, entry found");
entry
}))
Expand All @@ -389,11 +350,11 @@ where
let deleted_ts = headers
.last()
.filter(|h| h.is_deleted())
.map(|h| BlobRecordTimestamp::new(h.created()));
.map(|h| BlobRecordTimestamp::new(h.timestamp()));
if deleted_ts.is_some() {
headers.truncate(headers.len() - 1);
}
let entries = Self::headers_to_entries(headers, &self.file);
let entries = Self::headers_to_entries(headers, &self.file, &self.name);
if let Some(entries) = self.filter_entries(entries, meta).await? {
Ok(ReadResult::Found(entries))
} else {
@@ -413,20 +374,6 @@ where
Ok(None)
}

pub(crate) async fn contains(
&self,
key: &K,
meta: Option<&Meta>,
) -> Result<ReadResult<BlobRecordTimestamp>> {
debug!("blob contains");
let contains = self
.get_entry(key, meta, true)
.await?
.map(|e| BlobRecordTimestamp::new(e.created()));
debug!("blob contains any: {:?}", contains);
Ok(contains)
}

#[inline]
pub(crate) fn file_size(&self) -> u64 {
self.file.size()
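Two behavioural points in the core.rs diff above are easy to miss: `delete` now takes the timestamp and optional metadata from the caller and builds the deletion record before pushing it, and `read_all_entries_with_deletion_marker` relies on headers arriving already ordered by `timestamp()` rather than by `created()`. The snippet below is a self-contained model of that ordering invariant (the pairwise `zip`/`skip(1)` check from the `debug_assert!`), with plain `u64` values standing in for `RecordHeader::timestamp()`; it illustrates the check, not the index code itself.

```rust
// Stand-alone model of the debug_assert! ordering check used above: consecutive headers
// are compared pairwise and each timestamp must be >= the one that follows it.
// Plain u64s stand in for RecordHeader::timestamp().
fn is_ordered_by_timestamp(timestamps: &[u64]) -> bool {
    timestamps
        .iter()
        .zip(timestamps.iter().skip(1))
        .all(|(x, y)| x >= y)
}

fn main() {
    assert!(is_ordered_by_timestamp(&[30, 20, 20, 10]));
    assert!(!is_ordered_by_timestamp(&[10, 30, 20]));
}
```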
29 changes: 19 additions & 10 deletions src/blob/entry.rs
@@ -16,6 +16,7 @@ pub struct Entry {
header: RecordHeader,
meta: Option<Meta>,
blob_file: File,
blob_file_name: Arc<FileName>
}

impl Entry {
@@ -31,12 +32,15 @@ impl Entry {
.read_exact_at_allocate(data_size + meta_size, self.header.meta_offset())
.await
.map_err(|err| err.into_bincode_if_unexpected_eof())
.context("Record load failed")?;
.with_context(|| format!("record load failed from BLOB: {}", self.blob_file_name.as_path().display()))?;
let mut buf = buf.freeze();
let data_buf = buf.split_off(meta_size);
let meta = Meta::from_raw(&buf).map_err(|err| Error::from(err))?;
let record = Record::new(self.header.clone(), meta, data_buf);
record.validate()
let meta = Meta::from_raw(&buf)
.map_err(|err| Error::from(err))
.with_context(|| format!("deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.as_path().display()))?;
Record::new(self.header, meta, data_buf)
.validate()
.with_context(|| format!("validation failed for Record loaded from BLOB: {}", self.blob_file_name.as_path().display()))
}

/// Returns only data.
@@ -48,7 +52,7 @@ impl Entry {
.read_exact_at_allocate(self.header.data_size().try_into()?, data_offset)
.await
.map_err(|err| err.into_bincode_if_unexpected_eof())
.context("Error loading Record data")?;
.with_context(|| format!("error loading Record data from BLOB: {}", self.blob_file_name.as_path().display()))?;
self.header.data_checksum_audit(&data)?;
Ok(data)
}
@@ -63,8 +67,12 @@ impl Entry {
.read_exact_at_allocate(self.header.meta_size().try_into()?, meta_offset)
.await
.map_err(|err| err.into_bincode_if_unexpected_eof())
.with_context(|| format!("failed to read Record metadata, offset: {}", meta_offset))?;
self.meta = Some(Meta::from_raw(&buf).map_err(|err| Error::from(err))?);
.with_context(|| format!("failed to read Record metadata from BLOB: {}", self.blob_file_name.as_path().display()))?;
let meta = Meta::from_raw(&buf)
.map_err(|err| Error::from(err))
.with_context(|| format!("deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.as_path().display()))?;

self.meta = Some(meta);
Ok(self.meta.as_ref())
}

@@ -74,15 +82,16 @@
}

/// Timestamp when entry was created
pub fn created(&self) -> u64 {
self.header.created()
pub fn timestamp(&self) -> BlobRecordTimestamp {
BlobRecordTimestamp::new(self.header.timestamp())
}

pub(crate) fn new(header: RecordHeader, blob_file: File) -> Self {
pub(crate) fn new(header: RecordHeader, blob_file: File, blob_file_name: Arc<FileName>) -> Self {
Self {
meta: None,
header,
blob_file,
blob_file_name
}
}
}
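Besides switching `Entry::created()` to `Entry::timestamp()` (returning `BlobRecordTimestamp` instead of a bare `u64`), the entry.rs diff threads the owning BLOB's file name into every error context, so load failures name the file they came from. The snippet below is a self-contained sketch of that `anyhow::Context` pattern only; the path and the read helper are stand-ins, not pearl APIs.

```rust
// Minimal illustration of the error-context style adopted above: each failure message
// now embeds the path of the BLOB being read. The helper is hypothetical; only the
// with_context pattern mirrors the diff.
use std::path::Path;
use anyhow::{Context, Result};

fn read_record_data(blob_path: &Path) -> Result<Vec<u8>> {
    std::fs::read(blob_path)
        .with_context(|| format!("error loading Record data from BLOB: {}", blob_path.display()))
}

fn main() {
    // With a missing file, the whole error chain names the BLOB that failed.
    if let Err(err) = read_record_data(Path::new("missing.blob")) {
        eprintln!("{:#}", err);
    }
}
```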
4 changes: 2 additions & 2 deletions src/blob/index/benchmarks.rs
@@ -61,7 +61,7 @@ fn generate_headers(records_amount: usize, key_mapper: fn(u32) -> u32) -> InMemo
.map(|i| serialize(&i).expect("can't serialize"))
.for_each(|key| {
let key: KeyType = key.into();
let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
if let Some(v) = inmem.get_mut(&key) {
v.push(rh);
} else {
@@ -208,7 +208,7 @@ async fn benchmark_get_any() {
if (i as u32 + 1) % PRINT_EVERY == 0 {
println!("Iteration: {}...", i + 1);
}
let _ = findex.get_any(&q.into()).await.unwrap();
let _ = findex.get_latest(&q.into()).await.unwrap();
}
println!(
"get_any avg time: {}\n",
2 changes: 1 addition & 1 deletion src/blob/index/bptree/core.rs
@@ -152,7 +152,7 @@ where
.map(|headers| (headers, self.header.records_count))
}

async fn get_any(&self, key: &K) -> Result<Option<RecordHeader>> {
async fn get_latest(&self, key: &K) -> Result<Option<RecordHeader>> {
let root_offset = self.metadata.tree_offset;
let buf = BytesMut::zeroed(BLOCK_SIZE);
let (buf, leaf_offset) = self.find_leaf_node(key, root_offset, buf).await?;
20 changes: 10 additions & 10 deletions src/blob/index/bptree/tests.rs
@@ -92,7 +92,7 @@ async fn serialize_deserialize_file() {
let test_dir = get_test_dir("serialize_deserialize_file");
let mut inmem = InMemoryIndex::<KeyType>::new();
(0..10000).map(|i| i.into()).for_each(|key: KeyType| {
let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
inmem.insert(key, vec![rh]);
});
let meta = vec![META_VALUE; META_SIZE];
@@ -122,7 +122,7 @@ async fn blob_size_invalidation() {
let filename = test_dir.join("bptree_index.0.index");
let mut inmem = InMemoryIndex::<KeyType>::new();
(0..10000).map(|i| i.into()).for_each(|key: KeyType| {
let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
inmem.insert(key, vec![rh]);
});
let meta = vec![META_VALUE; META_SIZE];
@@ -161,7 +161,7 @@ async fn magic_byte_corruption() {
let filename = test_dir.join("bptree_index.0.index");
let mut inmem = InMemoryIndex::<KeyType>::new();
(0..10000).map(|i| i.into()).for_each(|key: KeyType| {
let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
inmem.insert(key, vec![rh]);
});
let meta = vec![META_VALUE; META_SIZE];
@@ -219,7 +219,7 @@ async fn check_get_any() {
(RANGE_FROM..RANGE_TO)
.map(|i| i.into())
.for_each(|key: KeyType| {
let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
inmem.insert(key, vec![rh]);
});
let meta = vec![META_VALUE; META_SIZE];
@@ -236,7 +236,7 @@
.expect("Can't create file index");
let presented_keys = RANGE_FROM..RANGE_TO;
for key in presented_keys.map(|k| k.into()) {
if let Ok(inner_res) = findex.get_any(&key).await {
if let Ok(inner_res) = findex.get_latest(&key).await {
if let Some(actual_header) = inner_res {
let key_deserialized: usize = key.clone().into();
assert_eq!(
@@ -254,7 +254,7 @@
let not_presented_ranges = [0..RANGE_FROM, RANGE_TO..(RANGE_TO + 100)];
for not_presented_keys in not_presented_ranges.iter() {
for key in not_presented_keys.clone().map(|k| serialize(&k).unwrap()) {
assert_eq!(None, findex.get_any(&key.into()).await.unwrap());
assert_eq!(None, findex.get_latest(&key.into()).await.unwrap());
}
}

@@ -271,8 +271,8 @@ async fn preserves_records_order() {
(RANGE_FROM..RANGE_TO)
.map(|i| i.into())
.for_each(|key: KeyType| {
let rh1 = RecordHeader::new(key.to_vec(), 1, 1, 1);
let rh2 = RecordHeader::new(key.to_vec(), 2, 2, 2);
let rh1 = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
let rh2 = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 2, 2, 2);
inmem.insert(key, vec![rh1, rh2]);
});
let meta = vec![META_VALUE; META_SIZE];
@@ -314,7 +314,7 @@ async fn check_get() {
(RANGE_FROM..RANGE_TO)
.map(|i| (i % MAX_AMOUNT + 1, i.into()))
.for_each(|(times, key): (_, KeyType)| {
let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
let recs = (0..times).map(|_| rh.clone()).collect();
inmem.insert(key, recs);
});
@@ -332,7 +332,7 @@
.expect("Can't create file index");
let presented_keys = RANGE_FROM..RANGE_TO;
for key in presented_keys.map(|k| k.into()) {
if let Ok(inner_res) = findex.get_any(&key).await {
if let Ok(inner_res) = findex.get_latest(&key).await {
if let Some(actual_header) = inner_res {
let key_deserialized: usize = key.clone().into();
assert_eq!(
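Throughout these tests the index lookup changes from `get_any` to `get_latest`, and every `RecordHeader::new` call gains a timestamp as its second argument. The toy model below captures the semantic difference the rename signals: when a key has several versions, the lookup returns the one with the greatest timestamp rather than an arbitrary match. `(timestamp, value)` pairs stand in for `RecordHeader`s; this illustrates the intent, not the B+ tree implementation.

```rust
// Toy model of the get_any -> get_latest rename: among all versions stored for a key,
// pick the one with the maximum timestamp.
fn get_latest<'a>(versions: &[(u64, &'a str)]) -> Option<(u64, &'a str)> {
    versions.iter().copied().max_by_key(|(ts, _)| *ts)
}

fn main() {
    let versions = [(1, "first write"), (3, "latest write"), (2, "older write")];
    assert_eq!(get_latest(&versions), Some((3, "latest write")));
}
```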