Skip to content

Commit

Permalink
506 avoid data cloning (#525)
Browse files Browse the repository at this point in the history
* use refs for data where possible and arc elsewhere

* update changelog

* style change

* fix changelog

* remove redundant struct

* add to_vec func

* fix comma

* use bytes for blob data

* fix build error

* update changelog

* update names

* add capacity vec

* remove clone

* remove clone

* fix typo

* use refs for put
  • Loading branch information
idruzhitskiy authored Oct 12, 2022
1 parent 127f468 commit c85c7ab
Show file tree
Hide file tree
Showing 26 changed files with 109 additions and 113 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Bob versions changelog

#### Changed
- Update rust edition to 2021 (#484)
- Remove unnecessary data clone (#506)
- Compare vdiskid first (#594)

#### Fixed
Expand Down
2 changes: 1 addition & 1 deletion bob-apps/bin/bobc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn put(key: Vec<u8>, size: usize, addr: Uri) {
.as_secs();
let meta = BlobMeta { timestamp };
let blob = Blob {
data: vec![1; size],
data: vec![1; size].into(),
meta: Some(meta),
};
let message = PutRequest {
Expand Down
2 changes: 1 addition & 1 deletion bob-apps/bin/bobp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ fn create_blob(task_conf: &TaskConfig) -> Blob {
.as_secs(),
};
Blob {
data: vec![0_u8; task_conf.payload_size as usize],
data: vec![0_u8; task_conf.payload_size as usize].into(),
meta: Some(meta),
}
}
Expand Down
14 changes: 7 additions & 7 deletions bob-backend/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ pub trait BackendStorage: Debug + MetricsProducer + Send + Sync + 'static {
result
}

async fn put(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error>;
async fn put_alien(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error>;
async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error>;
async fn put_alien(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error>;

async fn get(&self, op: Operation, key: BobKey) -> Result<BobData, Error>;
async fn get_alien(&self, op: Operation, key: BobKey) -> Result<BobData, Error>;
Expand Down Expand Up @@ -237,7 +237,7 @@ impl Backend {
self.inner.run().await
}

pub async fn put(&self, key: BobKey, data: BobData, options: BobOptions) -> Result<(), Error> {
pub async fn put(&self, key: BobKey, data: &BobData, options: BobOptions) -> Result<(), Error> {
trace!(">>>>>>- - - - - BACKEND PUT START - - - - -");
let sw = Stopwatch::start_new();
let (vdisk_id, disk_path) = self.mapper.get_operation(key);
Expand All @@ -254,7 +254,7 @@ impl Backend {
op.set_remote_folder(node_name.to_owned());

//TODO make it parallel?
self.put_single(key, data.clone(), op).await?;
self.put_single(key, data, op).await?;
}
Ok(())
} else if let Some(path) = disk_path {
Expand Down Expand Up @@ -283,7 +283,7 @@ impl Backend {
pub async fn put_local(
&self,
key: BobKey,
data: BobData,
data: &BobData,
operation: Operation,
) -> Result<(), Error> {
self.put_single(key, data, operation).await
Expand All @@ -297,15 +297,15 @@ impl Backend {
async fn put_single(
&self,
key: BobKey,
data: BobData,
data: &BobData,
operation: Operation,
) -> Result<(), Error> {
if operation.is_data_alien() {
debug!("PUT[{}] to backend, alien data: {:?}", key, operation);
self.inner.put_alien(operation, key, data).await
} else {
debug!("PUT[{}] to backend: {:?}", key, operation);
let result = self.inner.put(operation.clone(), key, data.clone()).await;
let result = self.inner.put(operation.clone(), key, data).await;
match result {
Err(local_err) if !local_err.is_duplicate() => {
debug!(
Expand Down
10 changes: 5 additions & 5 deletions bob-backend/src/mem_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ pub struct VDisk {
}

impl VDisk {
async fn put(&self, key: BobKey, data: BobData) -> Result<(), Error> {
async fn put(&self, key: BobKey, data: &BobData) -> Result<(), Error> {
debug!("PUT[{}] to vdisk", key);
self.inner.write().await.insert(key, data);
self.inner.write().await.insert(key, data.clone());
Ok(())
}

Expand Down Expand Up @@ -73,7 +73,7 @@ impl MemDisk {
}
}

pub async fn put(&self, vdisk_id: VDiskId, key: BobKey, data: BobData) -> Result<(), Error> {
pub async fn put(&self, vdisk_id: VDiskId, key: BobKey, data: &BobData) -> Result<(), Error> {
if let Some(vdisk) = self.vdisks.get(&vdisk_id) {
debug!("PUT[{}] to vdisk: {} for: {}", key, vdisk_id, self.name);
vdisk.put(key, data).await
Expand Down Expand Up @@ -135,7 +135,7 @@ impl BackendStorage for MemBackend {
Ok(())
}

async fn put(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error> {
async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error> {
let disk_name = op.disk_name_local();
debug!("PUT[{}][{}] to backend", key, disk_name);
let disk = self.disks.get(&disk_name);
Expand All @@ -147,7 +147,7 @@ impl BackendStorage for MemBackend {
}
}

async fn put_alien(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error> {
async fn put_alien(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error> {
debug!("PUT[{}] to backend, foreign data", key);
self.foreign_data.put(op.vdisk_id(), key, data).await
}
Expand Down
6 changes: 3 additions & 3 deletions bob-backend/src/mem_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn test_mem_put_wrong_disk() {
.put(
Operation::new_local(0, DiskPath::new("invalid name".to_owned(), "".to_owned())),
BobKey::from(1u64),
BobData::new(vec![0], BobMeta::stub()),
&BobData::new(vec![0].into(), BobMeta::stub()),
)
.await;
assert!(retval.err().unwrap().is_internal())
Expand All @@ -40,7 +40,7 @@ async fn test_mem_put_get() {
.put(
Operation::new_local(0, DiskPath::new("name".to_owned(), "".to_owned())),
BobKey::from(1u64),
BobData::new(vec![1], BobMeta::stub()),
&BobData::new(vec![1].into(), BobMeta::stub()),
)
.await
.unwrap();
Expand All @@ -62,7 +62,7 @@ async fn test_mem_get_wrong_disk() {
.put(
Operation::new_local(0, DiskPath::new("name".to_owned(), "".to_owned())),
BobKey::from(1u64),
BobData::new(vec![1], BobMeta::stub()),
&BobData::new(vec![1].into(), BobMeta::stub()),
)
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions bob-backend/src/pearl/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl BackendStorage for Pearl {
Ok(())
}

async fn put(&self, op: Operation, key: BobKey, data: BobData) -> BackendResult<()> {
async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> BackendResult<()> {
debug!("PUT[{}] to pearl backend. operation: {:?}", key, op);
let dc_option = self
.disk_controllers
Expand All @@ -168,7 +168,7 @@ impl BackendStorage for Pearl {
}
}

async fn put_alien(&self, op: Operation, key: BobKey, data: BobData) -> BackendResult<()> {
async fn put_alien(&self, op: Operation, key: BobKey, data: &BobData) -> BackendResult<()> {
debug!("PUT[alien][{}] to pearl backend, operation: {:?}", key, op);
self.alien_disk_controller.put_alien(op, key, data).await
}
Expand Down
34 changes: 0 additions & 34 deletions bob-backend/src/pearl/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,37 +108,3 @@ impl Ord for Key {
self.partial_cmp(&other).unwrap()
}
}

pub struct Data {
data: Vec<u8>,
timestamp: u64,
}

impl Data {
const TIMESTAMP_LEN: usize = 8;

pub fn to_vec(&self) -> Vec<u8> {
let mut result = self.timestamp.to_be_bytes().to_vec();
result.extend_from_slice(&self.data);
result
}

pub fn from_bytes(data: &[u8]) -> Result<BobData, Error> {
let (ts, bob_data) = data.split_at(Self::TIMESTAMP_LEN);
let bytes = ts
.try_into()
.map_err(|e| Error::storage(format!("parse error: {}", e)))?;
let timestamp = u64::from_be_bytes(bytes);
let meta = BobMeta::new(timestamp);
Ok(BobData::new(bob_data.to_vec(), meta))
}
}

impl From<BobData> for Data {
fn from(data: BobData) -> Self {
Self {
timestamp: data.meta().timestamp(),
data: data.into_inner(),
}
}
}
6 changes: 3 additions & 3 deletions bob-backend/src/pearl/disk_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,13 @@ impl DiskController {
&self,
op: Operation,
key: BobKey,
data: BobData,
data: &BobData,
) -> Result<(), Error> {
if *self.state.read().await == GroupsState::Ready {
let vdisk_group = self.get_or_create_pearl(&op).await;
match vdisk_group {
Ok(group) => match group
.put(key, data.clone(), StartTimestampConfig::new(false))
.put(key, data, StartTimestampConfig::new(false))
.await
{
Err(e) => Err(self.process_error(e).await),
Expand All @@ -402,7 +402,7 @@ impl DiskController {
}
}

pub(crate) async fn put(&self, op: Operation, key: BobKey, data: BobData) -> BackendResult<()> {
pub(crate) async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> BackendResult<()> {
if *self.state.read().await == GroupsState::Ready {
let vdisk_group = {
let groups = self.groups.read().await;
Expand Down
4 changes: 2 additions & 2 deletions bob-backend/src/pearl/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl Group {
pub async fn put(
&self,
key: BobKey,
data: BobData,
data: &BobData,
timestamp_config: StartTimestampConfig,
) -> Result<(), Error> {
let holder = self.get_actual_holder(&data, timestamp_config).await?;
Expand All @@ -232,7 +232,7 @@ impl Group {
Ok(res)
}

async fn put_common(holder: &Holder, key: BobKey, data: BobData) -> Result<(), Error> {
async fn put_common(holder: &Holder, key: BobKey, data: &BobData) -> Result<(), Error> {
let result = holder.write(key, data).await;
if let Err(e) = result {
// if we receive WorkDirUnavailable it's likely disk error, so we shouldn't restart one
Expand Down
12 changes: 6 additions & 6 deletions bob-backend/src/pearl/holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{pearl::utils::get_current_timestamp, prelude::*};

use super::{
core::{BackendResult, PearlStorage},
data::{Data, Key},
data::Key,
utils::Utils,
};
use bob_common::metrics::pearl::{
Expand Down Expand Up @@ -191,14 +191,14 @@ impl Holder {
);
}

pub async fn write(&self, key: BobKey, data: BobData) -> BackendResult<()> {
pub async fn write(&self, key: BobKey, data: &BobData) -> BackendResult<()> {
let state = self.storage.read().await;

if state.is_ready() {
let storage = state.get();
self.update_last_modification();
trace!("Vdisk: {}, write key: {}", self.vdisk, key);
Self::write_disk(storage, Key::from(key), data.clone()).await
Self::write_disk(storage, Key::from(key), data).await
} else {
trace!("Vdisk: {} isn't ready for writing: {:?}", self.vdisk, state);
Err(Error::vdisk_is_not_ready())
Expand All @@ -213,11 +213,11 @@ impl Holder {

// @TODO remove redundant return result
#[allow(clippy::cast_possible_truncation)]
async fn write_disk(storage: PearlStorage, key: Key, data: BobData) -> BackendResult<()> {
async fn write_disk(storage: PearlStorage, key: Key, data: &BobData) -> BackendResult<()> {
counter!(PEARL_PUT_COUNTER, 1);
let data_size = Self::calc_data_size(&data);
let timer = Instant::now();
let res = storage.write(key, Data::from(data).to_vec()).await;
let res = storage.write(key, data.to_serialized_vec()).await;
let res = match res {
Err(e) => {
counter!(PEARL_PUT_ERROR_COUNTER, 1);
Expand Down Expand Up @@ -258,7 +258,7 @@ impl Holder {
.await
.map(|r| {
counter!(PEARL_GET_BYTES_COUNTER, r.len() as u64);
Data::from_bytes(&r)
BobData::from_serialized_bytes(r)
})
.map_err(|e| {
counter!(PEARL_GET_ERROR_COUNTER, 1);
Expand Down
4 changes: 2 additions & 2 deletions bob-backend/src/pearl/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ async fn test_write_multiple_read() {
backend.run().await.unwrap();
let path = DiskPath::new(DISK_NAME.to_owned(), "".to_owned());
let operation = Operation::new_local(vdisk_id, path);
let data = BobData::new(vec![], BobMeta::new(TIMESTAMP));
let data = BobData::new(vec![].into(), BobMeta::new(TIMESTAMP));
let write = backend
.put(operation.clone(), BobKey::from(KEY_ID), data)
.put(operation.clone(), BobKey::from(KEY_ID), &data)
.await;
assert!(write.is_ok());

Expand Down
6 changes: 3 additions & 3 deletions bob-backend/src/stub_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl BackendStorage for StubBackend {
Ok(())
}

async fn put(&self, _operation: Operation, key: BobKey, data: BobData) -> Result<(), Error> {
async fn put(&self, _operation: Operation, key: BobKey, data: &BobData) -> Result<(), Error> {
debug!(
"PUT[{}]: hi from backend, timestamp: {:?}",
key,
Expand All @@ -27,7 +27,7 @@ impl BackendStorage for StubBackend {
&self,
_operation: Operation,
key: BobKey,
data: BobData,
data: &BobData,
) -> Result<(), Error> {
debug!(
"PUT[{}]: hi from backend, timestamp: {:?}",
Expand All @@ -39,7 +39,7 @@ impl BackendStorage for StubBackend {

async fn get(&self, _operation: Operation, key: BobKey) -> Result<BobData, Error> {
debug!("GET[{}]: hi from backend", key);
Ok(BobData::new(vec![0], BobMeta::stub()))
Ok(BobData::new(vec![0].into(), BobMeta::stub()))
}

async fn get_alien(&self, operation: Operation, key: BobKey) -> Result<BobData, Error> {
Expand Down
28 changes: 25 additions & 3 deletions bob-common/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::{
error::Error,
mapper::NodesMap,
node::{Disk as NodeDisk, Node},
};
use bob_grpc::{GetOptions, GetSource, PutOptions};
use bytes::Bytes;
use std::{
convert::TryInto,
fmt::{Debug, Formatter, Result as FmtResult},
Expand Down Expand Up @@ -92,26 +94,46 @@ pub type VDiskId = u32;

#[derive(Clone)]
pub struct BobData {
inner: Vec<u8>,
inner: Bytes,
meta: BobMeta,
}

impl BobData {
pub fn new(inner: Vec<u8>, meta: BobMeta) -> Self {
const TIMESTAMP_LEN: usize = 8;

pub fn new(inner: Bytes, meta: BobMeta) -> Self {
BobData { inner, meta }
}

/// Borrows the payload as a byte slice; the data itself is not copied.
pub fn inner(&self) -> &[u8] {
&self.inner
}

pub fn into_inner(self) -> Vec<u8> {
pub fn into_inner(self) -> Bytes {
self.inner
}

/// Returns a reference to the metadata stored alongside the payload.
pub fn meta(&self) -> &BobMeta {
&self.meta
}

/// Serializes this value into a freshly allocated buffer.
///
/// Layout: the metadata timestamp as `TIMESTAMP_LEN` big-endian bytes,
/// immediately followed by the raw payload bytes.
pub fn to_serialized_vec(&self) -> Vec<u8> {
    let timestamp = self.meta.timestamp.to_be_bytes();
    let payload: &[u8] = &self.inner;
    // Reserve the exact final size up front so the buffer never reallocates.
    let mut buf = Vec::with_capacity(Self::TIMESTAMP_LEN + payload.len());
    buf.extend_from_slice(&timestamp);
    buf.extend_from_slice(payload);
    buf
}

pub fn from_serialized_bytes(data: Vec<u8>) -> Result<BobData, Error> {
let mut bob_data = Bytes::from(data);
let ts_bytes = bob_data.split_to(Self::TIMESTAMP_LEN);
let ts_bytes = (&*ts_bytes)
.try_into()
.map_err(|e| Error::storage(format!("parse error: {}", e)))?;
let timestamp = u64::from_be_bytes(ts_bytes);
let meta = BobMeta::new(timestamp);
Ok(BobData::new(bob_data, meta))
}
}

impl Debug for BobData {
Expand Down
Loading

0 comments on commit c85c7ab

Please sign in to comment.