From c85c7ab00f7671351ded30b406eb9edbc32ffff8 Mon Sep 17 00:00:00 2001 From: idruzhitskiy Date: Wed, 12 Oct 2022 21:14:31 +0300 Subject: [PATCH] 506 avoid data cloning (#525) * use refs for data where possible and arc elsewhere * update changelog * style change * fix changelog * remove redundant struct * add to_vec func * fix comma * use bytes for blob data * fix build error * update changelog * update names * add capacity vec * remove clone * remove clone * fix typo * use refs for put --- CHANGELOG.md | 1 + bob-apps/bin/bobc.rs | 2 +- bob-apps/bin/bobp.rs | 2 +- bob-backend/src/core.rs | 14 +++++----- bob-backend/src/mem_backend.rs | 10 +++---- bob-backend/src/mem_tests.rs | 6 ++--- bob-backend/src/pearl/core.rs | 4 +-- bob-backend/src/pearl/data.rs | 34 ------------------------ bob-backend/src/pearl/disk_controller.rs | 6 ++--- bob-backend/src/pearl/group.rs | 4 +-- bob-backend/src/pearl/holder.rs | 12 ++++----- bob-backend/src/pearl/tests.rs | 4 +-- bob-backend/src/stub_backend.rs | 6 ++--- bob-common/src/data.rs | 28 ++++++++++++++++--- bob-grpc/Cargo.toml | 3 ++- bob-grpc/build.rs | 6 ++++- bob/src/api/mod.rs | 5 ++-- bob/src/api/s3.rs | 7 +++-- bob/src/cluster/mod.rs | 2 +- bob/src/cluster/operations.rs | 10 +++---- bob/src/cluster/quorum.rs | 25 +++++++++-------- bob/src/cluster/simple.rs | 2 +- bob/src/cluster/tests.rs | 20 +++++++------- bob/src/grinder.rs | 2 +- bob/src/lib.rs | 2 +- bob/src/server.rs | 5 ++-- 26 files changed, 109 insertions(+), 113 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c777e04a6..6940775a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Bob versions changelog #### Changed - Update rust edition to 2021 (#484) +- Remove unnecessary data clone (#506) - Compare vdiskid first (#594) #### Fixed diff --git a/bob-apps/bin/bobc.rs b/bob-apps/bin/bobc.rs index 2bd82c6d7..c9522b479 100644 --- a/bob-apps/bin/bobc.rs +++ b/bob-apps/bin/bobc.rs @@ -51,7 +51,7 @@ async fn put(key: Vec, size: usize, addr: Uri) { .as_secs(); let meta = BlobMeta { timestamp }; let blob = Blob { - data: vec![1; size], + data: vec![1; size].into(), meta: Some(meta), }; let message = PutRequest { diff --git a/bob-apps/bin/bobp.rs b/bob-apps/bin/bobp.rs index dea1253cc..2c39934a8 100644 --- a/bob-apps/bin/bobp.rs +++ b/bob-apps/bin/bobp.rs @@ -956,7 +956,7 @@ fn create_blob(task_conf: &TaskConfig) -> Blob { .as_secs(), }; Blob { - data: vec![0_u8; task_conf.payload_size as usize], + data: vec![0_u8; task_conf.payload_size as usize].into(), meta: Some(meta), } } diff --git a/bob-backend/src/core.rs b/bob-backend/src/core.rs index c0379ce52..81ddf4c1f 100644 --- a/bob-backend/src/core.rs +++ b/bob-backend/src/core.rs @@ -98,8 +98,8 @@ pub trait BackendStorage: Debug + MetricsProducer + Send + Sync + 'static { result } - async fn put(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error>; - async fn put_alien(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error>; + async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error>; + async fn put_alien(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error>; async fn get(&self, op: Operation, key: BobKey) -> Result; async fn get_alien(&self, op: Operation, key: BobKey) -> Result; @@ -237,7 +237,7 @@ impl Backend { self.inner.run().await } - pub async fn put(&self, key: BobKey, data: BobData, options: BobOptions) -> Result<(), Error> { + pub async fn put(&self, key: BobKey, data: &BobData, options: BobOptions) -> Result<(), Error> { trace!(">>>>>>- - - - - BACKEND PUT START - - - - -"); let sw = Stopwatch::start_new(); let (vdisk_id, disk_path) = self.mapper.get_operation(key); @@ -254,7 +254,7 @@ impl Backend { op.set_remote_folder(node_name.to_owned()); //TODO make it parallel? - self.put_single(key, data.clone(), op).await?; + self.put_single(key, data, op).await?; } Ok(()) } else if let Some(path) = disk_path { @@ -283,7 +283,7 @@ impl Backend { pub async fn put_local( &self, key: BobKey, - data: BobData, + data: &BobData, operation: Operation, ) -> Result<(), Error> { self.put_single(key, data, operation).await @@ -297,7 +297,7 @@ impl Backend { async fn put_single( &self, key: BobKey, - data: BobData, + data: &BobData, operation: Operation, ) -> Result<(), Error> { if operation.is_data_alien() { @@ -305,7 +305,7 @@ impl Backend { self.inner.put_alien(operation, key, data).await } else { debug!("PUT[{}] to backend: {:?}", key, operation); - let result = self.inner.put(operation.clone(), key, data.clone()).await; + let result = self.inner.put(operation.clone(), key, data).await; match result { Err(local_err) if !local_err.is_duplicate() => { debug!( diff --git a/bob-backend/src/mem_backend.rs b/bob-backend/src/mem_backend.rs index 82a8feeb7..25e393fa1 100755 --- a/bob-backend/src/mem_backend.rs +++ b/bob-backend/src/mem_backend.rs @@ -8,9 +8,9 @@ pub struct VDisk { } impl VDisk { - async fn put(&self, key: BobKey, data: BobData) -> Result<(), Error> { + async fn put(&self, key: BobKey, data: &BobData) -> Result<(), Error> { debug!("PUT[{}] to vdisk", key); - self.inner.write().await.insert(key, data); + self.inner.write().await.insert(key, data.clone()); Ok(()) } @@ -73,7 +73,7 @@ impl MemDisk { } } - pub async fn put(&self, vdisk_id: VDiskId, key: BobKey, data: BobData) -> Result<(), Error> { + pub async fn put(&self, vdisk_id: VDiskId, key: BobKey, data: &BobData) -> Result<(), Error> { if let Some(vdisk) = self.vdisks.get(&vdisk_id) { debug!("PUT[{}] to vdisk: {} for: {}", key, vdisk_id, self.name); vdisk.put(key, data).await @@ -135,7 +135,7 @@ impl BackendStorage for MemBackend { Ok(()) } - async fn put(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error> { + async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error> { let disk_name = op.disk_name_local(); debug!("PUT[{}][{}] to backend", key, disk_name); let disk = self.disks.get(&disk_name); @@ -147,7 +147,7 @@ impl BackendStorage for MemBackend { } } - async fn put_alien(&self, op: Operation, key: BobKey, data: BobData) -> Result<(), Error> { + async fn put_alien(&self, op: Operation, key: BobKey, data: &BobData) -> Result<(), Error> { debug!("PUT[{}] to backend, foreign data", key); self.foreign_data.put(op.vdisk_id(), key, data).await } diff --git a/bob-backend/src/mem_tests.rs b/bob-backend/src/mem_tests.rs index 69bfcf736..ffb049ee6 100755 --- a/bob-backend/src/mem_tests.rs +++ b/bob-backend/src/mem_tests.rs @@ -26,7 +26,7 @@ async fn test_mem_put_wrong_disk() { .put( Operation::new_local(0, DiskPath::new("invalid name".to_owned(), "".to_owned())), BobKey::from(1u64), - BobData::new(vec![0], BobMeta::stub()), + &BobData::new(vec![0].into(), BobMeta::stub()), ) .await; assert!(retval.err().unwrap().is_internal()) @@ -40,7 +40,7 @@ async fn test_mem_put_get() { .put( Operation::new_local(0, DiskPath::new("name".to_owned(), "".to_owned())), BobKey::from(1u64), - BobData::new(vec![1], BobMeta::stub()), + &BobData::new(vec![1].into(), BobMeta::stub()), ) .await .unwrap(); @@ -62,7 +62,7 @@ async fn test_mem_get_wrong_disk() { .put( Operation::new_local(0, DiskPath::new("name".to_owned(), "".to_owned())), BobKey::from(1u64), - BobData::new(vec![1], BobMeta::stub()), + &BobData::new(vec![1].into(), BobMeta::stub()), ) .await .unwrap(); diff --git a/bob-backend/src/pearl/core.rs b/bob-backend/src/pearl/core.rs index a3eeb5af2..4abda13e0 100755 --- a/bob-backend/src/pearl/core.rs +++ b/bob-backend/src/pearl/core.rs @@ -148,7 +148,7 @@ impl BackendStorage for Pearl { Ok(()) } - async fn put(&self, op: Operation, key: BobKey, data: BobData) -> BackendResult<()> { + async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> BackendResult<()> { debug!("PUT[{}] to pearl backend. operation: {:?}", key, op); let dc_option = self .disk_controllers @@ -168,7 +168,7 @@ impl BackendStorage for Pearl { } } - async fn put_alien(&self, op: Operation, key: BobKey, data: BobData) -> BackendResult<()> { + async fn put_alien(&self, op: Operation, key: BobKey, data: &BobData) -> BackendResult<()> { debug!("PUT[alien][{}] to pearl backend, operation: {:?}", key, op); self.alien_disk_controller.put_alien(op, key, data).await } diff --git a/bob-backend/src/pearl/data.rs b/bob-backend/src/pearl/data.rs index 383993731..0682d08de 100755 --- a/bob-backend/src/pearl/data.rs +++ b/bob-backend/src/pearl/data.rs @@ -108,37 +108,3 @@ impl Ord for Key { self.partial_cmp(&other).unwrap() } } - -pub struct Data { - data: Vec, - timestamp: u64, -} - -impl Data { - const TIMESTAMP_LEN: usize = 8; - - pub fn to_vec(&self) -> Vec { - let mut result = self.timestamp.to_be_bytes().to_vec(); - result.extend_from_slice(&self.data); - result - } - - pub fn from_bytes(data: &[u8]) -> Result { - let (ts, bob_data) = data.split_at(Self::TIMESTAMP_LEN); - let bytes = ts - .try_into() - .map_err(|e| Error::storage(format!("parse error: {}", e)))?; - let timestamp = u64::from_be_bytes(bytes); - let meta = BobMeta::new(timestamp); - Ok(BobData::new(bob_data.to_vec(), meta)) - } -} - -impl From for Data { - fn from(data: BobData) -> Self { - Self { - timestamp: data.meta().timestamp(), - data: data.into_inner(), - } - } -} diff --git a/bob-backend/src/pearl/disk_controller.rs b/bob-backend/src/pearl/disk_controller.rs index c95e56db6..f38f2767c 100644 --- a/bob-backend/src/pearl/disk_controller.rs +++ b/bob-backend/src/pearl/disk_controller.rs @@ -377,13 +377,13 @@ impl DiskController { &self, op: Operation, key: BobKey, - data: BobData, + data: &BobData, ) -> Result<(), Error> { if *self.state.read().await == GroupsState::Ready { let vdisk_group = self.get_or_create_pearl(&op).await; match vdisk_group { Ok(group) => match group - .put(key, data.clone(), StartTimestampConfig::new(false)) + .put(key, data, StartTimestampConfig::new(false)) .await { Err(e) => Err(self.process_error(e).await), @@ -402,7 +402,7 @@ impl DiskController { } } - pub(crate) async fn put(&self, op: Operation, key: BobKey, data: BobData) -> BackendResult<()> { + pub(crate) async fn put(&self, op: Operation, key: BobKey, data: &BobData) -> BackendResult<()> { if *self.state.read().await == GroupsState::Ready { let vdisk_group = { let groups = self.groups.read().await; diff --git a/bob-backend/src/pearl/group.rs b/bob-backend/src/pearl/group.rs index 6e4a1039b..e8faa7381 100644 --- a/bob-backend/src/pearl/group.rs +++ b/bob-backend/src/pearl/group.rs @@ -220,7 +220,7 @@ impl Group { pub async fn put( &self, key: BobKey, - data: BobData, + data: &BobData, timestamp_config: StartTimestampConfig, ) -> Result<(), Error> { let holder = self.get_actual_holder(&data, timestamp_config).await?; @@ -232,7 +232,7 @@ impl Group { Ok(res) } - async fn put_common(holder: &Holder, key: BobKey, data: BobData) -> Result<(), Error> { + async fn put_common(holder: &Holder, key: BobKey, data: &BobData) -> Result<(), Error> { let result = holder.write(key, data).await; if let Err(e) = result { // if we receive WorkDirUnavailable it's likely disk error, so we shouldn't restart one diff --git a/bob-backend/src/pearl/holder.rs b/bob-backend/src/pearl/holder.rs index 344bd78b1..d75cdee94 100644 --- a/bob-backend/src/pearl/holder.rs +++ b/bob-backend/src/pearl/holder.rs @@ -4,7 +4,7 @@ use crate::{pearl::utils::get_current_timestamp, prelude::*}; use super::{ core::{BackendResult, PearlStorage}, - data::{Data, Key}, + data::Key, utils::Utils, }; use bob_common::metrics::pearl::{ @@ -191,14 +191,14 @@ impl Holder { ); } - pub async fn write(&self, key: BobKey, data: BobData) -> BackendResult<()> { + pub async fn write(&self, key: BobKey, data: &BobData) -> BackendResult<()> { let state = self.storage.read().await; if state.is_ready() { let storage = state.get(); self.update_last_modification(); trace!("Vdisk: {}, write key: {}", self.vdisk, key); - Self::write_disk(storage, Key::from(key), data.clone()).await + Self::write_disk(storage, Key::from(key), data).await } else { trace!("Vdisk: {} isn't ready for writing: {:?}", self.vdisk, state); Err(Error::vdisk_is_not_ready()) @@ -213,11 +213,11 @@ impl Holder { // @TODO remove redundant return result #[allow(clippy::cast_possible_truncation)] - async fn write_disk(storage: PearlStorage, key: Key, data: BobData) -> BackendResult<()> { + async fn write_disk(storage: PearlStorage, key: Key, data: &BobData) -> BackendResult<()> { counter!(PEARL_PUT_COUNTER, 1); let data_size = Self::calc_data_size(&data); let timer = Instant::now(); - let res = storage.write(key, Data::from(data).to_vec()).await; + let res = storage.write(key, data.to_serialized_vec()).await; let res = match res { Err(e) => { counter!(PEARL_PUT_ERROR_COUNTER, 1); @@ -258,7 +258,7 @@ impl Holder { .await .map(|r| { counter!(PEARL_GET_BYTES_COUNTER, r.len() as u64); - Data::from_bytes(&r) + BobData::from_serialized_bytes(r) }) .map_err(|e| { counter!(PEARL_GET_ERROR_COUNTER, 1); diff --git a/bob-backend/src/pearl/tests.rs b/bob-backend/src/pearl/tests.rs index 6a988c61b..90faa42ee 100755 --- a/bob-backend/src/pearl/tests.rs +++ b/bob-backend/src/pearl/tests.rs @@ -74,9 +74,9 @@ async fn test_write_multiple_read() { backend.run().await.unwrap(); let path = DiskPath::new(DISK_NAME.to_owned(), "".to_owned()); let operation = Operation::new_local(vdisk_id, path); - let data = BobData::new(vec![], BobMeta::new(TIMESTAMP)); + let data = BobData::new(vec![].into(), BobMeta::new(TIMESTAMP)); let write = backend - .put(operation.clone(), BobKey::from(KEY_ID), data) + .put(operation.clone(), BobKey::from(KEY_ID), &data) .await; assert!(write.is_ok()); diff --git a/bob-backend/src/stub_backend.rs b/bob-backend/src/stub_backend.rs index a13a6c147..9e972a3f6 100755 --- a/bob-backend/src/stub_backend.rs +++ b/bob-backend/src/stub_backend.rs @@ -14,7 +14,7 @@ impl BackendStorage for StubBackend { Ok(()) } - async fn put(&self, _operation: Operation, key: BobKey, data: BobData) -> Result<(), Error> { + async fn put(&self, _operation: Operation, key: BobKey, data: &BobData) -> Result<(), Error> { debug!( "PUT[{}]: hi from backend, timestamp: {:?}", key, @@ -27,7 +27,7 @@ impl BackendStorage for StubBackend { &self, _operation: Operation, key: BobKey, - data: BobData, + data: &BobData, ) -> Result<(), Error> { debug!( "PUT[{}]: hi from backend, timestamp: {:?}", @@ -39,7 +39,7 @@ impl BackendStorage for StubBackend { async fn get(&self, _operation: Operation, key: BobKey) -> Result { debug!("GET[{}]: hi from backend", key); - Ok(BobData::new(vec![0], BobMeta::stub())) + Ok(BobData::new(vec![0].into(), BobMeta::stub())) } async fn get_alien(&self, operation: Operation, key: BobKey) -> Result { diff --git a/bob-common/src/data.rs b/bob-common/src/data.rs index dec31f4e1..e82627de0 100755 --- a/bob-common/src/data.rs +++ b/bob-common/src/data.rs @@ -1,8 +1,10 @@ use crate::{ + error::Error, mapper::NodesMap, node::{Disk as NodeDisk, Node}, }; use bob_grpc::{GetOptions, GetSource, PutOptions}; +use bytes::Bytes; use std::{ convert::TryInto, fmt::{Debug, Formatter, Result as FmtResult}, @@ -92,12 +94,14 @@ pub type VDiskId = u32; #[derive(Clone)] pub struct BobData { - inner: Vec, + inner: Bytes, meta: BobMeta, } impl BobData { - pub fn new(inner: Vec, meta: BobMeta) -> Self { + const TIMESTAMP_LEN: usize = 8; + + pub fn new(inner: Bytes, meta: BobMeta) -> Self { BobData { inner, meta } } @@ -105,13 +109,31 @@ impl BobData { &self.inner } - pub fn into_inner(self) -> Vec { + pub fn into_inner(self) -> Bytes { self.inner } pub fn meta(&self) -> &BobMeta { &self.meta } + + pub fn to_serialized_vec(&self) -> Vec { + let mut result = Vec::with_capacity(Self::TIMESTAMP_LEN + self.inner.len()); + result.extend_from_slice(&self.meta.timestamp.to_be_bytes()); + result.extend_from_slice(&self.inner); + result + } + + pub fn from_serialized_bytes(data: Vec) -> Result { + let mut bob_data = Bytes::from(data); + let ts_bytes = bob_data.split_to(Self::TIMESTAMP_LEN); + let ts_bytes = (&*ts_bytes) + .try_into() + .map_err(|e| Error::storage(format!("parse error: {}", e)))?; + let timestamp = u64::from_be_bytes(ts_bytes); + let meta = BobMeta::new(timestamp); + Ok(BobData::new(bob_data, meta)) + } } impl Debug for BobData { diff --git a/bob-grpc/Cargo.toml b/bob-grpc/Cargo.toml index eef8ef4c8..8d441d7e4 100644 --- a/bob-grpc/Cargo.toml +++ b/bob-grpc/Cargo.toml @@ -23,4 +23,5 @@ default-features = false features = [] [build-dependencies] -tonic-build = "0.6" \ No newline at end of file +tonic-build = "0.6" +prost-build = "0.9" \ No newline at end of file diff --git a/bob-grpc/build.rs b/bob-grpc/build.rs index 0e97ef14e..84e684132 100644 --- a/bob-grpc/build.rs +++ b/bob-grpc/build.rs @@ -1,16 +1,20 @@ use std::path::PathBuf; +use prost_build::Config; extern crate tonic_build; +extern crate prost_build; fn main() { let path: PathBuf = format!("{}/src", env!("CARGO_MANIFEST_DIR")).into(); + let mut prost_config = Config::new(); + prost_config.bytes(&["Blob.data"]); if !path.join("bob_storage.rs").exists() { tonic_build::configure() .build_server(true) .build_client(true) .format(false) .out_dir(format!("{}/src", env!("CARGO_MANIFEST_DIR"))) - .compile(&["proto/bob.proto"], &["proto"]) + .compile_with_config(prost_config, &["proto/bob.proto"], &["proto"]) .expect("protobuf compilation"); } } diff --git a/bob/src/api/mod.rs b/bob/src/api/mod.rs index 77edae7bb..8faab4a6f 100644 --- a/bob/src/api/mod.rs +++ b/bob/src/api/mod.rs @@ -984,12 +984,11 @@ where return Err(AuthError::PermissionDenied.into()); } let key = DataKey::from_str(&key)?.0; - let data_buf = body.to_vec(); let meta = BobMeta::new(chrono::Local::now().timestamp() as u64); - let data = BobData::new(data_buf, meta); + let data = BobData::new(body, meta); let opts = BobOptions::new_put(None); - bob.grinder().put(key, data, opts).await?; + bob.grinder().put(key, &data, opts).await?; Ok(StatusCode::CREATED.into()) } diff --git a/bob/src/api/s3.rs b/bob/src/api/s3.rs index 495b8f855..186b5b386 100644 --- a/bob/src/api/s3.rs +++ b/bob/src/api/s3.rs @@ -177,14 +177,13 @@ where if headers.is_source_key_set() { return copy_object(bob, key, headers).await; } - let data_buf = body.to_vec(); let data = BobData::new( - data_buf, + body, BobMeta::new(chrono::Local::now().timestamp() as u64), ); let opts = BobOptions::new_put(None); - bob.grinder().put(key, data, opts).await?; + bob.grinder().put(key, &data, opts).await?; Ok(StatusS3::from(StatusExt::from(StatusCode::CREATED))) } @@ -260,7 +259,7 @@ async fn copy_object( ); let opts = BobOptions::new_put(None); - bob.grinder().put(key, data, opts).await?; + bob.grinder().put(key, &data, opts).await?; Ok(StatusS3::from(StatusExt::from(StatusCode::OK))) } diff --git a/bob/src/cluster/mod.rs b/bob/src/cluster/mod.rs index 2865fdedc..ef6f61b8d 100644 --- a/bob/src/cluster/mod.rs +++ b/bob/src/cluster/mod.rs @@ -11,7 +11,7 @@ use simple::Quorum as SimpleQuorum; #[async_trait] pub(crate) trait Cluster { - async fn put(&self, key: BobKey, data: BobData) -> Result<(), Error>; + async fn put(&self, key: BobKey, data: &BobData) -> Result<(), Error>; async fn get(&self, key: BobKey) -> Result; async fn exist(&self, keys: &[BobKey]) -> Result, Error>; async fn delete(&self, key: BobKey) -> Result<(), Error>; diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index c4eef3786..bec341f3c 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -81,7 +81,7 @@ async fn finish_at_least_handles( pub(crate) async fn put_at_least( key: BobKey, - data: BobData, + data: &BobData, target_nodes: impl Iterator, at_least: usize, options: PutOptions, @@ -219,7 +219,7 @@ pub(crate) async fn put_local_all( backend: &Backend, node_names: Vec, key: BobKey, - data: BobData, + data: &BobData, operation: Operation, ) -> Result<(), PutOptions> { let mut add_nodes = vec![]; @@ -228,7 +228,7 @@ pub(crate) async fn put_local_all( op.set_remote_folder(node_name.clone()); debug!("PUT[{}] put to local alien: {:?}", key, node_name); - if let Err(e) = backend.put_local(key, data.clone(), op).await { + if let Err(e) = backend.put_local(key, data, op).await { debug!("PUT[{}] local support put result: {:?}", key, e); add_nodes.push(node_name); } @@ -243,7 +243,7 @@ pub(crate) async fn put_local_all( pub(crate) async fn put_sup_nodes( key: BobKey, - data: BobData, + data: &BobData, requests: &[(&Node, PutOptions)], ) -> Result<(), Vec>> { let mut ret = vec![]; @@ -269,7 +269,7 @@ pub(crate) async fn put_sup_nodes( pub(crate) async fn put_local_node( backend: &Backend, key: BobKey, - data: BobData, + data: &BobData, vdisk_id: VDiskId, disk_path: DiskPath, ) -> Result<(), Error> { diff --git a/bob/src/cluster/quorum.rs b/bob/src/cluster/quorum.rs index 0875bf6f0..76e637925 100644 --- a/bob/src/cluster/quorum.rs +++ b/bob/src/cluster/quorum.rs @@ -26,7 +26,7 @@ impl Quorum { } } - async fn put_at_least(&self, key: BobKey, data: BobData) -> Result<(), Error> { + async fn put_at_least(&self, key: BobKey, data: &BobData) -> Result<(), Error> { debug!("PUT[{}] ~~~PUT LOCAL NODE FIRST~~~", key); let mut local_put_ok = 0_usize; let mut remote_ok_count = 0_usize; @@ -35,7 +35,7 @@ impl Quorum { let (vdisk_id, disk_path) = self.mapper.get_operation(key); if let Some(path) = disk_path { debug!("disk path is present, try put local"); - let res = put_local_node(&self.backend, key, data.clone(), vdisk_id, path).await; + let res = put_local_node(&self.backend, key, data, vdisk_id, path).await; if let Err(e) = res { error!("{}", e); failed_nodes.push(self.mapper.local_node_name().to_owned()); @@ -50,14 +50,17 @@ impl Quorum { debug!("PUT[{}] need at least {} additional puts", key, at_least); debug!("PUT[{}] ~~~PUT TO REMOTE NODES~~~", key); - let (tasks, errors) = self.put_remote_nodes(key, data.clone(), at_least).await; + let (tasks, errors) = self.put_remote_nodes(key, data, at_least).await; let all_count = self.mapper.get_target_nodes_for_key(key).len(); remote_ok_count += all_count - errors.len() - tasks.len() - local_put_ok; failed_nodes.extend(errors.iter().map(|e| e.node_name().to_string())); if remote_ok_count + local_put_ok >= self.quorum { debug!("PUT[{}] spawn {} background put tasks", key, tasks.len()); let q = self.clone(); - tokio::spawn(q.background_put(tasks, key, data, failed_nodes)); + let data = data.clone(); + tokio::spawn(async move { + q.background_put(tasks, key, &data, failed_nodes).await + }); Ok(()) } else { warn!( @@ -111,7 +114,7 @@ impl Quorum { self, mut rest_tasks: Tasks, key: BobKey, - data: BobData, + data: &BobData, mut failed_nodes: Vec, ) { debug!("PUT[{}] ~~~BACKGROUND PUT TO REMOTE NODES~~~", key); @@ -131,7 +134,7 @@ impl Quorum { } debug!("PUT[{}] ~~~PUT TO REMOTE NODES ALIEN~~~", key); if !failed_nodes.is_empty() { - if let Err(e) = self.put_aliens(failed_nodes, key, data).await { + if let Err(e) = self.put_aliens(failed_nodes, key, &data).await { error!("{}", e); } } @@ -140,7 +143,7 @@ impl Quorum { pub(crate) async fn put_remote_nodes( &self, key: BobKey, - data: BobData, + data: &BobData, at_least: usize, ) -> (Tasks, Vec>) { let local_node = self.mapper.local_node_name(); @@ -187,7 +190,7 @@ impl Quorum { &self, mut failed_nodes: Vec, key: BobKey, - data: BobData, + data: &BobData, ) -> Result<(), Error> { debug!("PUT[{}] ~~~TRY PUT TO REMOTE ALIENS FIRST~~~", key); if failed_nodes.is_empty() { @@ -207,7 +210,7 @@ impl Quorum { .map(|(node, remote_node)| (node, PutOptions::new_alien(vec![remote_node]))) .collect(); debug!("PUT[{}] additional alien requests: {:?}", key, queries); - if let Err(sup_nodes_errors) = put_sup_nodes(key, data.clone(), &queries).await { + if let Err(sup_nodes_errors) = put_sup_nodes(key, data, &queries).await { debug!("support nodes errors: {:?}", sup_nodes_errors); failed_nodes.extend( sup_nodes_errors @@ -222,7 +225,7 @@ impl Quorum { &self.backend, failed_nodes.clone(), key, - data.clone(), + data, operation, ) .await; @@ -240,7 +243,7 @@ impl Quorum { #[async_trait] impl Cluster for Quorum { - async fn put(&self, key: BobKey, data: BobData) -> Result<(), Error> { + async fn put(&self, key: BobKey, data: &BobData) -> Result<(), Error> { self.put_at_least(key, data).await } diff --git a/bob/src/cluster/simple.rs b/bob/src/cluster/simple.rs index 50eb03dbb..8a3c91c7a 100644 --- a/bob/src/cluster/simple.rs +++ b/bob/src/cluster/simple.rs @@ -80,7 +80,7 @@ impl Quorum { #[async_trait] impl Cluster for Quorum { - async fn put(&self, key: BobKey, data: BobData) -> Result<(), Error> { + async fn put(&self, key: BobKey, data: &BobData) -> Result<(), Error> { self.perform_on_nodes(key, "PUT", |c| { Box::pin(c.put( key, diff --git a/bob/src/cluster/tests.rs b/bob/src/cluster/tests.rs index 0aeec6aca..e37bce782 100644 --- a/bob/src/cluster/tests.rs +++ b/bob/src/cluster/tests.rs @@ -204,7 +204,7 @@ async fn simple_one_node_put_ok() { let key = 1; let result = quorum - .put(BobKey::from(key), BobData::new(vec![], BobMeta::new(11))) + .put(BobKey::from(key), &BobData::new(vec![].into(), BobMeta::new(11))) .await; assert!(result.is_ok()); @@ -236,7 +236,7 @@ async fn simple_two_node_one_vdisk_cluster_put_ok() { let (quorum, backend) = create_cluster(&node, &cluster, &actions).await; let key = 2; let result = quorum - .put(BobKey::from(key), BobData::new(vec![], BobMeta::new(11))) + .put(BobKey::from(key), &BobData::new(vec![].into(), BobMeta::new(11))) .await; sleep(Duration::from_millis(1)).await; @@ -272,7 +272,7 @@ async fn simple_two_node_two_vdisk_one_replica_cluster_put_ok() { let (quorum, backend) = create_cluster(&node, &cluster, &actions).await; let mut result = quorum - .put(BobKey::from(3), BobData::new(vec![], BobMeta::new(11))) + .put(BobKey::from(3), &BobData::new(vec![].into(), BobMeta::new(11))) .await; assert!(result.is_ok()); @@ -281,7 +281,7 @@ async fn simple_two_node_two_vdisk_one_replica_cluster_put_ok() { assert_eq!(1, calls[1].1.put_count()); result = quorum - .put(BobKey::from(4), BobData::new(vec![], BobMeta::new(11))) + .put(BobKey::from(4), &BobData::new(vec![].into(), BobMeta::new(11))) .await; assert!(result.is_ok()); @@ -319,7 +319,7 @@ async fn two_node_one_vdisk_cluster_one_node_failed_put_err() { let (quorum, backend) = create_cluster(&node, &cluster, &actions).await; let result = quorum - .put(BobKey::from(5), BobData::new(vec![], BobMeta::new(11))) + .put(BobKey::from(5), &BobData::new(vec![].into(), BobMeta::new(11))) .await; sleep(Duration::from_millis(1)).await; @@ -353,7 +353,7 @@ async fn two_node_one_vdisk_cluster_one_node_failed_put_ok() { let (quorum, _) = create_cluster(&node, &cluster, &actions).await; let result = quorum - .put(BobKey::from(5), BobData::new(vec![], BobMeta::new(11))) + .put(BobKey::from(5), &BobData::new(vec![].into(), BobMeta::new(11))) .await; sleep(Duration::from_millis(1000)).await; @@ -384,7 +384,7 @@ async fn three_node_two_vdisk_cluster_second_node_failed_put_ok() { sleep(Duration::from_millis(1)).await; let result = quorum - .put(BobKey::from(0), BobData::new(vec![], BobMeta::new(11))) + .put(BobKey::from(0), &BobData::new(vec![].into(), BobMeta::new(11))) .await; sleep(Duration::from_millis(1000)).await; assert!(result.is_ok()); @@ -415,7 +415,7 @@ async fn three_node_two_vdisk_cluster_one_node_failed_put_err() { info!("quorum put: 0"); let result = quorum - .put(BobKey::from(0), BobData::new(vec![], BobMeta::new(11))) + .put(BobKey::from(0), &BobData::new(vec![].into(), BobMeta::new(11))) .await; sleep(Duration::from_millis(1000)).await; @@ -451,7 +451,7 @@ async fn three_node_two_vdisk_cluster_one_node_failed_put_ok2() { let (quorum, backend) = create_cluster(&node, &cluster, &actions).await; let result = quorum - .put(BobKey::from(0), BobData::new(vec![], BobMeta::new(11))) + .put(BobKey::from(0), &BobData::new(vec![].into(), BobMeta::new(11))) .await; sleep(Duration::from_millis(1000)).await; @@ -488,7 +488,7 @@ async fn three_node_one_vdisk_cluster_one_node_failed_put_ok() { info!("put local: 0"); let result = quorum - .put(BobKey::from(0), BobData::new(vec![], BobMeta::new(11))) + .put(BobKey::from(0), &BobData::new(vec![].into(), BobMeta::new(11))) .await; assert!(result.is_ok()); // assert_eq!(1, calls[0].1.put_count()); diff --git a/bob/src/grinder.rs b/bob/src/grinder.rs index 6268f434c..279ada571 100755 --- a/bob/src/grinder.rs +++ b/bob/src/grinder.rs @@ -77,7 +77,7 @@ impl Grinder { pub(crate) async fn put( &self, key: BobKey, - data: BobData, + data: &BobData, opts: BobOptions, ) -> Result<(), Error> { let sw = Stopwatch::start_new(); diff --git a/bob/src/lib.rs b/bob/src/lib.rs index 86fc2cb19..f05a0223b 100644 --- a/bob/src/lib.rs +++ b/bob/src/lib.rs @@ -112,7 +112,7 @@ pub(crate) mod test_utils { } pub(crate) fn get_ok(node_name: String, timestamp: u64) -> GetResult { - let inner = BobData::new(vec![], BobMeta::new(timestamp)); + let inner = BobData::new(vec![].into(), BobMeta::new(timestamp)); Ok(NodeOutput::new(node_name, inner)) } diff --git a/bob/src/server.rs b/bob/src/server.rs index c869a3662..f1b4ec5a5 100644 --- a/bob/src/server.rs +++ b/bob/src/server.rs @@ -1,6 +1,7 @@ use std::net::IpAddr; use bob_access::{Authenticator, CredentialsHolder}; +use bytes::Bytes; use tokio::{runtime::Handle, task::block_in_place}; use crate::prelude::*; @@ -82,7 +83,7 @@ where } } -fn put_extract(req: PutRequest) -> Option<(BobKey, Vec, u64, Option)> { +fn put_extract(req: PutRequest) -> Option<(BobKey, Bytes, u64, Option)> { let key = req.key?.key; let blob = req.data?; let timestamp = blob.meta.as_ref()?.timestamp; @@ -136,7 +137,7 @@ where ); let put_result = self .grinder - .put(key, data, BobOptions::new_put(options)) + .put(key, &data, BobOptions::new_put(options)) .await; trace!( "grinder processed put request, /{:.3}ms/",