diff --git a/CHANGELOG.md b/CHANGELOG.md index dd34fa48a..a788e5f8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Bob versions changelog ## [Unreleased] #### Added +- Add clusterwide delete operation (#364) - TLS support, TLS for grpc or rest can be enabled via cluster & node config (#303) #### Changed diff --git a/bob-backend/src/core.rs b/bob-backend/src/core.rs index 3670c48be..c0379ce52 100644 --- a/bob-backend/src/core.rs +++ b/bob-backend/src/core.rs @@ -441,15 +441,13 @@ impl Backend { self.inner.free_least_used_resources().await } - pub async fn delete(&self, key: BobKey, with_aliens: bool) -> Result { + pub async fn delete(&self, key: BobKey) -> Result { let (vdisk_id, disk_path) = self.mapper.get_operation(key); let mut ops = vec![]; if let Some(path) = disk_path { ops.push(Operation::new_local(vdisk_id, path.clone())); } - if with_aliens { - ops.push(Operation::new_alien(vdisk_id)); - } + ops.push(Operation::new_alien(vdisk_id)); let total_count = futures::future::join_all(ops.into_iter().map(|op| { trace!("DELETE[{}] try delete", key); self.delete_single(key, op) diff --git a/bob-backend/src/pearl/disk_controller.rs b/bob-backend/src/pearl/disk_controller.rs index c962de667..c95e56db6 100644 --- a/bob-backend/src/pearl/disk_controller.rs +++ b/bob-backend/src/pearl/disk_controller.rs @@ -622,7 +622,7 @@ impl DiskController { if let Ok(group) = vdisk_group { group.delete(key).await } else { - warn!( + debug!( "DELETE[alien][{}] No alien group has been created for vdisk #{}", key, op.vdisk_id() diff --git a/bob-backend/src/pearl/group.rs b/bob-backend/src/pearl/group.rs index 866a77051..7c0a93806 100644 --- a/bob-backend/src/pearl/group.rs +++ b/bob-backend/src/pearl/group.rs @@ -581,6 +581,7 @@ impl Group { debug!("{} not found in {:?}", key, holder) } else { error!("delete error: {}, from : {:?}", err, holder); + return Err(err); } } } diff --git a/bob-common/src/bob_client.rs b/bob-common/src/bob_client.rs index a3f576401..7b053269a 100644 --- a/bob-common/src/bob_client.rs +++ b/bob-common/src/bob_client.rs @@ -1,5 +1,5 @@ pub mod b_client { - use super::{ExistResult, GetResult, PingResult, PutResult, FactoryTlsConfig}; + use super::{DeleteResult, ExistResult, GetResult, PingResult, PutResult, FactoryTlsConfig}; use crate::{ data::{BobData, BobKey, BobMeta}, error::Error, @@ -7,8 +7,8 @@ pub mod b_client { node::{Node, Output as NodeOutput}, }; use bob_grpc::{ - bob_api_client::BobApiClient, Blob, BlobKey, BlobMeta, ExistRequest, ExistResponse, - GetOptions, GetRequest, Null, PutOptions, PutRequest, + bob_api_client::BobApiClient, Blob, BlobKey, BlobMeta, DeleteOptions, DeleteRequest, + ExistRequest, ExistResponse, GetOptions, GetRequest, Null, PutOptions, PutRequest, }; use mockall::mock; use std::{ @@ -97,12 +97,12 @@ pub mod b_client { Ok(_) => { self.metrics.put_timer_stop(timer); Ok(NodeOutput::new(node_name, ())) - }, + } Err(e) => { self.metrics.put_error_count(); self.metrics.put_timer_stop(timer); - Err(NodeOutput::new(node_name, e.into())) - }, + Err(NodeOutput::new(node_name, e.into())) + } } } @@ -183,6 +183,26 @@ pub mod b_client { } } + pub async fn delete(&self, key: BobKey, options: DeleteOptions) -> DeleteResult { + let mut client = self.client.clone(); + self.metrics.delete_count(); + let timer = BobClientMetrics::start_timer(); + let message = DeleteRequest { + key: Some(BlobKey { key: key.into() }), + options: Some(options), + }; + let mut req = Request::new(message); + self.set_credentials(&mut req); + self.set_timeout(&mut req); + let res = client.delete(req).await; + self.metrics.delete_timer_stop(timer); + res.map(|_| NodeOutput::new(self.node().name().to_owned(), ())) + .map_err(|e| { + self.metrics.delete_error_count(); + NodeOutput::new(self.node().name().to_owned(), e.into()) + }) + } + fn set_credentials(&self, req: &mut Request) { let val = MetadataValue::from_str(&self.local_node_name) .expect("failed to create metadata value from node name"); @@ -202,6 +222,7 @@ pub mod b_client { pub async fn ping(&self) -> PingResult; pub fn node(&self) -> &Node; pub async fn exist(&self, keys: Vec, options: GetOptions) -> ExistResult; + pub async fn delete(&self, key: BobKey, options: DeleteOptions) -> DeleteResult; } impl Clone for BobClient { fn clone(&self) -> Self; @@ -240,13 +261,17 @@ cfg_if::cfg_if! { } } -pub type PutResult = Result, NodeOutput>; +type NodeResult = Result, NodeOutput>; + +pub type PutResult = NodeResult<()>; + +pub type GetResult = NodeResult; -pub type GetResult = Result, NodeOutput>; +pub type PingResult = NodeResult<()>; -pub type PingResult = Result, NodeOutput>; +pub type ExistResult = NodeResult>; -pub type ExistResult = Result>, NodeOutput>; +pub type DeleteResult = NodeResult<()>; #[derive(Clone)] pub struct FactoryTlsConfig { @@ -280,7 +305,7 @@ impl Factory { } } pub async fn produce(&self, node: Node) -> Result { - let metrics = self.metrics.clone().get_metrics(&node.counter_display()); + let metrics = self.metrics.clone().get_metrics(); BobClient::create(node, self.operation_timeout, metrics, self.local_node_name.clone(), self.tls_config.as_ref()).await } } diff --git a/bob-common/src/metrics/mod.rs b/bob-common/src/metrics/mod.rs index 27bec2914..702358004 100644 --- a/bob-common/src/metrics/mod.rs +++ b/bob-common/src/metrics/mod.rs @@ -41,6 +41,13 @@ pub const GRINDER_EXIST_ERROR_COUNT_COUNTER: &str = "cluster_grinder.exist_error /// Measures processing time of the EXIST request pub const GRINDER_EXIST_TIMER: &str = "cluster_grinder.exist_timer"; +/// Counts number of DELETE requests, processed by Grinder +pub const GRINDER_DELETE_COUNTER: &str = "cluster_grinder.delete_count"; +/// Counts number of DELETE requests return error, processed by Grinder +pub const GRINDER_DELETE_ERROR_COUNT_COUNTER: &str = "cluster_grinder.delete_error_count"; +/// Measures processing time of the DELETE request +pub const GRINDER_DELETE_TIMER: &str = "cluster_grinder.delete_timer"; + /// Counts number of PUT requests, processed by Client pub const CLIENT_PUT_COUNTER: &str = "client.put_count"; /// Counts number of PUT requests return error, processed by Client @@ -111,16 +118,51 @@ pub type Timer = Instant; /// Structure contains put/get metrics for `BobClient` #[derive(Debug, Clone)] pub struct BobClient { - prefix: String, + prefixed_names: Arc, +} + +#[derive(Debug, Clone)] +struct PrefixedNames { + put_count: String, + put_timer: String, + put_error_count: String, + get_count: String, + get_timer: String, + get_error_count: String, + exist_count: String, + exist_timer: String, + exist_error_count: String, + delete_count: String, + delete_timer: String, + delete_error_count: String, +} + +impl PrefixedNames { + fn new(prefix: &str) -> Self { + Self { + put_count: format!("{}.put_count", prefix), + put_timer: format!("{}.put_timer", prefix), + put_error_count: format!("{}.put_error_count", prefix), + get_count: format!("{}.get_count", prefix), + get_timer: format!("{}.get_timer", prefix), + get_error_count: format!("{}.get_error_count", prefix), + exist_count: format!("{}.exist_count", prefix), + exist_timer: format!("{}.exist_timer", prefix), + exist_error_count: format!("{}.exist_error_count", prefix), + delete_count: format!("{}.delete_count", prefix), + delete_timer: format!("{}.delete_timer", prefix), + delete_error_count: format!("{}.delete_error_count", prefix), + } + } } impl BobClient { - fn new(prefix: String) -> Self { - BobClient { prefix } + fn new(prefixed_names: Arc) -> Self { + BobClient { prefixed_names } } pub(crate) fn put_count(&self) { - counter!(self.prefix.clone() + ".put_count", 1); + counter!(self.prefixed_names.put_count.clone(), 1); } pub(crate) fn start_timer() -> Timer { @@ -130,68 +172,86 @@ impl BobClient { #[allow(clippy::cast_possible_truncation)] pub(crate) fn put_timer_stop(&self, timer: Timer) { histogram!( - self.prefix.clone() + ".put_timer", + self.prefixed_names.put_timer.clone(), timer.elapsed().as_nanos() as f64 ); } pub(crate) fn put_error_count(&self) { - counter!(self.prefix.clone() + ".put_error_count", 1); + counter!(self.prefixed_names.put_error_count.clone(), 1); } pub(crate) fn get_count(&self) { - counter!(self.prefix.clone() + ".get_count", 1); + counter!(self.prefixed_names.get_count.clone(), 1); } pub(crate) fn get_timer_stop(&self, timer: Timer) { histogram!( - self.prefix.clone() + ".get_timer", + self.prefixed_names.get_timer.clone(), timer.elapsed().as_nanos() as f64 ); } pub(crate) fn get_error_count(&self) { - counter!(self.prefix.clone() + ".get_error_count", 1); + counter!(self.prefixed_names.get_error_count.clone(), 1); } pub(crate) fn exist_count(&self) { - counter!(self.prefix.clone() + ".exist_count", 1); + counter!(self.prefixed_names.exist_count.clone(), 1); + } + + pub(crate) fn exist_timer_stop(&self, timer: Timer) { + histogram!( + self.prefixed_names.exist_timer.clone(), + timer.elapsed().as_nanos() as f64 + ); } pub(crate) fn exist_error_count(&self) { - counter!(self.prefix.clone() + ".exist_error_count", 1); + counter!(self.prefixed_names.exist_error_count.clone(), 1); } - pub(crate) fn exist_timer_stop(&self, timer: Timer) { + pub(crate) fn delete_count(&self) { + counter!(self.prefixed_names.delete_count.clone(), 1); + } + + pub(crate) fn delete_timer_stop(&self, timer: Timer) { histogram!( - self.prefix.clone() + ".exist_timer", + self.prefixed_names.delete_timer.clone(), timer.elapsed().as_nanos() as f64 ); } + + pub(crate) fn delete_error_count(&self) { + counter!(self.prefixed_names.delete_error_count.clone(), 1); + } } #[derive(Debug, Clone)] struct MetricsContainer { duration: Duration, - prefix: String, + prefixed_names: Arc, } impl MetricsContainer { pub(crate) fn new(duration: Duration, prefix: String) -> Self { - MetricsContainer { duration, prefix } + let prefixed_names = Arc::new(PrefixedNames::new(&prefix)); + MetricsContainer { + duration, + prefixed_names, + } } } /// A trait for generic metrics builders pub trait ContainerBuilder { /// Initializes `BobClient` container with given name - fn get_metrics(&self, name: &str) -> BobClient; + fn get_metrics(&self) -> BobClient; } impl ContainerBuilder for MetricsContainer { - fn get_metrics(&self, name: &str) -> BobClient { - let prefix = self.prefix.clone() + "." + name; - BobClient::new(prefix) + fn get_metrics(&self) -> BobClient { + BobClient::new(self.prefixed_names.clone()) } } @@ -207,7 +267,14 @@ pub async fn init_counters( //install_prometheus(); //install_graphite(node_config, local_address); let shared = install_global(node_config, local_address).await; - let container = MetricsContainer::new(Duration::from_secs(1), CLIENTS_METRICS_DIR.to_owned()); + let container = MetricsContainer::new( + Duration::from_secs(1), + format!( + "{}.{}", + CLIENTS_METRICS_DIR, + local_address.replace(".", "_") + ), + ); info!( "metrics container initialized with update interval: {}ms", container.duration.as_millis() diff --git a/bob-common/src/node.rs b/bob-common/src/node.rs index fd31d0f13..d950c9a14 100644 --- a/bob-common/src/node.rs +++ b/bob-common/src/node.rs @@ -66,10 +66,6 @@ impl Node { .expect("build uri") } - pub fn counter_display(&self) -> String { - self.address.to_string().replace('.', "_") - } - pub async fn set_connection(&self, client: BobClient) { *self.conn.write().await = Some(client); } diff --git a/bob-grpc/proto/bob.proto b/bob-grpc/proto/bob.proto index 2da43d29c..2fb787847 100644 --- a/bob-grpc/proto/bob.proto +++ b/bob-grpc/proto/bob.proto @@ -8,6 +8,7 @@ service BobApi { rpc Get (GetRequest) returns (Blob) {} rpc Ping (Null) returns (Null) {} rpc Exist (ExistRequest) returns (ExistResponse) {} + rpc Delete (DeleteRequest) returns (OpStatus) {} } message Null {}; @@ -31,6 +32,12 @@ message ExistRequest { GetOptions options = 2; // Options, same as in get request } +// Delete operation parameters +message DeleteRequest { + BlobKey key = 1; // Blob's key + DeleteOptions options = 2; // Options +} + // Blob id message BlobKey { bytes key = 1; //Inner id representation @@ -79,3 +86,8 @@ message BobError { int32 code = 1; // Error code in case of error string desc = 2; // Error desription } + +// Delete operation options +message DeleteOptions { + bool force_node = 1; // Force operation to be served by node to which it comes +} \ No newline at end of file diff --git a/bob-grpc/src/lib.rs b/bob-grpc/src/lib.rs index 39b5d1df9..18450b81c 100644 --- a/bob-grpc/src/lib.rs +++ b/bob-grpc/src/lib.rs @@ -49,6 +49,20 @@ impl GetOptions { } } +impl DeleteOptions { + pub fn new_all() -> Self { + Self { + force_node: false, + } + } + + pub fn new_local() -> Self { + Self { + force_node: true, + } + } +} + impl From for GetSource { fn from(value: i32) -> Self { match value { diff --git a/bob/src/api/mod.rs b/bob/src/api/mod.rs index b2667372e..77edae7bb 100644 --- a/bob/src/api/mod.rs +++ b/bob/src/api/mod.rs @@ -6,6 +6,7 @@ use axum::{ routing::{delete, get, post, MethodRouter}, Json, Router, Server, }; +use bob_grpc::DeleteOptions; use axum_server::{bind_rustls, tls_rustls::{RustlsConfig, RustlsAcceptor}, Server as AxumServer}; pub(crate) use bob_access::Error as AuthError; @@ -1005,9 +1006,9 @@ where return Err(AuthError::PermissionDenied.into()); } let key = DataKey::from_str(&key)?.0; - bob.block_on(bob.grinder().delete(key, true)) + bob.block_on(bob.grinder().delete(key, DeleteOptions::new_all())) .map_err(|e| internal(e.to_string())) - .map(|res| StatusExt::new(StatusCode::OK, true, format!("{}", res))) + .map(|_| StatusExt::new(StatusCode::OK, true, format!("Done"))) } fn internal(message: String) -> StatusExt { diff --git a/bob/src/cluster/mod.rs b/bob/src/cluster/mod.rs index 151ed3690..2865fdedc 100644 --- a/bob/src/cluster/mod.rs +++ b/bob/src/cluster/mod.rs @@ -14,6 +14,7 @@ pub(crate) trait Cluster { async fn put(&self, key: BobKey, data: BobData) -> Result<(), Error>; async fn get(&self, key: BobKey) -> Result; async fn exist(&self, keys: &[BobKey]) -> Result, Error>; + async fn delete(&self, key: BobKey) -> Result<(), Error>; } pub(crate) fn get_cluster( diff --git a/bob/src/cluster/operations.rs b/bob/src/cluster/operations.rs index 843c4147a..c4eef3786 100644 --- a/bob/src/cluster/operations.rs +++ b/bob/src/cluster/operations.rs @@ -30,6 +30,18 @@ fn call_node_put( tokio::spawn(task) } +fn call_node_delete( + key: BobKey, + options: DeleteOptions, + node: Node, +) -> JoinHandle, NodeOutput>> { + debug!("DELETE[{}] delete to {}", key, node.name()); + let task = async move { + LinkManager::call_node(&node, |conn| conn.delete(key, options).boxed()).await + }; + tokio::spawn(task) +} + fn is_result_successful( join_res: Result, NodeOutput>, JoinError>, errors: &mut Vec>, @@ -74,10 +86,32 @@ pub(crate) async fn put_at_least( at_least: usize, options: PutOptions, ) -> (Tasks, Vec>) { - let mut handles: FuturesUnordered<_> = target_nodes - .cloned() - .map(|node| call_node_put(key, data.clone(), node, options.clone())) - .collect(); + call_at_least(target_nodes, at_least, |n| { + call_node_put(key, data.clone(), n, options.clone()) + }) + .await +} + +pub(crate) async fn delete_at_nodes( + key: BobKey, + target_nodes: impl Iterator, + target_nodes_count: usize, + options: DeleteOptions, +) -> Vec> { + let (tasks, errors) = call_at_least(target_nodes, target_nodes_count, |n| { + call_node_delete(key, options.clone(), n) + }) + .await; + assert!(tasks.is_empty()); + errors +} + +async fn call_at_least( + target_nodes: impl Iterator, + at_least: usize, + f: impl Fn(Node) -> JoinHandle, NodeOutput>>, +) -> (Tasks, Vec>) { + let mut handles: FuturesUnordered<_> = target_nodes.cloned().map(|node| f(node)).collect(); debug!("total handles count: {}", handles.len()); let errors = finish_at_least_handles(&mut handles, at_least).await; debug!("remains: {}, errors: {}", handles.len(), errors.len()); @@ -243,3 +277,12 @@ pub(crate) async fn put_local_node( let op = Operation::new_local(vdisk_id, disk_path); backend.put_local(key, data, op).await } + +pub(crate) async fn delete_at_local_node( + backend: &Backend, + key: BobKey, +) -> Result<(), Error> { + debug!("local node has vdisk replica, put local"); + backend.delete(key).await?; + Ok(()) +} diff --git a/bob/src/cluster/quorum.rs b/bob/src/cluster/quorum.rs index 9fb707a6b..0875bf6f0 100644 --- a/bob/src/cluster/quorum.rs +++ b/bob/src/cluster/quorum.rs @@ -2,8 +2,9 @@ use crate::prelude::*; use super::{ operations::{ - group_keys_by_nodes, lookup_local_alien, lookup_local_node, lookup_remote_aliens, - lookup_remote_nodes, put_at_least, put_local_all, put_local_node, put_sup_nodes, Tasks, + delete_at_nodes, delete_at_local_node, group_keys_by_nodes, lookup_local_alien, lookup_local_node, + lookup_remote_aliens, lookup_remote_nodes, put_at_least, put_local_all, put_local_node, + put_sup_nodes, Tasks, }, Cluster, }; @@ -77,6 +78,35 @@ impl Quorum { } } + async fn delete_on_nodes(&self, key: BobKey) -> Result<(), Error> { + debug!("DELETE[{}] ~~~DELETE LOCAL NODE FIRST~~~", key); + let res = delete_at_local_node(&self.backend, key).await; + let local_delete_ok = if let Err(e) = res { + error!("{}", e); + false + } else { + debug!("DELETE[{}] local node delete successful", key); + true + }; + + debug!("DELETE[{}] ~~~DELETE TO REMOTE NODES~~~", key); + let (errors, remote_count) = self.delete_at_remote_nodes(key).await; + let remote_ok_count = remote_count - errors.len(); + if errors.len() > 0 || !local_delete_ok { + warn!( + "DELETE[{}] was not successful. local done: {}, remote {}, failed {}, errors: {:?}", + key, + local_delete_ok, + remote_ok_count, + errors.len(), + errors + ); + Err(Error::failed("Data was deleted not on all nodes")) + } else { + Ok(()) + } + } + async fn background_put( self, mut rest_tasks: Tasks, @@ -124,6 +154,35 @@ impl Quorum { put_at_least(key, data, target_nodes, at_least, PutOptions::new_local()).await } + pub(crate) async fn delete_at_remote_nodes( + &self, + key: BobKey, + ) -> (Vec>, usize) { + let local_node = self.mapper.local_node_name(); + let target_nodes: Vec<_> = self + .mapper + .nodes() + .values() + .filter(|n| n.name() != local_node) + .collect(); + debug!( + "DELETE[{}] cluster quorum put remote nodes {} total target nodes", + key, + target_nodes.len(), + ); + let count = target_nodes.len(); + ( + delete_at_nodes( + key, + target_nodes.into_iter(), + count, + DeleteOptions::new_local(), + ) + .await, + count, + ) + } + pub(crate) async fn put_aliens( &self, mut failed_nodes: Vec, @@ -227,4 +286,8 @@ impl Cluster for Quorum { } Ok(exist) } + + async fn delete(&self, key: BobKey) -> Result<(), Error> { + self.delete_on_nodes(key).await + } } diff --git a/bob/src/cluster/simple.rs b/bob/src/cluster/simple.rs index ef1742edf..50eb03dbb 100644 --- a/bob/src/cluster/simple.rs +++ b/bob/src/cluster/simple.rs @@ -38,34 +38,33 @@ impl Quorum { } keys_by_nodes } -} -#[async_trait] -impl Cluster for Quorum { - async fn put(&self, key: BobKey, data: BobData) -> Result<(), Error> { - let target_nodes = self.get_target_nodes(key); + async fn perform_on_nodes( + &self, + key: BobKey, + metrics_prefix: &str, + client_fun: F, + ) -> Result<(), Error> + where + F: FnMut(&'_ BobClient) -> crate::link_manager::ClusterCallFuture<'_, T> + Send + Clone, + T: Send + Debug, + { + let target_nodes: Vec<_> = self.mapper.nodes().values().cloned().collect(); - debug!("PUT[{}]: Nodes for fan out: {:?}", key, &target_nodes); + debug!( + "{}[{}]: Nodes for fan out: {:?}", + metrics_prefix, key, &target_nodes + ); let l_quorum = self.quorum as usize; - let reqs = LinkManager::call_nodes(target_nodes.iter(), |mock_bob_client| { - Box::pin(mock_bob_client.put( - key, - data.clone(), - PutOptions { - remote_nodes: vec![], //TODO check - force_node: true, - overwrite: false, - }, - )) - }); + let reqs = LinkManager::call_nodes(target_nodes.iter(), client_fun); let results = reqs.await; let total_count = results.len(); let errors = results.iter().filter(|r| r.is_err()).collect::>(); let ok_count = total_count - errors.len(); debug!( - "PUT[{}] total requests: {} ok: {} quorum: {}", - key, total_count, ok_count, l_quorum + "{}[{}] total requests: {} ok: {} quorum: {}", + metrics_prefix, key, total_count, ok_count, l_quorum ); // TODO: send actuall list of vdisk it has been written on if ok_count >= l_quorum { @@ -77,6 +76,24 @@ impl Cluster for Quorum { ))) } } +} + +#[async_trait] +impl Cluster for Quorum { + async fn put(&self, key: BobKey, data: BobData) -> Result<(), Error> { + self.perform_on_nodes(key, "PUT", |c| { + Box::pin(c.put( + key, + data.clone(), + PutOptions { + remote_nodes: vec![], //TODO check + force_node: true, + overwrite: false, + }, + )) + }) + .await + } async fn get(&self, key: BobKey) -> Result { let target_nodes = self.get_target_nodes(key); @@ -115,4 +132,11 @@ impl Cluster for Quorum { } Ok(exist) } + + async fn delete(&self, key: BobKey) -> Result<(), Error> { + self.perform_on_nodes(key, "DELETE", move |c| { + Box::pin(c.delete(key, DeleteOptions::new_local())) + }) + .await + } } diff --git a/bob/src/grinder.rs b/bob/src/grinder.rs index c70ee400e..6268f434c 100755 --- a/bob/src/grinder.rs +++ b/bob/src/grinder.rs @@ -10,6 +10,7 @@ use crate::{ use bob_common::metrics::{ CLIENT_DELETE_COUNTER, CLIENT_DELETE_ERROR_COUNT_COUNTER, CLIENT_DELETE_TIMER, + GRINDER_DELETE_COUNTER, GRINDER_DELETE_ERROR_COUNT_COUNTER, GRINDER_DELETE_TIMER, }; use metrics::histogram as timing; @@ -209,23 +210,39 @@ impl Grinder { self.hw_counter.spawn_task(); } - pub(crate) async fn delete(&self, key: BobKey, with_aliens: bool) -> Result { + pub(crate) async fn delete(&self, key: BobKey, options: DeleteOptions) -> Result<(), Error> { trace!(">>>- - - - - GRINDER DELETE START - - - - -"); - counter!(CLIENT_DELETE_COUNTER, 1); - let sw = Stopwatch::start_new(); - trace!( - "pass request to backend, /{:.3}ms/", - sw.elapsed().as_secs_f64() * 1000.0 - ); - let result = self.backend.delete(key, with_aliens).await; - trace!( - "backend processed delete, /{:.3}ms/", - sw.elapsed().as_secs_f64() * 1000.0 - ); - if result.is_err() { - counter!(CLIENT_DELETE_ERROR_COUNT_COUNTER, 1); - } - timing!(CLIENT_DELETE_TIMER, sw.elapsed().as_nanos() as f64); + let result = if options.force_node { + counter!(CLIENT_DELETE_COUNTER, 1); + let sw = Stopwatch::start_new(); + trace!( + "pass delete request to backend, /{:.3}ms/", + sw.elapsed().as_secs_f64() * 1000.0 + ); + let result = self.backend.delete(key).await; + trace!( + "backend processed delete, /{:.3}ms/", + sw.elapsed().as_secs_f64() * 1000.0 + ); + if result.is_err() { + counter!(CLIENT_DELETE_ERROR_COUNT_COUNTER, 1); + } + timing!(CLIENT_DELETE_TIMER, sw.elapsed().as_nanos() as f64); + result.map(|_| ()) + } else { + counter!(GRINDER_DELETE_COUNTER, 1); + let sw = Stopwatch::start_new(); + let result = self.cluster.delete(key).await; + trace!( + "cluster processed delete, /{:.3}ms/", + sw.elapsed().as_secs_f64() * 1000.0 + ); + if result.is_err() { + counter!(GRINDER_DELETE_ERROR_COUNT_COUNTER, 1); + } + timing!(GRINDER_DELETE_TIMER, sw.elapsed().as_nanos() as f64); + result + }; trace!(">>>- - - - - GRINDER DELETE FINISHED - - - - -"); self.cleaner.request_index_cleanup(); result diff --git a/bob/src/lib.rs b/bob/src/lib.rs index 4b422121e..86fc2cb19 100644 --- a/bob/src/lib.rs +++ b/bob/src/lib.rs @@ -33,14 +33,14 @@ pub use bob_common::{ Cluster as ClusterConfig, Node as ClusterNodeConfig, Rack as ClusterRackConfig, Replica as ReplicaConfig, VDisk as VDiskConfig, }, - configs::node::{Node as NodeConfig, BackendType}, + configs::node::{BackendType, Node as NodeConfig}, data::BOB_KEY_SIZE, mapper::Virtual as VirtualMapper, metrics::init_counters, }; pub use bob_grpc::{ bob_api_client::BobApiClient, bob_api_server::BobApiServer, Blob, BlobKey, BlobMeta, - ExistRequest, GetOptions, GetRequest, GetSource, PutOptions, PutRequest, + DeleteRequest, ExistRequest, GetOptions, GetRequest, GetSource, PutOptions, PutRequest, }; mod prelude { @@ -64,8 +64,8 @@ mod prelude { node::{Node, Output as NodeOutput}, }; pub use bob_grpc::{ - bob_api_server::BobApi, Blob, BlobMeta, ExistRequest, ExistResponse, GetOptions, - GetRequest, Null, OpStatus, PutOptions, PutRequest, + bob_api_server::BobApi, Blob, BlobMeta, DeleteOptions, DeleteRequest, ExistRequest, + ExistResponse, GetOptions, GetRequest, Null, OpStatus, PutOptions, PutRequest, }; pub use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt}; pub use std::{ diff --git a/bob/src/server.rs b/bob/src/server.rs index 3acf1a905..c869a3662 100644 --- a/bob/src/server.rs +++ b/bob/src/server.rs @@ -242,4 +242,21 @@ where let response = Response::new(response); Ok(response) } + + async fn delete(&self, req: Request) -> ApiResult { + let req = req.into_inner(); + let DeleteRequest { key, options } = req; + if let Some((key, options)) = key.zip(options) { + let sw = Stopwatch::start_new(); + self.grinder.delete(key.key.into(), options).await?; + let elapsed = sw.elapsed(); + debug!("DELETE-OK dt: {:?}", elapsed); + Ok(Response::new(OpStatus { error: None })) + } else { + Err(Status::new( + Code::InvalidArgument, + "Key and options are mandatory", + )) + } + } }