506 avoid data cloning #525

Merged 22 commits on Oct 12, 2022
Changes from 14 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ Bob versions changelog
- Update rust edition to 2021 (#484)

#### Fixed
- Remove unnecessary data clone (#506)
- Print full error text received from Pearl in exist function (#581)
- Fix alien indexes offloading (#560)
- Internode auth works properly with nodes on same ip (#548)
2 changes: 1 addition & 1 deletion bob-apps/bin/bobc.rs
@@ -51,7 +51,7 @@ async fn put(key: Vec<u8>, size: usize, addr: Uri) {
.as_secs();
let meta = BlobMeta { timestamp };
let blob = Blob {
data: vec![1; size],
data: vec![1; size].into(),
meta: Some(meta),
};
let message = PutRequest {
2 changes: 1 addition & 1 deletion bob-apps/bin/bobp.rs
@@ -944,7 +944,7 @@ fn create_blob(task_conf: &TaskConfig) -> Blob {
.as_secs(),
};
Blob {
data: vec![0_u8; task_conf.payload_size as usize],
data: vec![0_u8; task_conf.payload_size as usize].into(),
meta: Some(meta),
}
}
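Both client tools now build the gRPC `Blob` with `vec![...].into()`. Below is a minimal sketch of what that conversion buys, assuming the generated `Blob.data` field changed from `Vec<u8>` to `bytes::Bytes`; the `Blob`/`BlobMeta` structs are simplified stand-ins, not the real bob-grpc types, and the `bytes` crate is required.

```rust
// Simplified stand-ins for the generated gRPC types (assumption: the real
// `Blob.data` is now `bytes::Bytes` rather than `Vec<u8>`).
use bytes::Bytes;

struct BlobMeta {
    timestamp: u64,
}

struct Blob {
    data: Bytes,
    meta: Option<BlobMeta>,
}

fn create_blob(size: usize, timestamp: u64) -> Blob {
    Blob {
        // `From<Vec<u8>> for Bytes` takes ownership of the buffer,
        // so `.into()` wraps it without copying the payload.
        data: vec![0_u8; size].into(),
        meta: Some(BlobMeta { timestamp }),
    }
}

fn main() {
    let blob = create_blob(16, 0);
    assert_eq!(blob.data.len(), 16);
}
```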
14 changes: 7 additions & 7 deletions bob-backend/src/core.rs
@@ -98,8 +98,8 @@ pub trait BackendStorage: Debug + MetricsProducer + Send + Sync + 'static {
result
}

async fn put(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error>;
async fn put_alien(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error>;
async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error>;
async fn put_alien(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error>;

async fn get(&self, op: Operation, key: BobKey) -> Result<BobData, Error>;
async fn get_alien(&self, op: Operation, key: BobKey) -> Result<BobData, Error>;
@@ -237,7 +237,7 @@ impl Backend {
self.inner.run().await
}

pub async fn put(&self, key: BobKey, data: BobData, options: BobOptions) -> Result<(), Error> {
pub async fn put(&self, key: BobKey, data: &BobData, options: BobOptions) -> Result<(), Error> {
trace!(">>>>>>- - - - - BACKEND PUT START - - - - -");
let sw = Stopwatch::start_new();
let (vdisk_id, disk_path) = self.mapper.get_operation(key);
@@ -254,7 +254,7 @@
op.set_remote_folder(node_name.to_owned());

//TODO make it parallel?
self.put_single(key, data.clone(), op).await?;
self.put_single(key, data, op).await?;
}
Ok(())
} else if let Some(path) = disk_path {
@@ -283,7 +283,7 @@
pub async fn put_local(
&self,
key: BobKey,
data: BobData,
data: &BobData,
operation: Operation,
) -> Result<(), Error> {
self.put_single(key, data, operation).await
@@ -297,15 +297,15 @@
async fn put_single(
&self,
key: BobKey,
data: BobData,
data: &BobData,
operation: Operation,
) -> Result<(), Error> {
if operation.is_data_alien() {
debug!("PUT[{}] to backend, alien data: {:?}", key, operation);
self.inner.put_alien(operation, key, data).await
} else {
debug!("PUT[{}] to backend: {:?}", key, operation);
let result = self.inner.put(operation.clone(), key, data.clone()).await;
let result = self.inner.put(operation.clone(), key, data).await;
match result {
Err(local_err) if !local_err.is_duplicate() => {
debug!(
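This is the core of the PR: `put`/`put_alien` now borrow `BobData`, so the replication loop above can lend the same value to every target instead of calling `data.clone()` per put. A compile-level sketch of the pattern follows, with simplified stand-in types; it assumes the `async_trait` and `futures` crates, while the real trait and `BobData` live in this crate and bob-common.

```rust
// Sketch of the borrowed-put pattern; BobData/Error are simplified stand-ins.
use std::collections::HashMap;
use std::sync::Mutex;

use async_trait::async_trait;

#[derive(Clone, Debug)]
struct BobData(Vec<u8>);

#[derive(Debug)]
struct Error;

#[async_trait]
trait BackendStorage: Send + Sync {
    // Callers lend the data; an implementation clones only if it must retain it.
    async fn put(&self, key: u64, data: &BobData) -> Result<(), Error>;
}

struct MemBackend {
    inner: Mutex<HashMap<u64, BobData>>,
}

#[async_trait]
impl BackendStorage for MemBackend {
    async fn put(&self, key: u64, data: &BobData) -> Result<(), Error> {
        // The only clone left sits where ownership is actually required.
        self.inner.lock().unwrap().insert(key, data.clone());
        Ok(())
    }
}

// Before this change the loop body had to be `put(key, data.clone(), ...)`.
async fn replicate(
    backend: &dyn BackendStorage,
    key: u64,
    data: &BobData,
    replicas: usize,
) -> Result<(), Error> {
    for _ in 0..replicas {
        backend.put(key, data).await?;
    }
    Ok(())
}

fn main() -> Result<(), Error> {
    let backend = MemBackend { inner: Mutex::new(HashMap::new()) };
    futures::executor::block_on(replicate(&backend, 1, &BobData(vec![1, 2, 3]), 3))
}
```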
10 changes: 5 additions & 5 deletions bob-backend/src/mem_backend.rs
@@ -8,9 +8,9 @@ pub struct VDisk {
}

impl VDisk {
async fn put(&self, key: BobKey, data: BobData) -> Result<(), Error> {
async fn put(&self, key: BobKey, data: &BobData) -> Result<(), Error> {
debug!("PUT[{}] to vdisk", key);
self.inner.write().await.insert(key, data);
self.inner.write().await.insert(key, data.clone());
Ok(())
}

@@ -73,7 +73,7 @@ impl MemDisk {
}
}

pub async fn put(&self, vdisk_id: VDiskId, key: BobKey, data: BobData) -> Result<(), Error> {
pub async fn put(&self, vdisk_id: VDiskId, key: BobKey, data: &BobData) -> Result<(), Error> {
if let Some(vdisk) = self.vdisks.get(&vdisk_id) {
debug!("PUT[{}] to vdisk: {} for: {}", key, vdisk_id, self.name);
vdisk.put(key, data).await
@@ -135,7 +135,7 @@ impl BackendStorage for MemBackend {
Ok(())
}

async fn put(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error> {
async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error> {
let disk_name = op.disk_name_local();
debug!("PUT[{}][{}] to backend", key, disk_name);
let disk = self.disks.get(&disk_name);
@@ -147,7 +147,7 @@
}
}

async fn put_alien(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error> {
async fn put_alien(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error> {
debug!("PUT[{}] to backend, foreign data", key);
self.foreign_data.put(op.vdisk_id(), key, data).await
}
6 changes: 3 additions & 3 deletions bob-backend/src/mem_tests.rs
@@ -26,7 +26,7 @@ async fn test_mem_put_wrong_disk() {
.put(
Operation::new_local(0, DiskPath::new("invalid name".to_owned(), "".to_owned())),
BobKey::from(1u64),
BobData::new(vec![0], BobMeta::stub()),
&BobData::new(vec![0].into(), BobMeta::stub()),
)
.await;
assert!(retval.err().unwrap().is_internal())
@@ -40,7 +40,7 @@ async fn test_mem_put_get() {
.put(
Operation::new_local(0, DiskPath::new("name".to_owned(), "".to_owned())),
BobKey::from(1u64),
BobData::new(vec![1], BobMeta::stub()),
&BobData::new(vec![1].into(), BobMeta::stub()),
)
.await
.unwrap();
@@ -62,7 +62,7 @@ async fn test_mem_get_wrong_disk() {
.put(
Operation::new_local(0, DiskPath::new("name".to_owned(), "".to_owned())),
BobKey::from(1u64),
BobData::new(vec![1], BobMeta::stub()),
&BobData::new(vec![1].into(), BobMeta::stub()),
)
.await
.unwrap();
4 changes: 2 additions & 2 deletions bob-backend/src/pearl/core.rs
@@ -148,7 +148,7 @@ impl BackendStorage for Pearl {
Ok(())
}

async fn put(&self, op: Operation, key: BobKey, data: BobData) -> BackendResult<()> {
async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> BackendResult<()> {
debug!("PUT[{}] to pearl backend. operation: {:?}", key, op);
let dc_option = self
.disk_controllers
@@ -168,7 +168,7 @@
}
}

async fn put_alien(&self, op: Operation, key: BobKey, data: BobData) -> BackendResult<()> {
async fn put_alien(&self, op: Operation, key: BobKey, data: &BobData) -> BackendResult<()> {
debug!("PUT[alien][{}] to pearl backend, operation: {:?}", key, op);
self.alien_disk_controller.put_alien(op, key, data).await
}
34 changes: 0 additions & 34 deletions bob-backend/src/pearl/data.rs
@@ -108,37 +108,3 @@ impl Ord for Key {
self.partial_cmp(&other).unwrap()
}
}

pub struct Data {
data: Vec<u8>,
timestamp: u64,
}

impl Data {
const TIMESTAMP_LEN: usize = 8;

pub fn to_vec(&self) -> Vec<u8> {
let mut result = self.timestamp.to_be_bytes().to_vec();
result.extend_from_slice(&self.data);
result
}

pub fn from_bytes(data: &[u8]) -> Result<BobData, Error> {
let (ts, bob_data) = data.split_at(Self::TIMESTAMP_LEN);
let bytes = ts
.try_into()
.map_err(|e| Error::storage(format!("parse error: {}", e)))?;
let timestamp = u64::from_be_bytes(bytes);
let meta = BobMeta::new(timestamp);
Ok(BobData::new(bob_data.to_vec(), meta))
}
}

impl From<BobData> for Data {
fn from(data: BobData) -> Self {
Self {
timestamp: data.meta().timestamp(),
data: data.into_inner(),
}
}
}
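The deleted `Data` wrapper only prepended an 8-byte big-endian timestamp to the payload; holder.rs below now calls `data.to_vec()` and `BobData::from_bytes` directly, so that logic presumably moved onto `BobData` in bob-common. A hedged reconstruction of the round-trip, using a simplified stand-in type and a plain `String` error in place of `Error::storage`:

```rust
// Record layout taken from the removed code above:
// [ 8-byte big-endian timestamp | payload bytes ].
const TIMESTAMP_LEN: usize = 8;

#[derive(Debug, PartialEq)]
struct BobData {
    inner: Vec<u8>,
    timestamp: u64,
}

impl BobData {
    fn to_vec(&self) -> Vec<u8> {
        let mut result = self.timestamp.to_be_bytes().to_vec();
        result.extend_from_slice(&self.inner);
        result
    }

    fn from_bytes(raw: &[u8]) -> Result<Self, String> {
        if raw.len() < TIMESTAMP_LEN {
            return Err("record shorter than timestamp header".to_owned());
        }
        let (ts, payload) = raw.split_at(TIMESTAMP_LEN);
        let timestamp = u64::from_be_bytes(ts.try_into().expect("checked length"));
        Ok(Self {
            inner: payload.to_vec(),
            timestamp,
        })
    }
}

fn main() {
    let data = BobData { inner: vec![1, 2, 3], timestamp: 42 };
    let roundtrip = BobData::from_bytes(&data.to_vec()).unwrap();
    assert_eq!(roundtrip, data);
}
```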
6 changes: 3 additions & 3 deletions bob-backend/src/pearl/disk_controller.rs
@@ -377,13 +377,13 @@ impl DiskController {
&self,
op: Operation,
key: BobKey,
data: BobData,
data: &BobData,
) -> Result<(), Error> {
if *self.state.read().await == GroupsState::Ready {
let vdisk_group = self.get_or_create_pearl(&op).await;
match vdisk_group {
Ok(group) => match group
.put(key, data.clone(), StartTimestampConfig::new(false))
.put(key, data, StartTimestampConfig::new(false))
.await
{
Err(e) => Err(self.process_error(e).await),
@@ -402,7 +402,7 @@
}
}

pub(crate) async fn put(&self, op: Operation, key: BobKey, data: BobData) -> BackendResult<()> {
pub(crate) async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> BackendResult<()> {
if *self.state.read().await == GroupsState::Ready {
let vdisk_group = {
let groups = self.groups.read().await;
4 changes: 2 additions & 2 deletions bob-backend/src/pearl/group.rs
@@ -220,7 +220,7 @@ impl Group {
pub async fn put(
&self,
key: BobKey,
data: BobData,
data: &BobData,
timestamp_config: StartTimestampConfig,
) -> Result<(), Error> {
let holder = self.get_actual_holder(&data, timestamp_config).await?;
@@ -232,7 +232,7 @@
Ok(res)
}

async fn put_common(holder: &Holder, key: BobKey, data: BobData) -> Result<(), Error> {
async fn put_common(holder: &Holder, key: BobKey, data: &BobData) -> Result<(), Error> {
let result = holder.write(key, data).await;
if let Err(e) = result {
// if we receive WorkDirUnavailable it's likely disk error, so we shouldn't restart one
12 changes: 6 additions & 6 deletions bob-backend/src/pearl/holder.rs
@@ -4,7 +4,7 @@ use crate::{pearl::utils::get_current_timestamp, prelude::*};

use super::{
core::{BackendResult, PearlStorage},
data::{Data, Key},
data::Key,
utils::Utils,
};
use bob_common::metrics::pearl::{
@@ -191,14 +191,14 @@ impl Holder {
);
}

pub async fn write(&self, key: BobKey, data: BobData) -> BackendResult<()> {
pub async fn write(&self, key: BobKey, data: &BobData) -> BackendResult<()> {
let state = self.storage.read().await;

if state.is_ready() {
let storage = state.get();
self.update_last_modification();
trace!("Vdisk: {}, write key: {}", self.vdisk, key);
Self::write_disk(storage, Key::from(key), data.clone()).await
Self::write_disk(storage, Key::from(key), data).await
} else {
trace!("Vdisk: {} isn't ready for writing: {:?}", self.vdisk, state);
Err(Error::vdisk_is_not_ready())
@@ -213,11 +213,11 @@

// @TODO remove redundant return result
#[allow(clippy::cast_possible_truncation)]
async fn write_disk(storage: PearlStorage, key: Key, data: BobData) -> BackendResult<()> {
async fn write_disk(storage: PearlStorage, key: Key, data: &BobData) -> BackendResult<()> {
counter!(PEARL_PUT_COUNTER, 1);
let data_size = Self::calc_data_size(&data);
let timer = Instant::now();
let res = storage.write(key, Data::from(data).to_vec()).await;
let res = storage.write(key, data.to_vec()).await;
let res = match res {
Err(e) => {
counter!(PEARL_PUT_ERROR_COUNTER, 1);
@@ -258,7 +258,7 @@ impl Holder {
.await
.map(|r| {
counter!(PEARL_GET_BYTES_COUNTER, r.len() as u64);
Data::from_bytes(&r)
BobData::from_bytes(&r)
})
.map_err(|e| {
counter!(PEARL_GET_ERROR_COUNTER, 1);
4 changes: 2 additions & 2 deletions bob-backend/src/pearl/tests.rs
@@ -74,9 +74,9 @@ async fn test_write_multiple_read() {
backend.run().await.unwrap();
let path = DiskPath::new(DISK_NAME.to_owned(), "".to_owned());
let operation = Operation::new_local(vdisk_id, path);
let data = BobData::new(vec![], BobMeta::new(TIMESTAMP));
let data = BobData::new(vec![].into(), BobMeta::new(TIMESTAMP));
let write = backend
.put(operation.clone(), BobKey::from(KEY_ID), data)
.put(operation.clone(), BobKey::from(KEY_ID), &data)
.await;
assert!(write.is_ok());

6 changes: 3 additions & 3 deletions bob-backend/src/stub_backend.rs
@@ -14,7 +14,7 @@ impl BackendStorage for StubBackend {
Ok(())
}

async fn put(&self, _operation: Operation, key: BobKey, data: BobData) -> Result<(), Error> {
async fn put(&self, _operation: Operation, key: BobKey, data: &BobData) -> Result<(), Error> {
debug!(
"PUT[{}]: hi from backend, timestamp: {:?}",
key,
@@ -27,7 +27,7 @@
&self,
_operation: Operation,
key: BobKey,
data: BobData,
data: &BobData,
) -> Result<(), Error> {
debug!(
"PUT[{}]: hi from backend, timestamp: {:?}",
@@ -39,7 +39,7 @@

async fn get(&self, _operation: Operation, key: BobKey) -> Result<BobData, Error> {
debug!("GET[{}]: hi from backend", key);
Ok(BobData::new(vec![0], BobMeta::stub()))
Ok(BobData::new(vec![0].into(), BobMeta::stub()))
}

async fn get_alien(&self, operation: Operation, key: BobKey) -> Result<BobData, Error> {
2 changes: 1 addition & 1 deletion bob-common/src/bob_client.rs
@@ -93,7 +93,7 @@ pub mod b_client {
Err(e) => {
self.metrics.put_error_count();
self.metrics.put_timer_stop(timer);
Err(NodeOutput::new(node_name, e.into()))
Err(NodeOutput::new(node_name, e.into()))
},
}
}