From dfc1ee4585836ce50413da14e32f0190f6fdc68d Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Sat, 14 Jan 2023 21:31:09 +0100
Subject: [PATCH 01/24] [231] Pass and store timestamp within record

---
 src/blob/core.rs           | 15 ++++++++-------
 src/blob/entry.rs          |  4 ++--
 src/blob/index/core.rs     | 12 +++++++++---
 src/record.rs              | 27 +++++++++++----------------
 src/storage/core.rs        | 38 +++++++++++++++++++++-----------------
 src/storage/read_result.rs | 12 +++++++++++-
 6 files changed, 62 insertions(+), 46 deletions(-)

diff --git a/src/blob/core.rs b/src/blob/core.rs
index 48dcd32b0f..40f3baee13 100644
--- a/src/blob/core.rs
+++ b/src/blob/core.rs
@@ -328,7 +328,7 @@ where
         debug_assert!(headers
             .iter()
             .zip(headers.iter().skip(1))
-            .all(|(x, y)| x.created() >= y.created()));
+            .all(|(x, y)| x.timestamp() >= y.timestamp()));
         Ok(Self::headers_to_entries(headers, &self.file))
     }

@@ -341,29 +341,30 @@ where
         debug_assert!(headers
             .iter()
             .zip(headers.iter().skip(1))
-            .all(|(x, y)| x.created() >= y.created()));
+            .all(|(x, y)| x.timestamp() >= y.timestamp()));
         Ok(Self::headers_to_entries(headers, &self.file))
     }

     pub(crate) async fn mark_all_as_deleted(
         &mut self,
         key: &K,
+        timestamp: BlobRecordTimestamp,
         only_if_presented: bool,
     ) -> Result<bool> {
         if !only_if_presented || self.index.get_any(key).await?.is_found() {
-            self.push_deletion_record(key).await?;
+            self.push_deletion_record(key, timestamp).await?;
             Ok(true)
         } else {
             Ok(false)
         }
     }

-    async fn push_deletion_record(&mut self, key: &K) -> Result<()> {
+    async fn push_deletion_record(&mut self, key: &K, timestamp: BlobRecordTimestamp) -> Result<()> {
         let on_disk = self.index.on_disk();
         if on_disk {
             self.load_index().await?;
         }
-        let record = Record::deleted(key)?;
+        let record = Record::deleted(key, timestamp.into())?;
         let header = self.write_mut(key, record).await?;
         self.index.push_deletion(key, header)
     }
@@ -410,7 +411,7 @@ where
         let deleted_ts = headers
             .last()
             .filter(|h| h.is_deleted())
-            .map(|h| BlobRecordTimestamp::new(h.created()));
+            .map(|h| BlobRecordTimestamp::new(h.timestamp()));
         if deleted_ts.is_some() {
             headers.truncate(headers.len() - 1);
         }
@@ -443,7 +444,7 @@ where
         let contains = self
             .get_entry(key, meta, true)
             .await?
-            .map(|e| BlobRecordTimestamp::new(e.created()));
+            .map(|e| e.timestamp());
         debug!("blob contains any: {:?}", contains);
         Ok(contains)
     }
diff --git a/src/blob/entry.rs b/src/blob/entry.rs
index 10123b3c4e..cfb40f1ae4 100644
--- a/src/blob/entry.rs
+++ b/src/blob/entry.rs
@@ -65,8 +65,8 @@ impl Entry {
     }

     /// Timestamp when entry was created
-    pub fn created(&self) -> u64 {
-        self.header.created()
+    pub fn timestamp(&self) -> BlobRecordTimestamp {
+        BlobRecordTimestamp::new(self.header.timestamp())
     }

     pub(crate) fn new(header: RecordHeader, blob_file: File) -> Self {
diff --git a/src/blob/index/core.rs b/src/blob/index/core.rs
index 4a3cfde7dd..b2241bd714 100644
--- a/src/blob/index/core.rs
+++ b/src/blob/index/core.rs
@@ -254,7 +254,7 @@ where
     async fn contains_key(&self, key: &K) -> Result<ReadResult<BlobRecordTimestamp>> {
         self.get_any(key)
             .await
-            .map(|h| h.map(|h| BlobRecordTimestamp::new(h.created())))
+            .map(|h| h.map(|h| BlobRecordTimestamp::new(h.timestamp())))
     }

     fn push(&mut self, key: &K, h: RecordHeader) -> Result<()> {
@@ -270,7 +270,13 @@ where
             .expect("No memory info in `InMemory` State");
         if let Some(v) = headers.get_mut(key) {
             let old_capacity = v.capacity();
-            v.push(h);
+            // Keep ordered by timestamp
+            let mut pos = v.binary_search_by(|item| item.timestamp().cmp(&h.timestamp())).unwrap_or_else(|e| e);
+            // Skip records with an equal timestamp (ours should be the latest)
+            while pos < v.len() && v[pos].timestamp() == h.timestamp() {
+                pos += 1;
+            }
+            v.insert(pos, h);
             trace!("capacity growth: {}", v.capacity() - old_capacity);
             mem.records_allocated += v.capacity() - old_capacity;
         } else {
@@ -338,7 +344,7 @@ where
         };
         Ok(match result {
             Some(header) if header.is_deleted() => {
-                ReadResult::Deleted(BlobRecordTimestamp::new(header.created()))
+                ReadResult::Deleted(BlobRecordTimestamp::new(header.timestamp()))
             }
             Some(header) => ReadResult::Found(header),
             None => ReadResult::NotFound,
diff --git a/src/record.rs b/src/record.rs
index 90a02f1567..b9954a6757 100644
--- a/src/record.rs
+++ b/src/record.rs
@@ -65,7 +65,7 @@ pub struct Header {
     data_size: u64,
     flags: u8,
     blob_offset: u64,
-    created: u64,
+    timestamp: u64, // This was previously the 'created' field. Adding a new field is a breaking change. TODO: add 'created' back in a future release
     data_checksum: u32,
     header_checksum: u32,
 }
@@ -125,14 +125,14 @@ impl Record {
     }

     /// Creates new `Record` with provided data, key and meta.
-    pub fn create<K>(key: &K, data: Bytes, meta: Meta) -> bincode::Result<Self>
+    pub fn create<K>(key: &K, timestamp: u64, data: Bytes, meta: Meta) -> bincode::Result<Self>
     where
         for<'a> K: Key<'a>,
     {
         let key = key.as_ref().to_vec();
         let meta_size = meta.serialized_size()?;
         let data_checksum = CRC32C.checksum(&data);
-        let header = Header::new(key, meta_size, data.len() as u64, data_checksum);
+        let header = Header::new(key, timestamp, meta_size, data.len() as u64, data_checksum);
         Ok(Self { header, meta, data })
     }
@@ -156,11 +156,11 @@ impl Record {
         Ok(buf)
     }

-    pub(crate) fn deleted<K>(key: &K) -> bincode::Result<Self>
+    pub(crate) fn deleted<K>(key: &K, timestamp: u64) -> bincode::Result<Self>
     where
         for<'a> K: Key<'a> + 'static,
     {
-        let mut record = Record::create(key, Bytes::new(), Meta::default())?;
+        let mut record = Record::create(key, timestamp, Bytes::new(), Meta::default())?;
         record.header.mark_as_deleted()?;
         Ok(record)
     }
@@ -198,14 +198,7 @@ impl Record {
 }

 impl Header {
-    pub fn new(key: Vec<u8>, meta_size: u64, data_size: u64, data_checksum: u32) -> Self {
-        let created = std::time::UNIX_EPOCH.elapsed().map_or_else(
-            |e| {
-                error!("{}", e);
-                0
-            },
-            |d| d.as_secs(),
-        );
+    pub fn new(key: Vec<u8>, timestamp: u64, meta_size: u64, data_size: u64, data_checksum: u32) -> Self {
         Self {
             magic_byte: RECORD_MAGIC_BYTE,
             key,
@@ -213,7 +206,7 @@ impl Header {
             data_size,
             flags: 0,
             blob_offset: 0,
-            created,
+            timestamp,
             data_checksum,
             header_checksum: 0,
         }
@@ -328,11 +321,13 @@ impl Header {
         self.update_checksum()
     }

+    #[inline]
     pub(crate) fn is_deleted(&self) -> bool {
         self.flags & DELETE_FLAG == DELETE_FLAG
     }

-    pub(crate) fn created(&self) -> u64 {
-        self.created
+    #[inline]
+    pub(crate) fn timestamp(&self) -> u64 {
+        self.timestamp
     }
 }
diff --git a/src/storage/core.rs b/src/storage/core.rs
index 2c7406f695..d3f83d31c5 100644
--- a/src/storage/core.rs
+++ b/src/storage/core.rs
@@ -243,15 +243,16 @@ where
     /// async fn write_data(storage: Storage<ArrayKey<8>>) {
     ///     let key = ArrayKey::<8>::default();
     ///     let data = b"async written to blob".to_vec().into();
-    ///     storage.write(key, data).await;
+    ///     let timestamp = BlobRecordTimestamp::now();
+    ///     storage.write(key, data, timestamp).await;
     /// }
     /// ```
     /// # Errors
     /// Fails with the same errors as [`write_with`]
     ///
     /// [`write_with`]: Storage::write_with
-    pub async fn write(&self, key: impl AsRef<K>, value: Bytes) -> Result<()> {
-        self.write_with_optional_meta(key, value, None).await
+    pub async fn write(&self, key: impl AsRef<K>, value: Bytes, timestamp: BlobRecordTimestamp) -> Result<()> {
+        self.write_with_optional_meta(key, value, timestamp, None).await
     }

     /// Similar to [`write`] but with metadata
@@ -262,15 +263,16 @@ where
     /// async fn write_data(storage: Storage<ArrayKey<8>>) {
     ///     let key = ArrayKey::<8>::default();
     ///     let data = b"async written to blob".to_vec().into();
+    ///     let timestamp = BlobRecordTimestamp::now();
     ///     let mut meta = Meta::new();
     ///     meta.insert("version".to_string(), b"1.0".to_vec());
-    ///     storage.write_with(&key, data, meta).await;
+    ///     storage.write_with(&key, data, timestamp, meta).await;
     /// }
     /// ```
     /// # Errors
     /// Fails if duplicates are not allowed and the record already exists.
-    pub async fn write_with(&self, key: impl AsRef<K>, value: Bytes, meta: Meta) -> Result<()> {
-        self.write_with_optional_meta(key, value, Some(meta)).await
+    pub async fn write_with(&self, key: impl AsRef<K>, value: Bytes, timestamp: BlobRecordTimestamp, meta: Meta) -> Result<()> {
+        self.write_with_optional_meta(key, value, timestamp, Some(meta)).await
     }

     /// Free all resources that may be freed without work interruption
@@ -298,6 +300,7 @@ where
         &self,
         key: impl AsRef<K>,
         value: Bytes,
+        timestamp: BlobRecordTimestamp,
         meta: Option<Meta>,
     ) -> Result<()> {
         let key = key.as_ref();
@@ -316,7 +319,7 @@ where
             );
             return Ok(());
         }
-        let record = Record::create(key, value, meta.unwrap_or_default())
+        let record = Record::create(key, timestamp.into(), value, meta.unwrap_or_default())
             .with_context(|| "storage write with record creation failed")?;
         let safe = self.inner.safe.read().await;
         let blob = safe
@@ -471,10 +474,10 @@ where
             "storage core read from non-active total {} entries",
             all_entries.len()
         );
-        debug_assert!(all_entries
-            .iter()
-            .zip(all_entries.iter().skip(1))
-            .all(|(x, y)| x.created() >= y.created()));
+        //debug_assert!(all_entries
+        //    .iter()
+        //    .zip(all_entries.iter().skip(1))
+        //    .all(|(x, y)| x.created() >= y.created()));
         Ok(all_entries)
     }
@@ -971,24 +974,24 @@ where
     /// Delete entries with matching key
     /// # Errors
     /// Fails after any disk IO errors.
-    pub async fn delete(&self, key: impl AsRef<K>, only_if_presented: bool) -> Result<u64> {
+    pub async fn delete(&self, key: impl AsRef<K>, timestamp: BlobRecordTimestamp, only_if_presented: bool) -> Result<u64> {
         let mut total = 0;
         let mut safe = self.inner.safe.write().await;
         total += self
-            .mark_all_as_deleted_active(&mut *safe, key.as_ref(), only_if_presented)
+            .mark_all_as_deleted_active(&mut *safe, key.as_ref(), timestamp, only_if_presented)
             .await?;
         total += self
-            .mark_all_as_deleted_closed(&mut *safe, key.as_ref())
+            .mark_all_as_deleted_closed(&mut *safe, key.as_ref(), timestamp)
             .await?;
         debug!("{} deleted total", total);
         Ok(total)
     }

-    async fn mark_all_as_deleted_closed(&self, safe: &mut Safe<K>, key: &K) -> Result<u64> {
+    async fn mark_all_as_deleted_closed(&self, safe: &mut Safe<K>, key: &K, timestamp: BlobRecordTimestamp) -> Result<u64> {
         let mut blobs = safe.blobs.write().await;
         let entries_closed_blobs = blobs
             .iter_mut()
-            .map(|b| b.mark_all_as_deleted(key, false))
+            .map(|b| b.mark_all_as_deleted(key, timestamp, false))
             .collect::<FuturesUnordered<_>>();
         let total = entries_closed_blobs
             .map(|result| match result {
@@ -1015,6 +1018,7 @@ where
         &self,
         safe: &mut Safe<K>,
         key: &K,
+        timestamp: BlobRecordTimestamp,
         only_if_presented: bool,
     ) -> Result<u64> {
         if !only_if_presented {
@@ -1025,7 +1029,7 @@ where
         let is_deleted = active_blob
             .write()
             .await
-            .mark_all_as_deleted(key, only_if_presented)
+            .mark_all_as_deleted(key, timestamp, only_if_presented)
             .await?;
         let count = if is_deleted { 1 } else { 0 };
         debug!("{} deleted from active blob", count);
diff --git a/src/storage/read_result.rs b/src/storage/read_result.rs
index f9eed7d4ba..03ffc440e5 100644
--- a/src/storage/read_result.rs
+++ b/src/storage/read_result.rs
@@ -16,9 +16,19 @@ pub enum ReadResult<T> {
 }

 impl BlobRecordTimestamp {
-    pub(crate) fn new(t: u64) -> Self {
+    /// Creates timestamp from a user supplied number
+    pub fn new(t: u64) -> Self {
         BlobRecordTimestamp(t)
     }
+
+    /// Current UNIX timestamp
+    pub fn now() -> Self {
+        BlobRecordTimestamp(
+            match std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH) {
+                Ok(n) => n.as_secs(),
+                Err(_) => panic!("SystemTime before UNIX EPOCH!"),
+            })
+    }
 }

 impl Display for BlobRecordTimestamp {
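[Editorial note, not part of the patch series] After PATCH 01 the caller supplies the timestamp on every write and delete instead of the record capturing wall-clock time internally. A minimal usage sketch, assuming a storage built and initialized as in the crate-level docs and `anyhow` for the error type:

```rust
use pearl::{ArrayKey, BlobRecordTimestamp, Storage};

// `storage` is assumed to be already initialized; see the crate docs for setup.
async fn demo(storage: &Storage<ArrayKey<8>>) -> anyhow::Result<()> {
    // The timestamp is now an explicit argument and is stored in the record header.
    storage
        .write(ArrayKey::<8>::default(), b"payload".to_vec().into(), BlobRecordTimestamp::now())
        .await?;
    // Deletion records carry a timestamp too, so they order against writes.
    let deleted = storage
        .delete(ArrayKey::<8>::default(), BlobRecordTimestamp::now(), false)
        .await?;
    println!("{} deletion records written", deleted);
    Ok(())
}
```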
From 1824f70723f687db9eea9a0ef13d97de1572ae32 Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Sat, 14 Jan 2023 21:37:25 +0100
Subject: [PATCH 02/24] [231] Update tests

---
 tests/tests.rs | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/tests/tests.rs b/tests/tests.rs
index f1754984f2..52a4c3f03b 100644
--- a/tests/tests.rs
+++ b/tests/tests.rs
@@ -8,7 +8,7 @@ use futures::{
     stream::{futures_unordered::FuturesUnordered, FuturesOrdered, StreamExt, TryStreamExt},
     TryFutureExt,
 };
-use pearl::{BloomProvider, Builder, Meta, ReadResult, Storage};
+use pearl::{BloomProvider, Builder, Meta, ReadResult, Storage, BlobRecordTimestamp};
 use rand::{seq::SliceRandom, Rng};
 use std::{
     fs,
@@ -325,7 +325,7 @@ async fn test_index_from_empty_blob() {
     assert!(blob_file_path.exists());
     let new_storage = common::create_test_storage(&path, 1_000_000).await.unwrap();
     new_storage
-        .write(KeyTest::new(1), vec![1; 8].into())
+        .write(KeyTest::new(1), vec![1; 8].into(), BlobRecordTimestamp::now())
         .await
         .unwrap();
     new_storage.close().await.unwrap();
@@ -390,7 +390,7 @@ async fn test_write_512_records_with_same_key() {
         meta.insert("version".to_owned(), i.to_string());
         sleep(Duration::from_micros(1)).await;
         storage
-            .write_with(&key, value.clone().into(), meta)
+            .write_with(&key, value.clone().into(), BlobRecordTimestamp::now(), meta)
             .await
             .unwrap();
     }
@@ -539,10 +539,10 @@ async fn test_check_bloom_filter_single() {
         let neg_key = KeyTest::new(i + 2 * repeat);
         trace!("key: {}, pos: {:?}, negative: {:?}", i, pos_key, neg_key);
         let key = KeyTest::new(i);
-        storage.write(&key, data.to_vec().into()).await.unwrap();
+        storage.write(&key, data.to_vec().into(), BlobRecordTimestamp::now()).await.unwrap();
         assert_eq!(storage.check_filters(key).await, Some(true));
         let data = b"other_random_data";
-        storage.write(&pos_key, data.to_vec().into()).await.unwrap();
+        storage.write(&pos_key, data.to_vec().into(), BlobRecordTimestamp::now()).await.unwrap();
         assert_eq!(storage.check_filters(pos_key).await, Some(true));
         assert_eq!(storage.check_filters(neg_key).await, Some(false));
     }
@@ -558,7 +558,7 @@ async fn test_check_bloom_filter_multiple() {
        b"lfolakfsjher_rladncreladlladkfsje_pkdieldpgkeolladkfsjeslladkfsj_slladkfsjorladgedom_dladlladkfsjlad";
     for i in 1..800 {
         let key = KeyTest::new(i);
-        storage.write(&key, data.to_vec().into()).await.unwrap();
+        storage.write(&key, data.to_vec().into(), BlobRecordTimestamp::now()).await.unwrap();
         sleep(Duration::from_millis(6)).await;
         trace!("blobs count: {}", storage.blobs_count().await);
     }
@@ -581,7 +581,7 @@ async fn test_check_bloom_filter_multiple_offloaded() {
        b"lfolakfsjher_rladncreladlladkfsje_pkdieldpgkeolladkfsjeslladkfsj_slladkfsjorladgedom_dladlladkfsjlad";
     for i in 1..800 {
         let key = KeyTest::new(i);
-        storage.write(&key, data.to_vec().into()).await.unwrap();
+        storage.write(&key, data.to_vec().into(), BlobRecordTimestamp::now()).await.unwrap();
         sleep(Duration::from_millis(6)).await;
         trace!("blobs count: {}", storage.blobs_count().await);
     }
@@ -610,7 +610,7 @@ async fn test_check_bloom_filter_init_from_existing() {
         for i in 1..base {
             let key = KeyTest::new(i);
             trace!("write key: {}", i);
-            storage.write(&key, data.to_vec().into()).await.unwrap();
+            storage.write(&key, data.to_vec().into(), BlobRecordTimestamp::now()).await.unwrap();
             trace!("blobs count: {}", storage.blobs_count().await);
         }
         debug!("close storage");
@@ -656,7 +656,7 @@ async fn test_check_bloom_filter_generated() {
         for i in 1..base {
             let key = KeyTest::new(i);
             trace!("write key: {}", i);
-            storage.write(&key, data.to_vec().into()).await.unwrap();
+            storage.write(&key, data.to_vec().into(), BlobRecordTimestamp::now()).await.unwrap();
             trace!("blobs count: {}", storage.blobs_count().await);
         }
         debug!("close storage");
@@ -701,10 +701,10 @@ async fn write_one(
     debug!("tests write one key: {:?}", key);
     if let Some(v) = version {
         debug!("tests write one write with");
-        storage.write_with(key, data, meta_with(v)).await
+        storage.write_with(key, data, BlobRecordTimestamp::now(), meta_with(v)).await
     } else {
         debug!("tests write one write");
-        storage.write(key, data).await
+        storage.write(key, data, BlobRecordTimestamp::now()).await
     }
 }
@@ -910,7 +910,7 @@ async fn test_mark_as_deleted_single() {
         write_one(&storage, *key, data, None).await.unwrap();
         sleep(Duration::from_millis(64)).await;
     }
-    storage.delete(&delete_key, false).await.unwrap();
+    storage.delete(&delete_key, BlobRecordTimestamp::now(), false).await.unwrap();
     assert!(matches!(
         storage.contains(delete_key).await.unwrap(),
         ReadResult::Deleted(_)
@@ -934,7 +934,7 @@ async fn test_mark_as_deleted_deferred_dump() {
     let storage = common::create_test_storage(&path, 10_000).await.unwrap();
     let update_time = std::fs::metadata(&path.join("test.0.index")).expect("metadata");
-    storage.delete(&delete_key, false).await.unwrap();
+    storage.delete(&delete_key, BlobRecordTimestamp::now(), false).await.unwrap();
     sleep(MIN_DEFER_TIME / 2).await;
     let new_update_time = std::fs::metadata(&path.join("test.0.index")).expect("metadata");
@@ -1077,9 +1077,9 @@ async fn test_read_all_with_deletion_marker_delete_middle() -> Result<()> {
     let storage = common::default_test_storage_in(path).await.unwrap();
     let key: KeyTest = vec![0].into();
     let data: Bytes = "test data string".repeat(16).as_bytes().to_vec().into();
-    storage.write(&key, data.clone()).await?;
-    storage.delete(&key, true).await?;
-    storage.write(&key, data.clone()).await?;
+    storage.write(&key, data.clone(), BlobRecordTimestamp::now()).await?;
+    storage.delete(&key, BlobRecordTimestamp::now(), true).await?;
+    storage.write(&key, data.clone(), BlobRecordTimestamp::now()).await?;

     let read = storage.read_all_with_deletion_marker(&key).await?;

@@ -1096,11 +1096,11 @@ async fn test_read_all_with_deletion_marker_delete_middle_different_blobs() -> R
     let storage = common::default_test_storage_in(path).await.unwrap();
     let key: KeyTest = vec![0].into();
     let data: Bytes = "test data string".repeat(16).as_bytes().to_vec().into();
-    storage.write(&key, data.clone()).await?;
+    storage.write(&key, data.clone(), BlobRecordTimestamp::now()).await?;
     storage.try_close_active_blob().await?;
-    storage.delete(&key, false).await?;
+    storage.delete(&key, BlobRecordTimestamp::now(), false).await?;
     storage.try_close_active_blob().await?;
-    storage.write(&key, data.clone()).await?;
+    storage.write(&key, data.clone(), BlobRecordTimestamp::now()).await?;
     storage.try_close_active_blob().await?;

     let read = storage.read_all_with_deletion_marker(&key).await?;

From 57d2569db3b61bc56fa122f35cdfb375f3be7c44 Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Sat, 14 Jan 2023 21:47:29 +0100
Subject: [PATCH 03/24] [231] Update tests

---
 src/blob/index/benchmarks.rs   |  2 +-
 src/blob/index/bptree/tests.rs | 14 +++++++-------
 src/storage/read_result.rs     |  2 +-
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/src/blob/index/benchmarks.rs b/src/blob/index/benchmarks.rs
index 333c5f2e0d..474a048b8a 100644
--- a/src/blob/index/benchmarks.rs
+++ b/src/blob/index/benchmarks.rs
@@ -59,7 +59,7 @@ fn generate_headers(records_amount: usize, key_mapper: fn(u32) -> u32) -> InMemo
         .map(|i| serialize(&i).expect("can't serialize"))
         .for_each(|key| {
             let key: KeyType = key.into();
-            let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
+            let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
             if let Some(v) = inmem.get_mut(&key) {
                 v.push(rh);
             } else {
diff --git a/src/blob/index/bptree/tests.rs b/src/blob/index/bptree/tests.rs
index e32dbd305e..45ce5d250f 100644
--- a/src/blob/index/bptree/tests.rs
+++ b/src/blob/index/bptree/tests.rs
@@ -69,7 +69,7 @@ impl Into<usize> for KeyType {
 async fn serialize_deserialize_file() {
     let mut inmem = InMemoryIndex::<KeyType>::new();
     (0..10000).map(|i| i.into()).for_each(|key: KeyType| {
-        let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
+        let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
         inmem.insert(key, vec![rh]);
     });
     let meta = vec![META_VALUE; META_SIZE];
@@ -95,7 +95,7 @@ async fn blob_size_invalidation() {
     let filename = "/tmp/bptree_index.0.index";
     let mut inmem = InMemoryIndex::<KeyType>::new();
     (0..10000).map(|i| i.into()).for_each(|key: KeyType| {
-        let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
+        let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
         inmem.insert(key, vec![rh]);
     });
     let meta = vec![META_VALUE; META_SIZE];
@@ -129,7 +129,7 @@ async fn magic_byte_corruption() {
     let filename = "/tmp/bptree_index.0.index";
     let mut inmem = InMemoryIndex::<KeyType>::new();
     (0..10000).map(|i| i.into()).for_each(|key: KeyType| {
-        let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
+        let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
         inmem.insert(key, vec![rh]);
     });
     let meta = vec![META_VALUE; META_SIZE];
@@ -183,7 +183,7 @@ async fn check_get_any() {
     (RANGE_FROM..RANGE_TO)
         .map(|i| i.into())
         .for_each(|key: KeyType| {
-            let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
+            let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
             inmem.insert(key, vec![rh]);
         });
     let meta = vec![META_VALUE; META_SIZE];
@@ -231,8 +231,8 @@ async fn preserves_records_order() {
     (RANGE_FROM..RANGE_TO)
         .map(|i| i.into())
         .for_each(|key: KeyType| {
-            let rh1 = RecordHeader::new(key.to_vec(), 1, 1, 1);
-            let rh2 = RecordHeader::new(key.to_vec(), 2, 2, 2);
+            let rh1 = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
+            let rh2 = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 2, 2, 2);
             inmem.insert(key, vec![rh1, rh2]);
         });
     let meta = vec![META_VALUE; META_SIZE];
@@ -263,7 +263,7 @@ async fn check_get() {
     (RANGE_FROM..RANGE_TO)
         .map(|i| (i % MAX_AMOUNT + 1, i.into()))
         .for_each(|(times, key): (_, KeyType)| {
-            let rh = RecordHeader::new(key.to_vec(), 1, 1, 1);
+            let rh = RecordHeader::new(key.to_vec(), BlobRecordTimestamp::now().into(), 1, 1, 1);
             let recs = (0..times).map(|_| rh.clone()).collect();
             inmem.insert(key, recs);
         });
diff --git a/src/storage/read_result.rs b/src/storage/read_result.rs
index 03ffc440e5..076f2badf3 100644
--- a/src/storage/read_result.rs
+++ b/src/storage/read_result.rs
@@ -26,7 +26,7 @@ impl BlobRecordTimestamp {
         BlobRecordTimestamp(
             match std::time::SystemTime::now().duration_since(std::time::SystemTime::UNIX_EPOCH) {
                 Ok(n) => n.as_secs(),
-                Err(_) => panic!("SystemTime before UNIX EPOCH!"),
+                Err(_) => 0,
             })
     }
 }
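[Editorial note, not part of the patch series] The `now()` change above means a pre-epoch system clock now yields timestamp 0 instead of a panic. A standalone sketch equivalent to the new body:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Saturate to 0 rather than panicking when the system clock reports a time
// before the UNIX epoch.
fn now_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}

fn main() {
    println!("seconds since epoch: {}", now_secs());
}
```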
From 626d5f27465cf4bcb6a72a3527773696e3376026 Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Sat, 14 Jan 2023 21:57:22 +0100
Subject: [PATCH 04/24] [231] Fix docs

---
 src/lib.rs          | 5 +++--
 src/storage/core.rs | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 2abdd6942a..aea869f5d1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -18,7 +18,7 @@
 //! For more advanced usage see the benchmark tool as the example
 //!
 //! ```no_run
-//! use pearl::{Storage, Builder, ArrayKey};
+//! use pearl::{Storage, Builder, ArrayKey, BlobRecordTimestamp};
 //!
 //! #[tokio::main]
 //! async fn main() {
@@ -33,7 +33,8 @@
 //!     storage.init().await.unwrap();
 //!     let key = ArrayKey::<8>::default();
 //!     let data = b"Hello World!".to_vec();
-//!     storage.write(key, data.into()).await.unwrap();
+//!     let timestamp = BlobRecordTimestamp::now();
+//!     storage.write(key, data.into(), timestamp).await.unwrap();
 //! }
 //! ```
diff --git a/src/storage/core.rs b/src/storage/core.rs
index d3f83d31c5..05585e695c 100644
--- a/src/storage/core.rs
+++ b/src/storage/core.rs
@@ -238,7 +238,7 @@ where
     /// creation
     /// # Examples
     /// ```no_run
-    /// use pearl::{Builder, Storage, ArrayKey};
+    /// use pearl::{Builder, Storage, ArrayKey, BlobRecordTimestamp};
     ///
     /// async fn write_data(storage: Storage<ArrayKey<8>>) {
     ///     let key = ArrayKey::<8>::default();
@@ -258,7 +258,7 @@ where
     /// Similar to [`write`] but with metadata
     /// # Examples
     /// ```no_run
-    /// use pearl::{Builder, Meta, Storage, ArrayKey};
+    /// use pearl::{Builder, Meta, Storage, ArrayKey, BlobRecordTimestamp};
     ///
     /// async fn write_data(storage: Storage<ArrayKey<8>>) {
     ///     let key = ArrayKey::<8>::default();
@@ -474,6 +474,7 @@ where
             "storage core read from non-active total {} entries",
             all_entries.len()
         );
+        // TODO: ordering!
         //debug_assert!(all_entries
         //    .iter()
         //    .zip(all_entries.iter().skip(1))

From 84c58bd3f3d674d8902501b3a2f90f2bb7575663 Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Sat, 14 Jan 2023 22:18:04 +0100
Subject: [PATCH 05/24] [231] Keeps ordering by timestamp in storage::read_all

---
 src/storage/core.rs | 41 +++++++++++++++++++++++++----------------
 1 file changed, 25 insertions(+), 16 deletions(-)

diff --git a/src/storage/core.rs b/src/storage/core.rs
index 05585e695c..2dbd6ae267 100644
--- a/src/storage/core.rs
+++ b/src/storage/core.rs
@@ -435,7 +435,11 @@ where
     /// Fails after any disk IO errors.
     pub async fn read_all_with_deletion_marker(&self, key: impl AsRef<K>) -> Result<Vec<Entry>> {
         let key = key.as_ref();
+
         let mut all_entries = Vec::new();
+        let mut deletion_marker_presence = false;
+        let mut affected_blobs_count = 0;
+
         let safe = self.inner.safe.read().await;
         let active_blob = safe.active_blob.as_ref();
         if let Some(active_blob) = active_blob {
@@ -448,13 +452,12 @@ where
                 "storage core read all active blob entries {}",
                 entries.len()
             );
-            if let Some(e) = entries.last() {
-                if e.is_deleted() {
-                    return Ok(entries);
-                }
-            }
+
+            deletion_marker_presence = deletion_marker_presence || entries.last().map(|e| e.is_deleted()).unwrap_or(false);
+            affected_blobs_count += if !entries.is_empty() { 1 } else { 0 };
             all_entries.extend(entries);
         }
+
         let blobs = safe.blobs.read().await;
         let mut futures = blobs
             .iter_possible_childs_rev(key)
@@ -462,23 +465,29 @@ where
             .collect::<FuturesOrdered<_>>();
         while let Some(data) = futures.next().await {
             let entries = data?;
-            if let Some(e) = entries.last() {
-                if e.is_deleted() {
-                    all_entries.extend(entries);
-                    return Ok(all_entries);
-                }
-            }
+
+            deletion_marker_presence = deletion_marker_presence || entries.last().map(|e| e.is_deleted()).unwrap_or(false);
+            affected_blobs_count += if !entries.is_empty() { 1 } else { 0 };
             all_entries.extend(entries);
         }
         debug!(
             "storage core read from non-active total {} entries",
             all_entries.len()
         );
+
+        // Try to preserve ordering by timestamp
+        if affected_blobs_count > 1 {
+            // If more than one blob affects the result, the order can be broken: restore it
+            all_entries.sort_by(|a, b| b.timestamp().cmp(&a.timestamp()));
+            if deletion_marker_presence {
+                // If a deletion marker is present, we should find it and remove
+                let first_del = all_entries.iter().position(|h| h.is_deleted());
+                if let Some(first_del) = first_del {
+                    all_entries.truncate(first_del + 1);
+                }
+            }
+        }
+
         Ok(all_entries)
     }
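[Editorial note, not part of the patch series] To make the merge rule above concrete, here is an illustrative sketch over plain tuples rather than the library's types: entries collected from several blobs are sorted newest-first, and when a deletion marker is present everything older than the first (newest) marker is dropped, keeping the marker itself:

```rust
// (timestamp, is_deletion_marker) stands in for a full Entry here.
fn merge(mut entries: Vec<(u64, bool)>) -> Vec<(u64, bool)> {
    // Restore newest-first order across blob boundaries.
    entries.sort_by(|a, b| b.0.cmp(&a.0));
    // Keep the first deletion marker and drop everything after it.
    if let Some(first_del) = entries.iter().position(|e| e.1) {
        entries.truncate(first_del + 1);
    }
    entries
}

fn main() {
    let merged = merge(vec![(5, false), (10, false), (7, true), (3, false)]);
    // The write at ts=3 is shadowed by the deletion marker at ts=7.
    assert_eq!(merged, vec![(10, false), (7, true)]);
}
```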
test data string".repeat(16).as_bytes().to_vec().into(); + storage.write(&key, data1.clone(), BlobRecordTimestamp::now()).await?; storage.delete(&key, BlobRecordTimestamp::now(), true).await?; - storage.write(&key, data.clone(), BlobRecordTimestamp::now()).await?; + storage.write(&key, data2.clone(), BlobRecordTimestamp::now()).await?; let read = storage.read_all_with_deletion_marker(&key).await?; assert_eq!(2, read.len()); assert!(!read[0].is_deleted()); + assert_eq!(data2, Bytes::from(read[0].load_data().await.unwrap())); assert!(read[1].is_deleted()); Ok(()) @@ -1095,18 +1097,20 @@ async fn test_read_all_with_deletion_marker_delete_middle_different_blobs() -> R let path = common::init("delete_middle_blobs"); let storage = common::default_test_storage_in(path).await.unwrap(); let key: KeyTest = vec![0].into(); - let data: Bytes = "test data string".repeat(16).as_bytes().to_vec().into(); - storage.write(&key, data.clone(), BlobRecordTimestamp::now()).await?; + let data1: Bytes = "1. test data string".repeat(16).as_bytes().to_vec().into(); + let data2: Bytes = "2. test data string".repeat(16).as_bytes().to_vec().into(); + storage.write(&key, data1.clone(), BlobRecordTimestamp::now()).await?; storage.try_close_active_blob().await?; storage.delete(&key, BlobRecordTimestamp::now(), false).await?; storage.try_close_active_blob().await?; - storage.write(&key, data.clone(), BlobRecordTimestamp::now()).await?; + storage.write(&key, data2.clone(), BlobRecordTimestamp::now()).await?; storage.try_close_active_blob().await?; let read = storage.read_all_with_deletion_marker(&key).await?; assert_eq!(2, read.len()); assert!(!read[0].is_deleted()); + assert_eq!(data2, Bytes::from(read[0].load_data().await.unwrap())); assert!(read[1].is_deleted()); Ok(()) From d4aeec72cd9421b245ce6c0512c201f5d57c118b Mon Sep 17 00:00:00 2001 From: ikopylov Date: Sat, 14 Jan 2023 22:49:20 +0100 Subject: [PATCH 07/24] [231] Add ability to attach metadata to deletion record --- src/blob/core.rs | 9 +++++---- src/record.rs | 7 ++++--- src/storage/core.rs | 25 +++++++++++++++++++------ 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/src/blob/core.rs b/src/blob/core.rs index 40f3baee13..80d4b21e49 100644 --- a/src/blob/core.rs +++ b/src/blob/core.rs @@ -345,26 +345,27 @@ where Ok(Self::headers_to_entries(headers, &self.file)) } - pub(crate) async fn mark_all_as_deleted( + pub(crate) async fn delete( &mut self, key: &K, timestamp: BlobRecordTimestamp, + meta: Option, only_if_presented: bool, ) -> Result { if !only_if_presented || self.index.get_any(key).await?.is_found() { - self.push_deletion_record(key, timestamp).await?; + let record = Record::deleted(key, timestamp.into(), meta)?; + self.push_deletion_record(key, record).await?; Ok(true) } else { Ok(false) } } - async fn push_deletion_record(&mut self, key: &K, timestamp: BlobRecordTimestamp) -> Result<()> { + async fn push_deletion_record(&mut self, key: &K, record: Record) -> Result<()> { let on_disk = self.index.on_disk(); if on_disk { self.load_index().await?; } - let record = Record::deleted(key, timestamp.into())?; let header = self.write_mut(key, record).await?; self.index.push_deletion(key, header) } diff --git a/src/record.rs b/src/record.rs index b9954a6757..a773e419c7 100644 --- a/src/record.rs +++ b/src/record.rs @@ -125,11 +125,12 @@ impl Record { } /// Creates new `Record` with provided data, key and meta. 
- pub fn create(key: &K, timestamp: u64, data: Bytes, meta: Meta) -> bincode::Result + pub fn create(key: &K, timestamp: u64, data: Bytes, meta: Option) -> bincode::Result where for<'a> K: Key<'a>, { let key = key.as_ref().to_vec(); + let meta = meta.unwrap_or_default(); let meta_size = meta.serialized_size()?; let data_checksum = CRC32C.checksum(&data); let header = Header::new(key, timestamp, meta_size, data.len() as u64, data_checksum); @@ -156,11 +157,11 @@ impl Record { Ok(buf) } - pub(crate) fn deleted(key: &K, timestamp: u64) -> bincode::Result + pub(crate) fn deleted(key: &K, timestamp: u64, meta: Option) -> bincode::Result where for<'a> K: Key<'a> + 'static, { - let mut record = Record::create(key, timestamp, Bytes::new(), Meta::default())?; + let mut record = Record::create(key, timestamp, Bytes::new(), meta)?; record.header.mark_as_deleted()?; Ok(record) } diff --git a/src/storage/core.rs b/src/storage/core.rs index 2dbd6ae267..0fe20ad0e6 100644 --- a/src/storage/core.rs +++ b/src/storage/core.rs @@ -985,23 +985,35 @@ where /// # Errors /// Fails after any disk IO errors. pub async fn delete(&self, key: impl AsRef, timestamp: BlobRecordTimestamp, only_if_presented: bool) -> Result { + self.delete_with_optional_meta(key, timestamp, None, only_if_presented).await + } + + /// Delete entries with matching key. Appends metadata to deletion record + /// # Errors + /// Fails after any disk IO errors. + pub async fn delete_with(&self, key: impl AsRef, timestamp: BlobRecordTimestamp, meta: Meta, only_if_presented: bool) -> Result { + self.delete_with_optional_meta(key, timestamp, Some(meta), only_if_presented).await + } + + + async fn delete_with_optional_meta(&self, key: impl AsRef, timestamp: BlobRecordTimestamp, meta: Option, only_if_presented: bool) -> Result { let mut total = 0; let mut safe = self.inner.safe.write().await; total += self - .mark_all_as_deleted_active(&mut *safe, key.as_ref(), timestamp, only_if_presented) + .delete_in_active(&mut *safe, key.as_ref(), timestamp, meta.clone(), only_if_presented) .await?; total += self - .mark_all_as_deleted_closed(&mut *safe, key.as_ref(), timestamp) + .delete_in_closed(&mut *safe, key.as_ref(), timestamp, meta) .await?; debug!("{} deleted total", total); Ok(total) } - async fn mark_all_as_deleted_closed(&self, safe: &mut Safe, key: &K, timestamp: BlobRecordTimestamp) -> Result { + async fn delete_in_closed(&self, safe: &mut Safe, key: &K, timestamp: BlobRecordTimestamp, meta: Option) -> Result { let mut blobs = safe.blobs.write().await; let entries_closed_blobs = blobs .iter_mut() - .map(|b| b.mark_all_as_deleted(key, timestamp, false)) + .map(|b| b.delete(key, timestamp, meta.clone(), false)) .collect::>(); let total = entries_closed_blobs .map(|result| match result { @@ -1024,11 +1036,12 @@ where Ok(total as u64) } - async fn mark_all_as_deleted_active( + async fn delete_in_active( &self, safe: &mut Safe, key: &K, timestamp: BlobRecordTimestamp, + meta: Option, only_if_presented: bool, ) -> Result { if !only_if_presented { @@ -1039,7 +1052,7 @@ where let is_deleted = active_blob .write() .await - .mark_all_as_deleted(key, timestamp, only_if_presented) + .delete(key, timestamp, meta, only_if_presented) .await?; let count = if is_deleted { 1 } else { 0 }; debug!("{} deleted from active blob", count); From 461ba4acdef89ea26de3518833bb564447aa6e15 Mon Sep 17 00:00:00 2001 From: ikopylov Date: Sat, 14 Jan 2023 22:53:46 +0100 Subject: [PATCH 08/24] [231] Fix build --- src/storage/core.rs | 2 +- 1 file changed, 1 insertion(+), 1 
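[Editorial note, not part of the patch series] A hypothetical use of the new `delete_with` introduced above; the "reason" metadata key is an invented example, and `anyhow` is assumed for the error type:

```rust
use pearl::{ArrayKey, BlobRecordTimestamp, Meta, Storage};

async fn tombstone_with_meta(storage: &Storage<ArrayKey<8>>) -> anyhow::Result<()> {
    let mut meta = Meta::new();
    // The metadata travels with the deletion record itself.
    meta.insert("reason".to_string(), b"gc".to_vec());
    let removed = storage
        .delete_with(ArrayKey::<8>::default(), BlobRecordTimestamp::now(), meta, false)
        .await?;
    println!("wrote deletion records into {} blobs", removed);
    Ok(())
}
```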
From 461ba4acdef89ea26de3518833bb564447aa6e15 Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Sat, 14 Jan 2023 22:53:46 +0100
Subject: [PATCH 08/24] [231] Fix build

---
 src/storage/core.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/storage/core.rs b/src/storage/core.rs
index 0fe20ad0e6..d87c339cd8 100644
--- a/src/storage/core.rs
+++ b/src/storage/core.rs
@@ -319,7 +319,7 @@ where
             );
             return Ok(());
         }
-        let record = Record::create(key, timestamp.into(), value, meta.unwrap_or_default())
+        let record = Record::create(key, timestamp.into(), value, meta)
             .with_context(|| "storage write with record creation failed")?;
         let safe = self.inner.safe.read().await;
         let blob = safe

From c05616d79bf367f1597ee8540987b8e118a308f8 Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Sat, 14 Jan 2023 23:02:02 +0100
Subject: [PATCH 09/24] [231] Update changelog

---
 CHANGELOG.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0d10aa4c6d..bf1dd57905 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,10 +4,10 @@ Pearl changelog

 ## [Unreleased]
 #### Added
-
+- Add ability to attach metadata to the deletion record (#229)

 #### Changed
-
+- Receive timestamp as a parameter, store it within the record and return it as BlobRecordTimestamp (#231)

 #### Fixed

From a5b9b472eae3313fcbd9b2b184b995f43662ab07 Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Sat, 14 Jan 2023 23:06:01 +0100
Subject: [PATCH 10/24] [231] Update comment

---
 src/storage/core.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/storage/core.rs b/src/storage/core.rs
index d87c339cd8..d3af1e2af1 100644
--- a/src/storage/core.rs
+++ b/src/storage/core.rs
@@ -480,7 +480,7 @@ where
             // If more than one blob affects the result, the order can be broken: restore it
             all_entries.sort_by(|a, b| b.timestamp().cmp(&a.timestamp()));
             if deletion_marker_presence {
-                // If a deletion marker is present, we should find it and remove
+                // If a deletion marker is present, we should find it and remove the records after it
                 let first_del = all_entries.iter().position(|h| h.is_deleted());
                 if let Some(first_del) = first_del {
                     all_entries.truncate(first_del + 1);

From 0ff13dc3e35188931dbe43b938d3c6e3e561de1c Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Wed, 17 May 2023 21:00:08 +0200
Subject: [PATCH 11/24] [231] Fix build after merge

---
 src/record/record.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/record/record.rs b/src/record/record.rs
index f29d78bba9..cef056c9c4 100644
--- a/src/record/record.rs
+++ b/src/record/record.rs
@@ -139,7 +139,7 @@ impl Record {
     {
         let key = key.as_ref().to_vec();
         let meta = meta.unwrap_or_default();
-        let meta_size = meta.serialized_size()?;
+        let meta_size = meta.serialized_size();
         let data_checksum = CRC32C.checksum(&data);
         let header = Header::new(key, timestamp, meta_size, data.len() as u64, data_checksum);
         Ok(Self { header, meta, data })
@@ -392,7 +392,7 @@ mod tests {
             let checksum: u32 = CRC32C.checksum(&data);
             let meta = Meta::new();
             let header =
-                RecordHeader::new(key, meta.serialized_size(), data.len() as u64, checksum);
+                RecordHeader::new(key, 101, meta.serialized_size(), data.len() as u64, checksum);
             let header_size = header.serialized_size();
             let record = Record::new(header, meta, data);
             let offset: u64 = 101 * i as u64;
@@ -417,7 +417,7 @@ mod tests {
             let checksum: u32 = CRC32C.checksum(&data);
             let meta = Meta::new();
             let mut header =
-                RecordHeader::new(key, meta.serialized_size(), data.len() as u64, checksum);
+                RecordHeader::new(key, 101, meta.serialized_size(), data.len() as u64, checksum);
             let header_size = header.serialized_size() as usize;
             let record = Record::new(header.clone(), meta.clone(), data.clone());
             let offset: u64 = 101 * i as u64;

From afceb19bcccdc5c2ecadda03027a199fae9e13f8 Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Wed, 17 May 2023 21:19:57 +0200
Subject: [PATCH 12/24] [231] Insert position search improvement

---
 src/blob/index/core.rs | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/src/blob/index/core.rs b/src/blob/index/core.rs
index 39b6097bcb..e136353332 100644
--- a/src/blob/index/core.rs
+++ b/src/blob/index/core.rs
@@ -333,9 +333,13 @@ where
         if let Some(v) = data.headers.get_mut(key) {
             let old_capacity = v.capacity();
             // Keep ordered by timestamp
-            let mut pos = v.binary_search_by(|item| item.timestamp().cmp(&h.timestamp())).unwrap_or_else(|e| e);
-            // Skip records with an equal timestamp (ours should be the latest)
-            while pos < v.len() && v[pos].timestamp() == h.timestamp() {
+            let mut pos = 0;
+            if v.len() > 4 {
+                // Use binary search when len > 4. For smaller lengths a sequential search is faster
+                pos = v.binary_search_by(|item| item.timestamp().cmp(&h.timestamp())).unwrap_or_else(|e| e);
+            }
+            // Skip records with a timestamp less than or equal to ours (ours should be the latest)
+            while pos < v.len() && v[pos].timestamp() <= h.timestamp() {
                 pos += 1;
             }
             v.insert(pos, h);
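[Editorial note, not part of the patch series] A worked example of the insert-position rule above, as a standalone sketch over bare timestamps: a new header lands after any existing entries with the same timestamp, so among equal timestamps the most recently written record ends up last and is treated as the latest:

```rust
fn insert_pos(timestamps: &[u64], new: u64) -> usize {
    let mut pos = if timestamps.len() > 4 {
        // Binary search may land on any one of several equal elements...
        timestamps.binary_search(&new).unwrap_or_else(|e| e)
    } else {
        0
    };
    // ...so walk past everything less than or equal to the new timestamp.
    while pos < timestamps.len() && timestamps[pos] <= new {
        pos += 1;
    }
    pos
}

fn main() {
    assert_eq!(insert_pos(&[1, 3, 3, 7], 3), 3); // after the existing 3s
    assert_eq!(insert_pos(&[1, 3, 3, 7], 0), 0); // before everything
    assert_eq!(insert_pos(&[1, 3, 3, 7], 9), 4); // at the end
    println!("ok");
}
```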
test data string".repeat(16).as_bytes().to_vec().into(); + storage.write(&key, data1.clone(), BlobRecordTimestamp::new(10)).await?; + storage.write(&key, data2.clone(), BlobRecordTimestamp::new(10)).await?; + storage.write(&key, data3.clone(), BlobRecordTimestamp::new(5)).await?; + + let read = storage.read_all_with_deletion_marker(&key).await?; + + assert_eq!(3, read.len()); + assert_eq!(BlobRecordTimestamp::new(10), read[0].timestamp()); + assert_eq!(data2, Bytes::from(read[0].load_data().await.unwrap())); + assert_eq!(BlobRecordTimestamp::new(10), read[1].timestamp()); + assert_eq!(data1, Bytes::from(read[1].load_data().await.unwrap())); + assert_eq!(BlobRecordTimestamp::new(5), read[2].timestamp()); + assert_eq!(data3, Bytes::from(read[2].load_data().await.unwrap())); + + std::mem::drop(read); // Entry holds file + + let data = storage.read(&key).await?; + assert!(data.is_found()); + assert_eq!(data2, Bytes::from(data.into_option().unwrap())); + + let contains = storage.contains(&key).await?; + assert!(contains.is_found()); + assert_eq!(BlobRecordTimestamp::new(10), contains.into_option().unwrap()); + + common::clean(storage, path).await; + Ok(()) +} + +#[tokio::test] +async fn test_read_ordered_by_timestamp_in_different_blobs() -> Result<()> { + let path = common::init("read_ordered_by_timestamp_in_different_blobs"); + let storage = common::default_test_storage_in(&path).await.unwrap(); + let key: KeyTest = vec![0].into(); + let data1: Bytes = "1. test data string".repeat(16).as_bytes().to_vec().into(); + let data2: Bytes = "2. test data string".repeat(16).as_bytes().to_vec().into(); + let data3: Bytes = "3. test data string".repeat(16).as_bytes().to_vec().into(); + storage.write(&key, data1.clone(), BlobRecordTimestamp::new(10)).await?; + storage.try_close_active_blob().await?; + storage.write(&key, data2.clone(), BlobRecordTimestamp::new(10)).await?; + storage.try_close_active_blob().await?; + storage.write(&key, data3.clone(), BlobRecordTimestamp::new(5)).await?; + storage.try_close_active_blob().await?; + + let read = storage.read_all_with_deletion_marker(&key).await?; + + assert_eq!(3, read.len()); + assert_eq!(BlobRecordTimestamp::new(10), read[0].timestamp()); + assert_eq!(data2, Bytes::from(read[0].load_data().await.unwrap())); + assert_eq!(BlobRecordTimestamp::new(10), read[1].timestamp()); + assert_eq!(data1, Bytes::from(read[1].load_data().await.unwrap())); + assert_eq!(BlobRecordTimestamp::new(5), read[2].timestamp()); + assert_eq!(data3, Bytes::from(read[2].load_data().await.unwrap())); + + std::mem::drop(read); // Entry holds file + + let data = storage.read(&key).await?; + assert!(data.is_found()); + assert_eq!(data2, Bytes::from(data.into_option().unwrap())); + + let contains = storage.contains(&key).await?; + assert!(contains.is_found()); + assert_eq!(BlobRecordTimestamp::new(10), contains.into_option().unwrap()); + + common::clean(storage, path).await; + Ok(()) +} \ No newline at end of file From 834b974dc27c4e6ba44b5037b5af312fb13540c0 Mon Sep 17 00:00:00 2001 From: ikopylov Date: Wed, 17 May 2023 21:47:43 +0200 Subject: [PATCH 14/24] [231] Extend ordering test with deletion --- tests/tests.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/tests.rs b/tests/tests.rs index 604c993c09..aa2ba9a563 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1358,16 +1358,19 @@ async fn test_read_ordered_by_timestamp() -> Result<()> { storage.write(&key, data1.clone(), BlobRecordTimestamp::new(10)).await?; storage.write(&key, data2.clone(), 
BlobRecordTimestamp::new(10)).await?; storage.write(&key, data3.clone(), BlobRecordTimestamp::new(5)).await?; + storage.delete(&key, BlobRecordTimestamp::new(0), false).await?; let read = storage.read_all_with_deletion_marker(&key).await?; - assert_eq!(3, read.len()); + assert_eq!(4, read.len()); assert_eq!(BlobRecordTimestamp::new(10), read[0].timestamp()); assert_eq!(data2, Bytes::from(read[0].load_data().await.unwrap())); assert_eq!(BlobRecordTimestamp::new(10), read[1].timestamp()); assert_eq!(data1, Bytes::from(read[1].load_data().await.unwrap())); assert_eq!(BlobRecordTimestamp::new(5), read[2].timestamp()); assert_eq!(data3, Bytes::from(read[2].load_data().await.unwrap())); + assert_eq!(BlobRecordTimestamp::new(0), read[3].timestamp()); + assert!(read[3].is_deleted()); std::mem::drop(read); // Entry holds file @@ -1397,16 +1400,19 @@ async fn test_read_ordered_by_timestamp_in_different_blobs() -> Result<()> { storage.try_close_active_blob().await?; storage.write(&key, data3.clone(), BlobRecordTimestamp::new(5)).await?; storage.try_close_active_blob().await?; + storage.delete(&key, BlobRecordTimestamp::new(0), false).await?; let read = storage.read_all_with_deletion_marker(&key).await?; - assert_eq!(3, read.len()); + assert_eq!(4, read.len()); assert_eq!(BlobRecordTimestamp::new(10), read[0].timestamp()); assert_eq!(data2, Bytes::from(read[0].load_data().await.unwrap())); assert_eq!(BlobRecordTimestamp::new(10), read[1].timestamp()); assert_eq!(data1, Bytes::from(read[1].load_data().await.unwrap())); assert_eq!(BlobRecordTimestamp::new(5), read[2].timestamp()); assert_eq!(data3, Bytes::from(read[2].load_data().await.unwrap())); + assert_eq!(BlobRecordTimestamp::new(0), read[3].timestamp()); + assert!(read[3].is_deleted()); std::mem::drop(read); // Entry holds file From 50062ce8c5e69fe3fc501fd0b375a0625b9855d1 Mon Sep 17 00:00:00 2001 From: ikopylov Date: Wed, 17 May 2023 22:46:52 +0200 Subject: [PATCH 15/24] [231] Update `contains` to preserve ordering --- src/blob/core.rs | 6 +++--- src/storage/core.rs | 15 ++++++-------- src/storage/read_result.rs | 42 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 12 deletions(-) diff --git a/src/blob/core.rs b/src/blob/core.rs index a381b81791..bd73c1794f 100644 --- a/src/blob/core.rs +++ b/src/blob/core.rs @@ -269,7 +269,7 @@ where check_filters: bool, ) -> Result> { debug!("blob read any"); - let entry = self.get_entry(key, meta, check_filters).await?; + let entry = self.get_last_entry(key, meta, check_filters).await?; match entry { ReadResult::Found(entry) => { debug!("blob read any entry found"); @@ -349,7 +349,7 @@ where .collect() } - async fn get_entry( + pub(crate) async fn get_last_entry( &self, key: &K, meta: Option<&Meta>, @@ -415,7 +415,7 @@ where ) -> Result> { debug!("blob contains"); let contains = self - .get_entry(key, meta, true) + .get_last_entry(key, meta, true) .await? 
.map(|e| e.timestamp()); debug!("blob contains any: {:?}", contains); diff --git a/src/storage/core.rs b/src/storage/core.rs index 7c5ca9abac..599fd00d4e 100644 --- a/src/storage/core.rs +++ b/src/storage/core.rs @@ -908,22 +908,19 @@ where key: &K, meta: Option<&Meta>, ) -> Result> { + let mut latest_result: ReadResult = ReadResult::NotFound; + let inner = self.inner.safe.read().await; if let Some(active_blob) = &inner.active_blob { - let res = active_blob.read().await.contains(key, meta).await?; - if res.is_presented() { - return Ok(res); - } + latest_result = latest_result.latest(active_blob.read().await.contains(key, meta).await?); } + let blobs = inner.blobs.read().await; for blob in blobs.iter_possible_childs_rev(key) { - let res = blob.1.data.contains(key, meta).await?; - if res.is_presented() { - return Ok(res); - } + latest_result = latest_result.latest(blob.1.data.contains(key, meta).await?); } - Ok(ReadResult::NotFound) + Ok(latest_result) } /// `check_filters` is used to check whether a key is in storage. diff --git a/src/storage/read_result.rs b/src/storage/read_result.rs index 076f2badf3..803d8957cf 100644 --- a/src/storage/read_result.rs +++ b/src/storage/read_result.rs @@ -1,4 +1,5 @@ use std::fmt::Display; +use crate::Entry; /// Timestamp #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] @@ -111,3 +112,44 @@ impl ReadResult { } } } + +impl ReadResult { + fn timestamp(&self) -> Option { + match &self { + ReadResult::Found(ts) => Some(*ts), + ReadResult::Deleted(ts) => Some(*ts), + ReadResult::NotFound => None + } + } + + /// Returns [`ReadResult`] with max timetamp. + /// If timestamps are equal, then `self` is preserved + pub fn latest(self, other: ReadResult) -> ReadResult { + if other.timestamp() > self.timestamp() { + other + } else { + self + } + } +} + + +impl ReadResult { + fn timestamp(&self) -> Option { + match &self { + ReadResult::Found(entry) => Some(entry.timestamp()), + ReadResult::Deleted(ts) => Some(*ts), + ReadResult::NotFound => None + } + } + + /// Returns [`ReadResult`] with max timetamp. 
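[Editorial note, not part of the patch series] The tie-breaking of `latest` above matters for scan order: blobs are visited newest-first and `self` wins ties, so on equal timestamps the result from the newer blob is kept. A quick illustrative check, assuming the public `ReadResult` and `BlobRecordTimestamp` API as of this patch:

```rust
use pearl::{BlobRecordTimestamp, ReadResult};

fn main() {
    let from_newer_blob: ReadResult<BlobRecordTimestamp> =
        ReadResult::Found(BlobRecordTimestamp::new(10));
    let from_older_blob = ReadResult::Deleted(BlobRecordTimestamp::new(7));
    // The deletion at ts=7 loses to the write at ts=10.
    assert!(matches!(
        from_newer_blob.latest(from_older_blob),
        ReadResult::Found(_)
    ));
}
```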
+ /// If timestamps are equal, then `self` is preserved + pub fn latest(self, other: ReadResult) -> ReadResult { + if other.timestamp() > self.timestamp() { + other + } else { + self + } + } +} \ No newline at end of file From 4a06ab13105808b97a78ddd0a8caed60ea4638a9 Mon Sep 17 00:00:00 2001 From: ikopylov Date: Thu, 18 May 2023 19:52:27 +0200 Subject: [PATCH 16/24] [231] get_any -> get_latest --- src/blob/core.rs | 10 +++++----- src/blob/index/benchmarks.rs | 2 +- src/blob/index/bptree/core.rs | 2 +- src/blob/index/bptree/tests.rs | 6 +++--- src/blob/index/core.rs | 8 ++++---- src/blob/index/mod.rs | 2 +- src/blob/index/simple.rs | 2 +- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/blob/core.rs b/src/blob/core.rs index bd73c1794f..cd4e9e152b 100644 --- a/src/blob/core.rs +++ b/src/blob/core.rs @@ -269,7 +269,7 @@ where check_filters: bool, ) -> Result> { debug!("blob read any"); - let entry = self.get_last_entry(key, meta, check_filters).await?; + let entry = self.get_latest_entry(key, meta, check_filters).await?; match entry { ReadResult::Found(entry) => { debug!("blob read any entry found"); @@ -324,7 +324,7 @@ where meta: Option, only_if_presented: bool, ) -> Result { - if !only_if_presented || self.index.get_any(key).await?.is_found() { + if !only_if_presented || self.index.get_latest(key).await?.is_found() { let record = Record::deleted(key, timestamp.into(), meta)?; self.push_deletion_record(key, record).await?; Ok(true) @@ -349,7 +349,7 @@ where .collect() } - pub(crate) async fn get_last_entry( + pub(crate) async fn get_latest_entry( &self, key: &K, meta: Option<&Meta>, @@ -366,7 +366,7 @@ where debug!("blob get any entry bloom true no meta"); Ok(self .index - .get_any(key) + .get_latest(key) .await .with_context(|| { format!("index get any failed for blob: {:?}", self.name.to_path()) @@ -415,7 +415,7 @@ where ) -> Result> { debug!("blob contains"); let contains = self - .get_last_entry(key, meta, true) + .get_latest_entry(key, meta, true) .await? 
.map(|e| e.timestamp()); debug!("blob contains any: {:?}", contains); diff --git a/src/blob/index/benchmarks.rs b/src/blob/index/benchmarks.rs index f536a7eb75..1769512739 100644 --- a/src/blob/index/benchmarks.rs +++ b/src/blob/index/benchmarks.rs @@ -208,7 +208,7 @@ async fn benchmark_get_any() { if (i as u32 + 1) % PRINT_EVERY == 0 { println!("Iteration: {}...", i + 1); } - let _ = findex.get_any(&q.into()).await.unwrap(); + let _ = findex.get_latest(&q.into()).await.unwrap(); } println!( "get_any avg time: {}\n", diff --git a/src/blob/index/bptree/core.rs b/src/blob/index/bptree/core.rs index 3793becee4..6e49c34e2a 100644 --- a/src/blob/index/bptree/core.rs +++ b/src/blob/index/bptree/core.rs @@ -152,7 +152,7 @@ where .map(|headers| (headers, self.header.records_count)) } - async fn get_any(&self, key: &K) -> Result> { + async fn get_latest(&self, key: &K) -> Result> { let root_offset = self.metadata.tree_offset; let buf = BytesMut::zeroed(BLOCK_SIZE); let (buf, leaf_offset) = self.find_leaf_node(key, root_offset, buf).await?; diff --git a/src/blob/index/bptree/tests.rs b/src/blob/index/bptree/tests.rs index aa875aff12..8339ca3b0a 100644 --- a/src/blob/index/bptree/tests.rs +++ b/src/blob/index/bptree/tests.rs @@ -236,7 +236,7 @@ async fn check_get_any() { .expect("Can't create file index"); let presented_keys = RANGE_FROM..RANGE_TO; for key in presented_keys.map(|k| k.into()) { - if let Ok(inner_res) = findex.get_any(&key).await { + if let Ok(inner_res) = findex.get_latest(&key).await { if let Some(actual_header) = inner_res { let key_deserialized: usize = key.clone().into(); assert_eq!( @@ -254,7 +254,7 @@ async fn check_get_any() { let not_presented_ranges = [0..RANGE_FROM, RANGE_TO..(RANGE_TO + 100)]; for not_presented_keys in not_presented_ranges.iter() { for key in not_presented_keys.clone().map(|k| serialize(&k).unwrap()) { - assert_eq!(None, findex.get_any(&key.into()).await.unwrap()); + assert_eq!(None, findex.get_latest(&key.into()).await.unwrap()); } } @@ -332,7 +332,7 @@ async fn check_get() { .expect("Can't create file index"); let presented_keys = RANGE_FROM..RANGE_TO; for key in presented_keys.map(|k| k.into()) { - if let Ok(inner_res) = findex.get_any(&key).await { + if let Ok(inner_res) = findex.get_latest(&key).await { if let Some(actual_header) = inner_res { let key_deserialized: usize = key.clone().into(); assert_eq!( diff --git a/src/blob/index/core.rs b/src/blob/index/core.rs index e136353332..768e64d310 100644 --- a/src/blob/index/core.rs +++ b/src/blob/index/core.rs @@ -316,7 +316,7 @@ where for<'a> K: Key<'a>, { async fn contains_key(&self, key: &K) -> Result> { - self.get_any(key) + self.get_latest(key) .await .map(|h| h.map(|h| BlobRecordTimestamp::new(h.timestamp()))) } @@ -394,7 +394,7 @@ where } } - async fn get_any(&self, key: &K) -> Result> { + async fn get_latest(&self, key: &K) -> Result> { debug!("index get any"); let result = match &self.inner { State::InMemory(headers) => { @@ -406,7 +406,7 @@ where } State::OnDisk(findex) => { debug!("index get any on disk"); - findex.get_any(key).await? + findex.get_latest(key).await? 
} }; Ok(match result { @@ -465,7 +465,7 @@ pub(crate) trait FileIndexTrait: Sized + Send + Sync { async fn read_meta_at(&self, i: u64) -> Result; async fn find_by_key(&self, key: &K) -> Result>>; async fn get_records_headers(&self, blob_size: u64) -> Result<(InMemoryIndex, usize)>; - async fn get_any(&self, key: &K) -> Result>; + async fn get_latest(&self, key: &K) -> Result>; fn validate(&self, blob_size: u64) -> Result<()>; fn memory_used(&self) -> usize; } diff --git a/src/blob/index/mod.rs b/src/blob/index/mod.rs index 9c2edefc94..dcab6c1302 100644 --- a/src/blob/index/mod.rs +++ b/src/blob/index/mod.rs @@ -31,7 +31,7 @@ mod prelude { pub(crate) trait IndexTrait: Send + Sync { async fn get_all(&self, key: &K) -> Result>; async fn get_all_with_deletion_marker(&self, key: &K) -> Result>; - async fn get_any(&self, key: &K) -> Result>; + async fn get_latest(&self, key: &K) -> Result>; fn push(&self, key: &K, h: RecordHeader) -> Result<()>; async fn contains_key(&self, key: &K) -> Result>; fn count(&self) -> usize; diff --git a/src/blob/index/simple.rs b/src/blob/index/simple.rs index 1300559de3..94de02542b 100644 --- a/src/blob/index/simple.rs +++ b/src/blob/index/simple.rs @@ -114,7 +114,7 @@ where .map(|headers| (headers, self.header.records_count)) } - async fn get_any(&self, key: &K) -> Result> { + async fn get_latest(&self, key: &K) -> Result> { Self::binary_search(&self.file, key, &self.header) .await .map(|res| res.map(|h| h.0)) From b1a9a94a864495f0b827a80a1be401a5e41da4c1 Mon Sep 17 00:00:00 2001 From: ikopylov Date: Thu, 18 May 2023 20:02:55 +0200 Subject: [PATCH 17/24] [231] Synchronize read and contains implementation on Storage level. Now they both returns latest record + stops on the first error --- src/blob/core.rs | 5 +- src/storage/core.rs | 126 +++++++++++++++++-------------------- src/storage/read_result.rs | 4 -- 3 files changed, 63 insertions(+), 72 deletions(-) diff --git a/src/blob/core.rs b/src/blob/core.rs index cd4e9e152b..80d68f76b9 100644 --- a/src/blob/core.rs +++ b/src/blob/core.rs @@ -262,7 +262,8 @@ where Ok(header) } - pub(crate) async fn read_last( + #[allow(dead_code)] + pub(crate) async fn read_latest( &self, key: &K, meta: Option<&Meta>, @@ -349,6 +350,7 @@ where .collect() } + /// Returns latest Entry from Blob for specified key and meta pub(crate) async fn get_latest_entry( &self, key: &K, @@ -408,6 +410,7 @@ where Ok(None) } + #[allow(dead_code)] pub(crate) async fn contains( &self, key: &K, diff --git a/src/storage/core.rs b/src/storage/core.rs index 599fd00d4e..ce3ca10816 100644 --- a/src/storage/core.rs +++ b/src/storage/core.rs @@ -491,69 +491,71 @@ where Ok(all_entries) } - async fn read_with_optional_meta( - &self, + + /// Returns latest Entry by timestamp + async fn get_latest_entry( + safe: &Safe, key: &K, meta: Option<&Meta>, - ) -> Result> { - debug!("storage read with optional meta {:?}, {:?}", key, meta); - let safe = self.inner.safe.read().await; + ) -> Result> { + let mut latest_entry: ReadResult = ReadResult::NotFound; + if let Some(ablob) = safe.active_blob.as_ref() { - match ablob.read().await.read_last(key, meta, true).await { - Ok(data) => { - if data.is_presented() { - debug!("storage read with optional meta active blob returned data"); - return Ok(data); - } - } - Err(e) => debug!("read with optional meta active blob returned: {:#?}", e), - } + let ablob_entry = ablob.read().await.get_latest_entry(key, meta, true).await.map_err(|err| { + debug!("get_latest_entry from active blob returned error: {:?}", err); + err + })?; + + 
+            latest_entry = latest_entry.latest(ablob_entry);
         }

-        Self::get_data_last(&safe, key, meta).await
-    }

-    async fn get_data_last(
-        safe: &Safe<K>,
+        let blobs = safe.blobs.read().await;
+        let mut stream = blobs
+            .iter_possible_childs_rev(key)
+            .map(|(_, blob)| blob.data.get_latest_entry(key, meta, true))
+            .collect::<FuturesUnordered<_>>();
+
+        while let Some(entry) = stream.next().await {
+            let entry = entry.map_err(|err| {
+                debug!("get_latest_entry from closed blob returned error: {:?}", err);
+                err
+            })?;
+            latest_entry = latest_entry.latest(entry);
+        }
+
+        Ok(latest_entry)
+    }
+
+    async fn read_with_optional_meta(
+        &self,
         key: &K,
         meta: Option<&Meta>,
     ) -> Result<ReadResult<Bytes>> {
-        let blobs = safe.blobs.read().await;
-        let possible_blobs = blobs
-            .iter_possible_childs_rev(key)
-            .map(|(id, blob)| async move {
-                if !matches!(blob.data.check_filter(key).await, FilterResult::NotContains) {
-                    Some(id)
-                } else {
-                    None
-                }
-            })
-            .collect::<FuturesUnordered<_>>()
-            .filter_map(|x| x)
-            .collect::<Vec<_>>()
-            .await;
-        debug!(
-            "len of possible blobs: {} (start len: {})",
-            possible_blobs.len(),
-            blobs.len()
-        );
-        let stream: FuturesOrdered<_> = possible_blobs
-            .into_iter()
-            .filter_map(|id| blobs.get_child(id))
-            .map(|blob| blob.data.read_last(key, meta, false))
-            .collect();
-        debug!("read with optional meta {} closed blobs", stream.len());
-
-        let mut task = stream.skip_while(|read_res| {
-            match read_res {
-                Ok(inner_res) => inner_res.is_not_found(), // Skip not found
-                Err(e) => {
-                    debug!("error reading data from Blob (blob.read_any): {:?}", e);
-                    true // skip errors
-                }
-            }
-        });
+        debug!("storage read with optional meta {:?}, {:?}", key, meta);
+        let safe = self.inner.safe.read().await;
+        let latest_entry = Self::get_latest_entry(&safe, key, meta).await?;

-        task.next().await.unwrap_or(Ok(ReadResult::NotFound))
+        match latest_entry {
+            ReadResult::Found(entry) => {
+                trace!("Storage::read_with_optional_meta: entry found");
+                let buf = entry
+                    .load()
+                    .await
+                    .with_context(|| {
+                        format!(
+                            "failed to read key {:?} with meta {:?} from blob ??",
+                            key,
+                            meta,
+                            //self.name.to_path()
+                        )
+                    })?
+                    .into_data();
+                trace!("Storage::read_with_optional_meta: loaded bytes: {}", buf.len());
+                Ok(ReadResult::Found(buf))
+            },
+            ReadResult::Deleted(ts) => Ok(ReadResult::Deleted(ts)),
+            ReadResult::NotFound => Ok(ReadResult::NotFound)
+        }
     }

     /// Stop blob updater and release lock file
@@ -908,19 +910,9 @@ where
         key: &K,
         meta: Option<&Meta>,
     ) -> Result<ReadResult<BlobRecordTimestamp>> {
-        let mut latest_result: ReadResult<BlobRecordTimestamp> = ReadResult::NotFound;
-
-        let inner = self.inner.safe.read().await;
-        if let Some(active_blob) = &inner.active_blob {
-            latest_result = latest_result.latest(active_blob.read().await.contains(key, meta).await?);
-        }
-
-        let blobs = inner.blobs.read().await;
-        for blob in blobs.iter_possible_childs_rev(key) {
-            latest_result = latest_result.latest(blob.1.data.contains(key, meta).await?);
-        }
-
-        Ok(latest_result)
+        let safe = self.inner.safe.read().await;
+        let latest_result = Self::get_latest_entry(&safe, key, meta).await?;
+        Ok(latest_result.map(|entry| entry.timestamp()))
     }

     /// `check_filters` is used to check whether a key is in storage.
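Note on the selection logic above: both read_with_optional_meta and contains now funnel through get_latest_entry, which folds the per-blob lookups with a `latest` combinator on ReadResult. That combinator itself is not shown in this series, so the following is only a minimal self-contained sketch of its assumed timestamp semantics; the `Timestamped` trait, the `FakeEntry` type and the bare u64 payload of `Deleted` are illustrative stand-ins, not pearl's actual types:

    #[derive(Debug)]
    enum ReadResult<T> {
        Found(T),
        Deleted(u64), // timestamp of the deletion marker
        NotFound,
    }

    trait Timestamped {
        fn timestamp(&self) -> u64;
    }

    impl<T: Timestamped> ReadResult<T> {
        /// Keep whichever side carries the greater timestamp; NotFound never
        /// wins over a found record or a deletion marker.
        fn latest(self, other: Self) -> Self {
            fn ts<T: Timestamped>(r: &ReadResult<T>) -> Option<u64> {
                match r {
                    ReadResult::Found(v) => Some(v.timestamp()),
                    ReadResult::Deleted(t) => Some(*t),
                    ReadResult::NotFound => None,
                }
            }
            if ts(&other) > ts(&self) { other } else { self }
        }
    }

    struct FakeEntry(u64);
    impl Timestamped for FakeEntry {
        fn timestamp(&self) -> u64 {
            self.0
        }
    }

    fn main() {
        // A deletion marker written at t = 7 shadows a record found at t = 5.
        let merged = ReadResult::Found(FakeEntry(5)).latest(ReadResult::Deleted(7));
        assert!(matches!(merged, ReadResult::Deleted(7)));
    }

In this sketch the left operand wins ties, which mirrors the call order above: the active blob is merged first, then the closed blobs.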
diff --git a/src/storage/read_result.rs b/src/storage/read_result.rs
index 803d8957cf..7b84310fdb 100644
--- a/src/storage/read_result.rs
+++ b/src/storage/read_result.rs
@@ -45,10 +45,6 @@ impl Into<u64> for BlobRecordTimestamp {
 }

 impl<T> ReadResult<T> {
-    pub(crate) fn is_presented(&self) -> bool {
-        !matches!(self, ReadResult::NotFound)
-    }
-
     /// Is this found result
     pub fn is_found(&self) -> bool {
         matches!(self, ReadResult::Found(_))

From 727e5f511140211ed0a7a3a2e8fb9a33333b527f Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Thu, 18 May 2023 21:53:11 +0200
Subject: [PATCH 18/24] [231] Include BLOB path into errors returned from
 Entry functions

---
 src/blob/core.rs    | 27 ++++++++++-----------------
 src/blob/entry.rs   | 25 +++++++++++++++++--------
 src/storage/core.rs |  9 +--------
 3 files changed, 28 insertions(+), 33 deletions(-)

diff --git a/src/blob/core.rs b/src/blob/core.rs
index 80d68f76b9..0b113febb9 100644
--- a/src/blob/core.rs
+++ b/src/blob/core.rs
@@ -24,7 +24,7 @@ where
 {
     header: Header,
     index: Index<K>,
-    name: FileName,
+    name: Arc<FileName>,
     file: File,
     created_at: SystemTime,
     validate_data_during_index_regen: bool,
@@ -56,7 +56,7 @@ where
         let mut blob = Self {
             header,
             index,
-            name,
+            name: Arc::new(name),
             file,
             created_at: SystemTime::now(),
             validate_data_during_index_regen,
@@ -171,7 +171,7 @@ where
         let mut blob = Self {
             header,
             file,
-            name,
+            name: Arc::new(name),
             index,
             created_at,
             validate_data_during_index_regen,
@@ -277,14 +277,7 @@ where
                 let buf = entry
                     .load()
                     .await
-                    .with_context(|| {
-                        format!(
-                            "failed to read key {:?} with meta {:?} from blob {:?}",
-                            key,
-                            meta,
-                            self.name.to_path()
-                        )
-                    })?
+                    .with_context(|| format!("Failed to read data for key {:?} with meta {:?}", key, meta))?
                     .into_data();
                 debug!("blob read any entry loaded bytes: {}", buf.len());
                 Ok(ReadResult::Found(buf))
@@ -302,7 +295,7 @@ where
             .iter()
             .zip(headers.iter().skip(1))
             .all(|(x, y)| x.timestamp() >= y.timestamp()));
-        Ok(Self::headers_to_entries(headers, &self.file))
+        Ok(Self::headers_to_entries(headers, &self.file, &self.name))
     }

     #[inline]
@@ -315,7 +308,7 @@ where
             .iter()
             .zip(headers.iter().skip(1))
             .all(|(x, y)| x.timestamp() >= y.timestamp()));
-        Ok(Self::headers_to_entries(headers, &self.file))
+        Ok(Self::headers_to_entries(headers, &self.file, &self.name))
     }

     pub(crate) async fn delete(
@@ -343,10 +336,10 @@ where
         self.index.push_deletion(key, header)
     }

-    fn headers_to_entries(headers: Vec<RecordHeader>, file: &File) -> Vec<Entry> {
+    fn headers_to_entries(headers: Vec<RecordHeader>, file: &File, file_name: &Arc<FileName>) -> Vec<Entry> {
         headers
             .into_iter()
-            .map(|header| Entry::new(header, file.clone()))
+            .map(|header| Entry::new(header, file.clone(), file_name.clone()))
             .collect()
     }

@@ -374,7 +367,7 @@ where
                 format!("index get any failed for blob: {:?}", self.name.to_path())
             })?
             .map(|header| {
-                let entry = Entry::new(header, self.file.clone());
+                let entry = Entry::new(header, self.file.clone(), self.name.clone());
                 debug!("blob, get any entry, bloom true no meta, entry found");
                 entry
             }))
@@ -390,7 +383,7 @@ where
         if deleted_ts.is_some() {
             headers.truncate(headers.len() - 1);
         }
-        let entries = Self::headers_to_entries(headers, &self.file);
+        let entries = Self::headers_to_entries(headers, &self.file, &self.name);
         if let Some(entries) = self.filter_entries(entries, meta).await? {
             Ok(ReadResult::Found(entries))
         } else {
diff --git a/src/blob/entry.rs b/src/blob/entry.rs
index 0e5476accf..00879247fe 100644
--- a/src/blob/entry.rs
+++ b/src/blob/entry.rs
@@ -16,6 +16,7 @@ pub struct Entry {
     header: RecordHeader,
     meta: Option<Meta>,
     blob_file: File,
+    blob_file_name: Arc<FileName>
 }

 impl Entry {
@@ -31,12 +32,15 @@ impl Entry {
             .read_exact_at_allocate(data_size + meta_size, self.header.meta_offset())
             .await
             .map_err(|err| err.into_bincode_if_unexpected_eof())
-            .context("Record load failed")?;
+            .with_context(|| format!("Record load failed from BLOB: {}", self.blob_file_name.to_path().display()))?;
         let mut buf = buf.freeze();
         let data_buf = buf.split_off(meta_size);
-        let meta = Meta::from_raw(&buf).map_err(|err| Error::from(err))?;
-        let record = Record::new(self.header.clone(), meta, data_buf);
-        record.validate()
+        let meta = Meta::from_raw(&buf)
+            .map_err(|err| Error::from(err))
+            .with_context(|| format!("Deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.to_path().display()))?;
+        Record::new(self.header, meta, data_buf)
+            .validate()
+            .with_context(|| format!("Validation failed for Record loaded from BLOB: {}", self.blob_file_name.to_path().display()))
     }

     /// Returns only data.
@@ -48,7 +52,7 @@ impl Entry {
             .read_exact_at_allocate(self.header.data_size().try_into()?, data_offset)
             .await
             .map_err(|err| err.into_bincode_if_unexpected_eof())
-            .context("Error loading Record data")
+            .with_context(|| format!("Error loading Record data from BLOB: {}", self.blob_file_name.to_path().display()))
     }

     /// Loads meta data from disk, and returns reference to it.
@@ -61,8 +65,12 @@ impl Entry {
             .read_exact_at_allocate(self.header.meta_size().try_into()?, meta_offset)
             .await
             .map_err(|err| err.into_bincode_if_unexpected_eof())
-            .with_context(|| format!("failed to read Record metadata, offset: {}", meta_offset))?;
-        self.meta = Some(Meta::from_raw(&buf).map_err(|err| Error::from(err))?);
+            .with_context(|| format!("Failed to read Record metadata from BLOB: {}", self.blob_file_name.to_path().display()))?;
+        let meta = Meta::from_raw(&buf)
+            .map_err(|err| Error::from(err))
+            .with_context(|| format!("Deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.to_path().display()))?;
+
+        self.meta = Some(meta);
         Ok(self.meta.as_ref())
     }

@@ -76,11 +84,12 @@ impl Entry {
         BlobRecordTimestamp::new(self.header.timestamp())
     }

-    pub(crate) fn new(header: RecordHeader, blob_file: File) -> Self {
+    pub(crate) fn new(header: RecordHeader, blob_file: File, blob_file_name: Arc<FileName>) -> Self {
         Self {
             meta: None,
             header,
             blob_file,
+            blob_file_name
         }
     }
 }
diff --git a/src/storage/core.rs b/src/storage/core.rs
index ce3ca10816..78dbe7450f 100644
--- a/src/storage/core.rs
+++ b/src/storage/core.rs
@@ -541,14 +541,7 @@ where
                 let buf = entry
                     .load()
                     .await
-                    .with_context(|| {
-                        format!(
-                            "failed to read key {:?} with meta {:?} from blob ??",
-                            key,
-                            meta,
-                            //self.name.to_path()
-                        )
-                    })?
+                    .with_context(|| format!("Failed to read data for key {:?} with meta {:?}", key, meta))?
                     .into_data();
                 trace!("Storage::read_with_optional_meta: loaded bytes: {}", buf.len());
                 Ok(ReadResult::Found(buf))

From 4b151268605001b94c9ad1b34116f1e12ed9f55f Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Thu, 18 May 2023 22:11:53 +0200
Subject: [PATCH 19/24] [231] Update Changelog.md

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 69ffa2b386..720f87e755 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,8 @@ Pearl changelog
 #### Changed
 - Receive timestamp as parameter, store it within record and return it with BlobRecordTimestamp (#231)
+- `Storage::read` now propagates the error further instead of hiding it (#241)
+- `Storage::read` now reads Entries first, selects the latest one and only then loads the data (#276)

 #### Fixed
 - Use `dep:` syntax in feature declaration to avoid unnecessary feature flags (#272)

From e3cb259d0f2b8601dac2124b174e51be3bacc76d Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Sat, 24 Jun 2023 23:14:17 +0200
Subject: [PATCH 20/24] [231] Fix build errors after merge

---
 src/blob/entry.rs | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/blob/entry.rs b/src/blob/entry.rs
index 4c22913fae..19de813adc 100644
--- a/src/blob/entry.rs
+++ b/src/blob/entry.rs
@@ -32,15 +32,15 @@ impl Entry {
             .read_exact_at_allocate(data_size + meta_size, self.header.meta_offset())
             .await
             .map_err(|err| err.into_bincode_if_unexpected_eof())
-            .with_context(|| format!("Record load failed from BLOB: {}", self.blob_file_name.to_path().display()))?;
+            .with_context(|| format!("Record load failed from BLOB: {}", self.blob_file_name.as_path().display()))?;
         let mut buf = buf.freeze();
         let data_buf = buf.split_off(meta_size);
         let meta = Meta::from_raw(&buf)
             .map_err(|err| Error::from(err))
-            .with_context(|| format!("Deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.to_path().display()))?;
+            .with_context(|| format!("Deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.as_path().display()))?;
         Record::new(self.header, meta, data_buf)
             .validate()
-            .with_context(|| format!("Validation failed for Record loaded from BLOB: {}", self.blob_file_name.to_path().display()))
+            .with_context(|| format!("Validation failed for Record loaded from BLOB: {}", self.blob_file_name.as_path().display()))
     }

     /// Returns only data.
@@ -52,7 +52,7 @@ impl Entry {
             .read_exact_at_allocate(self.header.data_size().try_into()?, data_offset)
             .await
             .map_err(|err| err.into_bincode_if_unexpected_eof())
-            .with_context(|| format!("Error loading Record data from BLOB: {}", self.blob_file_name.to_path().display()))?;
+            .with_context(|| format!("Error loading Record data from BLOB: {}", self.blob_file_name.as_path().display()))?;
         self.header.data_checksum_audit(&data)?;
         Ok(data)
     }
@@ -67,10 +67,10 @@ impl Entry {
             .read_exact_at_allocate(self.header.meta_size().try_into()?, meta_offset)
             .await
             .map_err(|err| err.into_bincode_if_unexpected_eof())
-            .with_context(|| format!("Failed to read Record metadata from BLOB: {}", self.blob_file_name.to_path().display()))?;
+            .with_context(|| format!("Failed to read Record metadata from BLOB: {}", self.blob_file_name.as_path().display()))?;
         let meta = Meta::from_raw(&buf)
             .map_err(|err| Error::from(err))
-            .with_context(|| format!("Deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.to_path().display()))?;
+            .with_context(|| format!("Deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.as_path().display()))?;

         self.meta = Some(meta);
         Ok(self.meta.as_ref())

From c8b1e5c51a5d1aff1a85630560cf9b1771450208 Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Sat, 24 Jun 2023 23:30:38 +0200
Subject: [PATCH 21/24] [231] Updates according to the code review

---
 src/blob/entry.rs | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/blob/entry.rs b/src/blob/entry.rs
index 19de813adc..2166ab64cd 100644
--- a/src/blob/entry.rs
+++ b/src/blob/entry.rs
@@ -32,15 +32,15 @@ impl Entry {
             .read_exact_at_allocate(data_size + meta_size, self.header.meta_offset())
             .await
             .map_err(|err| err.into_bincode_if_unexpected_eof())
-            .with_context(|| format!("Record load failed from BLOB: {}", self.blob_file_name.as_path().display()))?;
+            .with_context(|| format!("record load failed from BLOB: {}", self.blob_file_name.as_path().display()))?;
         let mut buf = buf.freeze();
         let data_buf = buf.split_off(meta_size);
         let meta = Meta::from_raw(&buf)
             .map_err(|err| Error::from(err))
-            .with_context(|| format!("Deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.as_path().display()))?;
+            .with_context(|| format!("deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.as_path().display()))?;
         Record::new(self.header, meta, data_buf)
             .validate()
-            .with_context(|| format!("Validation failed for Record loaded from BLOB: {}", self.blob_file_name.as_path().display()))
+            .with_context(|| format!("validation failed for Record loaded from BLOB: {}", self.blob_file_name.as_path().display()))
     }

     /// Returns only data.
@@ -52,7 +52,7 @@ impl Entry {
             .read_exact_at_allocate(self.header.data_size().try_into()?, data_offset)
             .await
             .map_err(|err| err.into_bincode_if_unexpected_eof())
-            .with_context(|| format!("Error loading Record data from BLOB: {}", self.blob_file_name.as_path().display()))?;
+            .with_context(|| format!("error loading Record data from BLOB: {}", self.blob_file_name.as_path().display()))?;
         self.header.data_checksum_audit(&data)?;
         Ok(data)
     }
@@ -67,10 +67,10 @@ impl Entry {
             .read_exact_at_allocate(self.header.meta_size().try_into()?, meta_offset)
             .await
             .map_err(|err| err.into_bincode_if_unexpected_eof())
-            .with_context(|| format!("Failed to read Record metadata from BLOB: {}", self.blob_file_name.as_path().display()))?;
+            .with_context(|| format!("failed to read Record metadata from BLOB: {}", self.blob_file_name.as_path().display()))?;
         let meta = Meta::from_raw(&buf)
             .map_err(|err| Error::from(err))
-            .with_context(|| format!("Deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.as_path().display()))?;
+            .with_context(|| format!("deserialization failed for Meta loaded from BLOB: {}", self.blob_file_name.as_path().display()))?;

         self.meta = Some(meta);
         Ok(self.meta.as_ref())

From 4730a8aae502f8d1b1546e478a937cd64996c339 Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Mon, 14 Aug 2023 21:55:18 +0200
Subject: [PATCH 22/24] [231] Remove dead code according to code review

---
 src/blob/core.rs | 25 -------------------------
 1 file changed, 25 deletions(-)

diff --git a/src/blob/core.rs b/src/blob/core.rs
index 53dd441748..cb94a39c01 100644
--- a/src/blob/core.rs
+++ b/src/blob/core.rs
@@ -270,31 +270,6 @@ where
         Ok(WriteResult { dirty_bytes: self.file.dirty_bytes() })
     }

-    #[allow(dead_code)]
-    pub(crate) async fn read_latest(
-        &self,
-        key: &K,
-        meta: Option<&Meta>,
-        check_filters: bool,
-    ) -> Result<ReadResult<Bytes>> {
-        debug!("blob read any");
-        let entry = self.get_latest_entry(key, meta, check_filters).await?;
-        match entry {
-            ReadResult::Found(entry) => {
-                debug!("blob read any entry found");
-                let buf = entry
-                    .load()
-                    .await
-                    .with_context(|| format!("failed to read data for key {:?} with meta {:?}", key, meta))?
-                    .into_data();
-                debug!("blob read any entry loaded bytes: {}", buf.len());
-                Ok(ReadResult::Found(buf))
-            }
-            ReadResult::Deleted(ts) => Ok(ReadResult::Deleted(ts)),
-            ReadResult::NotFound => Ok(ReadResult::NotFound),
-        }
-    }
-
     #[allow(dead_code)]
     #[inline]
     pub(crate) async fn read_all_entries(&self, key: &K) -> Result<Vec<Entry>> {

From adf75668c3ba05a194fb6fe322f7e5b2250350ca Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Mon, 14 Aug 2023 21:59:36 +0200
Subject: [PATCH 23/24] [231] Remove unused import

---
 src/blob/core.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/blob/core.rs b/src/blob/core.rs
index cb94a39c01..bebab1826d 100644
--- a/src/blob/core.rs
+++ b/src/blob/core.rs
@@ -1,6 +1,6 @@
 use std::time::SystemTime;

-use bytes::{BufMut, Bytes, BytesMut};
+use bytes::{BufMut, BytesMut};
 use tokio::time::Instant;

 use crate::error::ValidationErrorKind;

From 1291d4fe31fd6b5789d0456f2cddc28d08c94ca1 Mon Sep 17 00:00:00 2001
From: ikopylov
Date: Thu, 21 Sep 2023 21:14:45 +0200
Subject: [PATCH 24/24] [231] Remove dead code

---
 src/blob/core.rs | 26 --------------------------
 1 file changed, 26 deletions(-)

diff --git a/src/blob/core.rs b/src/blob/core.rs
index 8fcb058cec..d4605d258c 100644
--- a/src/blob/core.rs
+++ b/src/blob/core.rs
@@ -270,17 +270,6 @@ where
         Ok(WriteResult { dirty_bytes: self.file.dirty_bytes() })
     }

-    #[allow(dead_code)]
-    #[inline]
-    pub(crate) async fn read_all_entries(&self, key: &K) -> Result<Vec<Entry>> {
-        let headers = self.index.get_all(key).await?;
-        debug_assert!(headers
-            .iter()
-            .zip(headers.iter().skip(1))
-            .all(|(x, y)| x.timestamp() >= y.timestamp()));
-        Ok(Self::headers_to_entries(headers, &self.file, &self.name))
-    }
-
     #[inline]
     pub(crate) async fn read_all_entries_with_deletion_marker(
         &self,
@@ -385,21 +374,6 @@ where
         Ok(None)
     }

-    #[allow(dead_code)]
-    pub(crate) async fn contains(
-        &self,
-        key: &K,
-        meta: Option<&Meta>,
-    ) -> Result<ReadResult<BlobRecordTimestamp>> {
-        debug!("blob contains");
-        let contains = self
-            .get_latest_entry(key, meta, true)
-            .await?
-            .map(|e| e.timestamp());
-        debug!("blob contains any: {:?}", contains);
-        Ok(contains)
-    }
-
     #[inline]
     pub(crate) fn file_size(&self) -> u64 {
         self.file.size()
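A closing observation on the invariant that the surviving debug_assert! checks (for example in read_all_entries_with_deletion_marker) keep enforcing: the headers returned for a key must carry non-increasing timestamps. A tiny standalone illustration of exactly that check; the helper name is ours, not the crate's:

    /// Pairwise check mirroring the zip/skip(1) pattern used in the asserts above.
    fn is_sorted_by_timestamp_desc(timestamps: &[u64]) -> bool {
        timestamps.windows(2).all(|w| w[0] >= w[1])
    }

    fn main() {
        // Equal timestamps are allowed, so the duplicate 7 still passes.
        assert!(is_sorted_by_timestamp_desc(&[9, 7, 7, 3]));
        assert!(!is_sorted_by_timestamp_desc(&[3, 7]));
    }

This ordering is what lets the latest-record lookups introduced earlier in the series pick a single header per key without re-sorting.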