188 serialize outside lock #220

Merged 24 commits on Feb 24, 2023
Changes from 3 commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,9 @@ Pearl changelog
#### Changed
- Remove iterators from bloom filter (#194)
- Excessive key conversion removed (#193)
- Removed serialization from critical section (#188)
- Removed multiple header checksum calculations (#206)
- Rename functions to show that they return the last entry (#199)

#### Fixed

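The #188 entry above moves record serialization out of the write path's critical section; the diffs below implement it. The sketch here is a minimal, hypothetical illustration of that pattern — not pearl's actual API (names such as `Serialized`, `offset_pos`, and `write` are made up): the record bytes are produced before the lock is taken, and only the offset placeholder is patched while the lock is held.

```rust
// Minimal sketch of "serialize outside the lock" (illustrative; not pearl's API).
// Heavy work (encoding, allocation) happens before the lock; inside the lock we
// only patch the offset placeholder, advance the offset, and append the bytes.
use tokio::sync::RwLock;

struct Blob {
    current_offset: u64,
    storage: Vec<u8>, // stand-in for the blob file
}

struct Serialized {
    buf: Vec<u8>,
    offset_pos: usize, // byte position of the offset placeholder
}

fn serialize(payload: &[u8]) -> Serialized {
    // Expensive encoding happens with no lock held; the offset field is
    // written as an 8-byte zero placeholder to be patched later.
    let mut buf = 0u64.to_le_bytes().to_vec();
    buf.extend_from_slice(payload);
    Serialized { buf, offset_pos: 0 }
}

async fn write(blob: &RwLock<Blob>, payload: &[u8]) {
    let mut record = serialize(payload); // outside the critical section
    let mut guard = blob.write().await;  // critical section starts here
    let offset = guard.current_offset;
    record.buf[record.offset_pos..record.offset_pos + 8]
        .copy_from_slice(&offset.to_le_bytes()); // patch the real offset in place
    guard.current_offset += record.buf.len() as u64;
    guard.storage.extend_from_slice(&record.buf); // stand-in for file.write_append
}
```

The PR follows the same shape: `Record::to_partially_serialized` runs before the `upgradable_read`, and `into_serialized_with_header_checksum` patches the blob offset and header checksum once the offset is known under the lock.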
32 changes: 20 additions & 12 deletions src/blob/core.rs
@@ -5,6 +5,7 @@ use tokio::time::Instant;

use crate::error::ValidationErrorKind;
use crate::filter::{CombinedFilter, FilterTrait};
use crate::record::PartiallySerializedRecord;

use super::prelude::*;

@@ -238,15 +239,19 @@ where
pub(crate) async fn write(blob: &ASRwLock<Self>, record: Record) -> Result<()> {
debug!("blob write");
// Only one upgradable_read lock is allowed at a time
let partially_serialized = record.to_partially_serialized()?;
let blob = blob.upgradable_read().await;
Self::write_locked(blob, record).await
Self::write_locked(blob, partially_serialized, record).await
}

async fn write_mut(&mut self, mut record: Record) -> Result<()> {
async fn write_mut(&mut self, record: Record) -> Result<()> {
debug!("blob write");
debug!("blob write record offset: {}", self.current_offset);
record.set_offset(self.current_offset)?;
let buf = record.to_raw()?;
let (buf, header_checksum) = record
.to_partially_serialized()?
.into_serialized_with_header_checksum(self.current_offset)?;
let mut header = record.into_header();
header.set_offset_checksum(self.current_offset, header_checksum);
let bytes_written = self
.file
.write_append(&buf)
@@ -259,17 +264,20 @@
_ => e.into(),
}
})? as u64;
self.index.push(record.header().clone())?;
self.index.push(header)?;
self.current_offset += bytes_written;
Ok(())
}
async fn write_locked(
blob: ASRwLockUpgradableReadGuard<'_, Blob<K>>,
mut record: Record,
ps_record: PartiallySerializedRecord,
record: Record,
) -> Result<()> {
debug!("blob write record offset: {}", blob.current_offset);
record.set_offset(blob.current_offset)?;
let buf = record.to_raw()?;
let (buf, header_checksum) =
ps_record.into_serialized_with_header_checksum(blob.current_offset)?;
let mut header = record.into_header();
header.set_offset_checksum(blob.current_offset, header_checksum);
let bytes_written = blob
.file
.write_append(&buf)
@@ -283,12 +291,12 @@
}
})? as u64;
let mut blob = RwLockUpgradableReadGuard::upgrade(blob).await;
blob.index.push(record.header().clone())?;
blob.index.push(header)?;
blob.current_offset += bytes_written;
Ok(())
}

pub(crate) async fn read_any(
pub(crate) async fn read_last(
&self,
key: &K,
meta: Option<&Meta>,
@@ -326,7 +334,7 @@ where
}

pub(crate) async fn mark_all_as_deleted(&mut self, key: &K) -> Result<Option<u64>> {
if self.index.get_any(key).await?.is_some() {
if self.index.get_last(key).await?.is_some() {
let on_disk = self.index.on_disk();
if on_disk {
self.load_index().await?;
@@ -362,7 +370,7 @@ where
self.get_entry_with_meta(key, meta).await
} else {
debug!("blob get any entry bloom true no meta");
if let Some(header) = self.index.get_any(key).await.with_context(|| {
if let Some(header) = self.index.get_last(key).await.with_context(|| {
format!("index get any failed for blob: {:?}", self.name.to_path())
})? {
let entry = Entry::new(header, self.file.clone());
4 changes: 2 additions & 2 deletions src/blob/index/core.rs
@@ -257,7 +257,7 @@ where
for<'a> K: Key<'a>,
{
async fn contains_key(&self, key: &K) -> Result<bool> {
self.get_any(key).await.map(|h| h.is_some())
self.get_last(key).await.map(|h| h.is_some())
}

fn push(&mut self, h: RecordHeader) -> Result<()> {
@@ -302,7 +302,7 @@
}
}

async fn get_any(&self, key: &K) -> Result<Option<RecordHeader>> {
async fn get_last(&self, key: &K) -> Result<Option<RecordHeader>> {
debug!("index get any");
match &self.inner {
State::InMemory(headers) => {
2 changes: 1 addition & 1 deletion src/blob/index/mod.rs
@@ -27,7 +27,7 @@ mod prelude {
#[async_trait::async_trait]
pub(crate) trait IndexTrait<K>: Send + Sync {
async fn get_all(&self, key: &K) -> Result<Option<Vec<RecordHeader>>>;
async fn get_any(&self, key: &K) -> Result<Option<RecordHeader>>;
async fn get_last(&self, key: &K) -> Result<Option<RecordHeader>>;
fn push(&mut self, h: RecordHeader) -> Result<()>;
async fn contains_key(&self, key: &K) -> Result<bool>;
fn count(&self) -> usize;
74 changes: 64 additions & 10 deletions src/record.rs
@@ -80,6 +80,52 @@ impl Meta {
}
}

pub(crate) struct PartiallySerializedRecord {
buf: Vec<u8>,
offset_pos: usize,
header_checksum_pos: usize,
header_start: usize,
header_len: usize,
}

impl PartiallySerializedRecord {
fn new(
buf: Vec<u8>,
offset_pos: usize,
header_checksum_pos: usize,
header_start: usize,
header_len: usize,
) -> Self {
Self {
buf,
offset_pos,
header_checksum_pos,
header_start,
header_len,
}
}

pub(crate) fn into_serialized_with_header_checksum(
self,
blob_offset: u64,
) -> Result<(Vec<u8>, u32)> {
let Self {
mut buf,
offset_pos,
header_checksum_pos,
header_start,
header_len,
} = self;
let offset_slice = &mut buf[offset_pos..];
bincode::serialize_into(offset_slice, &blob_offset)?;
let header_slice = &buf[header_start..(header_start + header_len)];
let checksum = CRC32C.checksum(header_slice);
let checksum_slice = &mut buf[header_checksum_pos..];
bincode::serialize_into(checksum_slice, &checksum)?;
Ok((buf, checksum))
}
}

impl Record {
pub(crate) fn new(header: Header, meta: Meta, data: Vec<u8>) -> Self {
Self { header, meta, data }
@@ -106,15 +152,19 @@ impl Record {
&self.header
}

/// # Description
/// Serialize record to bytes
pub fn to_raw(&self) -> bincode::Result<Vec<u8>> {
pub(crate) fn to_partially_serialized(&self) -> Result<PartiallySerializedRecord> {
let mut buf = self.header.to_raw()?;
trace!("raw header: len: {}", buf.len());
let header_len = buf.len();
let raw_meta = self.meta.to_raw()?;
buf.extend(&raw_meta);
buf.extend(&self.data);
Ok(buf)
Ok(PartiallySerializedRecord::new(
buf,
header_len - 24,
header_len - 4,
0,
header_len,
))
}

pub(crate) fn deleted<K>(key: &K) -> bincode::Result<Self>
@@ -126,11 +176,6 @@ impl Record {
Ok(record)
}

pub(crate) fn set_offset(&mut self, offset: u64) -> bincode::Result<()> {
self.header.blob_offset = offset;
self.header.update_checksum()
}

pub(crate) fn validate(self) -> Result<Self> {
self.header().validate()?;
self.check_data_checksum()?;
@@ -156,6 +201,10 @@ impl Record {
pub fn meta(&self) -> &Meta {
&self.meta
}

pub fn into_header(self) -> Header {
self.header
}
}

impl Header {
@@ -230,6 +279,11 @@ impl Header {
bincode::serialized_size(&self).expect("calc record serialized size")
}

pub(crate) fn set_offset_checksum(&mut self, blob_offset: u64, header_checksum: u32) {
self.blob_offset = blob_offset;
self.header_checksum = header_checksum;
}

fn update_checksum(&mut self) -> bincode::Result<()> {
self.header_checksum = 0;
self.header_checksum = self.crc32()?;
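`Record::to_partially_serialized` records the byte positions of the `blob_offset` and `header_checksum` fields (`header_len - 24` and `header_len - 4`), and `into_serialized_with_header_checksum` later rewrites those bytes in place. The sketch below illustrates that technique on a hypothetical four-field header, assuming bincode 1.x's default fixed-int encoding, serde derive, and `crc32fast` as a stand-in for pearl's `CRC32C` instance.

```rust
// Sketch of in-place patching of a bincode-serialized header (hypothetical layout).
use serde::Serialize;

#[derive(Serialize)]
struct DemoHeader {
    magic: u64,
    blob_offset: u64,     // patched after serialization
    data_checksum: u32,
    header_checksum: u32, // occupies the last 4 bytes of the serialized buffer
}

fn main() -> bincode::Result<()> {
    let header = DemoHeader { magic: 0xABCD, blob_offset: 0, data_checksum: 0, header_checksum: 0 };
    let mut buf = bincode::serialize(&header)?;
    let len = buf.len(); // 24 bytes under bincode 1.x's default fixed-int encoding

    // Patch the real offset into its fixed position (bytes len-16..len-8 here).
    let blob_offset: u64 = 4096;
    bincode::serialize_into(&mut buf[len - 16..len - 8], &blob_offset)?;

    // Checksum the header bytes while the checksum field is still zero,
    // then write the checksum into the last 4 bytes.
    let checksum = crc32fast::hash(&buf);
    bincode::serialize_into(&mut buf[len - 4..], &checksum)?;
    Ok(())
}
```

Fixed-width encoding is what makes this safe: with a variable-length integer encoding, a larger offset value could change the field's byte length and shift everything after it.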
6 changes: 3 additions & 3 deletions src/storage/core.rs
@@ -449,7 +449,7 @@
debug!("storage read with optional meta {:?}, {:?}", key, meta);
let safe = self.inner.safe.read().await;
if let Some(ablob) = safe.active_blob.as_ref() {
match ablob.read().await.read_any(key, meta, true).await {
match ablob.read().await.read_last(key, meta, true).await {
Ok(data) => {
debug!("storage read with optional meta active blob returned data");
return Ok(data);
@@ -483,7 +483,7 @@
let stream: FuturesOrdered<_> = possible_blobs
.into_iter()
.filter_map(|id| blobs.get_child(id))
.map(|blob| blob.data.read_any(key, meta, false))
.map(|blob| blob.data.read_last(key, meta, false))
.collect();
debug!("read with optional meta {} closed blobs", stream.len());
let mut task = stream.skip_while(Result::is_err);
@@ -495,7 +495,7 @@
let blobs = safe.blobs.read().await;
let stream: FuturesUnordered<_> = blobs
.iter_possible_childs_rev(key)
.map(|blob| blob.1.data.read_any(key, meta, true))
.map(|blob| blob.1.data.read_last(key, meta, true))
.collect();
debug!("read with optional meta {} closed blobs", stream.len());
let mut task = stream.skip_while(Result::is_err);