Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

188 serialize outside lock #220

Merged
merged 24 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ Pearl changelog


#### Changed

- Removed serialization from critical section (#188)
- Removed multiple header checksum calculations (#206)
- Rename functions to show that it returns last entry (#199)

#### Fixed

Expand Down
31 changes: 19 additions & 12 deletions src/blob/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::time::Instant;

use crate::error::ValidationErrorKind;
use crate::filter::{CombinedFilter, FilterTrait};
use crate::record::PartiallySerializedRecord;
use crate::storage::{BlobRecordTimestamp, ReadResult};

use super::prelude::*;
Expand Down Expand Up @@ -243,36 +244,42 @@ where
pub(crate) async fn write(blob: &ASRwLock<Self>, key: &K, record: Record) -> Result<()> {
debug!("blob write");
// Only one upgradable_read lock is allowed at a time
let (partially_serialized, header) = record.to_partially_serialized_and_header()?;
let blob = blob.upgradable_read().await;
Self::write_locked(blob, key, record).await
Self::write_locked(blob, key, partially_serialized, header).await
}

async fn write_mut(&mut self, key: &K, mut record: Record) -> Result<RecordHeader> {
async fn write_mut(&mut self, key: &K, record: Record) -> Result<RecordHeader> {
debug!("blob write");
debug!("blob write record offset: {}", self.current_offset);
record.set_offset(self.current_offset)?;
let bytes_written = record.write_to_file(&self.file).await?;
let header = record.into_header();
let (record, mut header) = record.to_partially_serialized_and_header()?;
let write_result = record
.write_to_file(&self.file, self.current_offset)
.await?;
header.set_offset_checksum(self.current_offset, write_result.header_checksum());
self.index.push(key, header.clone())?;
self.current_offset += bytes_written;
self.current_offset += write_result.bytes_written();
Ok(header)
}

async fn write_locked(
blob: ASRwLockUpgradableReadGuard<'_, Blob<K>>,
key: &K,
mut record: Record,
record: PartiallySerializedRecord,
mut header: RecordHeader,
) -> Result<()> {
debug!("blob write record offset: {}", blob.current_offset);
record.set_offset(blob.current_offset)?;
let bytes_written = record.write_to_file(&blob.file).await?;
let write_result = record
.write_to_file(&blob.file, blob.current_offset)
.await?;
header.set_offset_checksum(blob.current_offset, write_result.header_checksum());
let mut blob = RwLockUpgradableReadGuard::upgrade(blob).await;
blob.index.push(key, record.into_header())?;
blob.current_offset += bytes_written;
blob.index.push(key, header)?;
blob.current_offset += write_result.bytes_written();
Ok(())
}

pub(crate) async fn read_any(
pub(crate) async fn read_last(
&self,
key: &K,
meta: Option<&Meta>,
Expand Down
6 changes: 6 additions & 0 deletions src/record/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod partially_serialized;
mod record;

idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) use partially_serialized::PartiallySerializedRecord;
pub use record::Meta;
pub(crate) use record::{Header, Record, RECORD_MAGIC_BYTE};
111 changes: 111 additions & 0 deletions src/record/partially_serialized.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use crate::blob::File;
use crate::prelude::*;
use bytes::{Bytes, BytesMut};

pub(crate) struct PartiallySerializedWriteResult {
bytes_written: u64,
header_checksum: u32,
}

impl PartiallySerializedWriteResult {
pub(crate) fn bytes_written(&self) -> u64 {
self.bytes_written
}

pub(crate) fn header_checksum(&self) -> u32 {
self.header_checksum
}
}

pub(crate) struct PartiallySerializedRecord {
head_with_data: BytesMut,
header_len: usize,
data: Option<Bytes>,
}

impl PartiallySerializedRecord {
pub(crate) fn new(head_with_data: BytesMut, header_len: usize, data: Option<Bytes>) -> Self {
Self {
head_with_data,
header_len,
data,
}
}

pub(super) fn serialize_with_checksum(
self,
blob_offset: u64,
) -> Result<(Bytes, Option<Bytes>, u32)> {
let Self {
head_with_data,
header_len,
data,
} = self;
let (head, checksum) =
Self::finalize_with_checksum(head_with_data, header_len, blob_offset)?;
Ok((head.freeze(), data, checksum))
}

pub(crate) async fn write_to_file(
self,
file: &File,
offset: u64,
) -> Result<PartiallySerializedWriteResult> {
let (head, data, checksum) = self.serialize_with_checksum(offset)?;

let res = if let Some(data) = data {
Self::write_double_pass(head, data, file).await
} else {
Self::write_single_pass(head, file).await
};

res.map(|l| PartiallySerializedWriteResult {
bytes_written: l,
header_checksum: checksum,
})
}

fn finalize_with_checksum(
mut buf: BytesMut,
header_len: usize,
blob_offset: u64,
) -> Result<(BytesMut, u32)> {
use std::mem::size_of;

let offset_pos = RecordHeader::blob_offset_offset(header_len);
let checksum_pos = RecordHeader::checksum_offset(header_len);
let offset_slice = &mut buf[offset_pos..(offset_pos + size_of::<u64>())];
offset_slice.copy_from_slice(&blob_offset.to_le_bytes());

let checksum_slice = &mut buf[checksum_pos..(checksum_pos + size_of::<u32>())];
checksum_slice.copy_from_slice(&0u32.to_le_bytes());

let header_slice = &buf[..header_len];
let checksum: u32 = CRC32C.checksum(header_slice);
idruzhitskiy marked this conversation as resolved.
Show resolved Hide resolved
let checksum_slice = &mut buf[checksum_pos..(checksum_pos + size_of::<u32>())];
checksum_slice.copy_from_slice(&checksum.to_le_bytes());

Ok((buf, checksum))
}

async fn write_single_pass(buf: Bytes, file: &File) -> Result<u64> {
let len = buf.len() as u64;
Self::process_file_result(file.write_append_all(buf).await).map(|_| len)
}

async fn write_double_pass(head: Bytes, data: Bytes, file: &File) -> Result<u64> {
let len = (head.len() + data.len()) as u64;
Self::process_file_result(file.write_append_all_buffers(head, data).await).map(|_| len)
}

fn process_file_result(result: std::result::Result<(), std::io::Error>) -> Result<()> {
result.map_err(|e| -> anyhow::Error {
match e.kind() {
kind if kind == IOErrorKind::Other || kind == IOErrorKind::NotFound => {
Error::file_unavailable(kind).into()
}
_ => e.into(),
}
})
}
}
Loading