Skip to content

Commit

Permalink
364 clusterwide record delete (#465)
Browse files Browse the repository at this point in the history
* add delete request and rpc

* add delete to bobclient

* add cluster impl for simple

* fix  build errors

* add remote node call for quorum delete

* add delete to quorum

* add delete to server

* add delete metrics

* update changelog

* send error in delete upper

* fix timer position

* fix publicity modifier

* make delete remove on all target nodes

* perform delete on all nodes with aliens

* make delete successful only in case of all ops successful

* Update bob/src/grinder.rs

* add all_nodes flag for simple cluster

* update mockbobclient

* remove redundant filter

* remove redundant concat

* invert with_aliens flag

* change name of var

* remove redundant calcs

* remove concat in metrics

* add missing error propagation in delete

* remove without_aliens flag

* rename delete

* move delete error to debug level
  • Loading branch information
idruzhitskiy authored Oct 4, 2022
1 parent d91d7d1 commit ad16767
Show file tree
Hide file tree
Showing 17 changed files with 367 additions and 87 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Bob versions changelog

## [Unreleased]
#### Added
- Add clusterwide delete operation (#364)
- TLS support, TLS for grpc or rest can be enabled via cluster & node config (#303)

#### Changed
Expand Down
6 changes: 2 additions & 4 deletions bob-backend/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,15 +441,13 @@ impl Backend {
self.inner.free_least_used_resources().await
}

pub async fn delete(&self, key: BobKey, with_aliens: bool) -> Result<u64, Error> {
pub async fn delete(&self, key: BobKey) -> Result<u64, Error> {
let (vdisk_id, disk_path) = self.mapper.get_operation(key);
let mut ops = vec![];
if let Some(path) = disk_path {
ops.push(Operation::new_local(vdisk_id, path.clone()));
}
if with_aliens {
ops.push(Operation::new_alien(vdisk_id));
}
ops.push(Operation::new_alien(vdisk_id));
let total_count = futures::future::join_all(ops.into_iter().map(|op| {
trace!("DELETE[{}] try delete", key);
self.delete_single(key, op)
Expand Down
2 changes: 1 addition & 1 deletion bob-backend/src/pearl/disk_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ impl DiskController {
if let Ok(group) = vdisk_group {
group.delete(key).await
} else {
warn!(
debug!(
"DELETE[alien][{}] No alien group has been created for vdisk #{}",
key,
op.vdisk_id()
Expand Down
1 change: 1 addition & 0 deletions bob-backend/src/pearl/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ impl Group {
debug!("{} not found in {:?}", key, holder)
} else {
error!("delete error: {}, from : {:?}", err, holder);
return Err(err);
}
}
}
Expand Down
47 changes: 36 additions & 11 deletions bob-common/src/bob_client.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
pub mod b_client {
use super::{ExistResult, GetResult, PingResult, PutResult, FactoryTlsConfig};
use super::{DeleteResult, ExistResult, GetResult, PingResult, PutResult, FactoryTlsConfig};
use crate::{
data::{BobData, BobKey, BobMeta},
error::Error,
metrics::BobClient as BobClientMetrics,
node::{Node, Output as NodeOutput},
};
use bob_grpc::{
bob_api_client::BobApiClient, Blob, BlobKey, BlobMeta, ExistRequest, ExistResponse,
GetOptions, GetRequest, Null, PutOptions, PutRequest,
bob_api_client::BobApiClient, Blob, BlobKey, BlobMeta, DeleteOptions, DeleteRequest,
ExistRequest, ExistResponse, GetOptions, GetRequest, Null, PutOptions, PutRequest,
};
use mockall::mock;
use std::{
Expand Down Expand Up @@ -97,12 +97,12 @@ pub mod b_client {
Ok(_) => {
self.metrics.put_timer_stop(timer);
Ok(NodeOutput::new(node_name, ()))
},
}
Err(e) => {
self.metrics.put_error_count();
self.metrics.put_timer_stop(timer);
Err(NodeOutput::new(node_name, e.into()))
},
Err(NodeOutput::new(node_name, e.into()))
}
}
}

Expand Down Expand Up @@ -183,6 +183,26 @@ pub mod b_client {
}
}

pub async fn delete(&self, key: BobKey, options: DeleteOptions) -> DeleteResult {
let mut client = self.client.clone();
self.metrics.delete_count();
let timer = BobClientMetrics::start_timer();
let message = DeleteRequest {
key: Some(BlobKey { key: key.into() }),
options: Some(options),
};
let mut req = Request::new(message);
self.set_credentials(&mut req);
self.set_timeout(&mut req);
let res = client.delete(req).await;
self.metrics.delete_timer_stop(timer);
res.map(|_| NodeOutput::new(self.node().name().to_owned(), ()))
.map_err(|e| {
self.metrics.delete_error_count();
NodeOutput::new(self.node().name().to_owned(), e.into())
})
}

fn set_credentials<T>(&self, req: &mut Request<T>) {
let val = MetadataValue::from_str(&self.local_node_name)
.expect("failed to create metadata value from node name");
Expand All @@ -202,6 +222,7 @@ pub mod b_client {
pub async fn ping(&self) -> PingResult;
pub fn node(&self) -> &Node;
pub async fn exist(&self, keys: Vec<BobKey>, options: GetOptions) -> ExistResult;
pub async fn delete(&self, key: BobKey, options: DeleteOptions) -> DeleteResult;
}
impl Clone for BobClient {
fn clone(&self) -> Self;
Expand Down Expand Up @@ -240,13 +261,17 @@ cfg_if::cfg_if! {
}
}

pub type PutResult = Result<NodeOutput<()>, NodeOutput<Error>>;
type NodeResult<T> = Result<NodeOutput<T>, NodeOutput<Error>>;

pub type PutResult = NodeResult<()>;

pub type GetResult = NodeResult<BobData>;

pub type GetResult = Result<NodeOutput<BobData>, NodeOutput<Error>>;
pub type PingResult = NodeResult<()>;

pub type PingResult = Result<NodeOutput<()>, NodeOutput<Error>>;
pub type ExistResult = NodeResult<Vec<bool>>;

pub type ExistResult = Result<NodeOutput<Vec<bool>>, NodeOutput<Error>>;
pub type DeleteResult = NodeResult<()>;

#[derive(Clone)]
pub struct FactoryTlsConfig {
Expand Down Expand Up @@ -280,7 +305,7 @@ impl Factory {
}
}
pub async fn produce(&self, node: Node) -> Result<BobClient, String> {
let metrics = self.metrics.clone().get_metrics(&node.counter_display());
let metrics = self.metrics.clone().get_metrics();
BobClient::create(node, self.operation_timeout, metrics, self.local_node_name.clone(), self.tls_config.as_ref()).await
}
}
Expand Down
107 changes: 87 additions & 20 deletions bob-common/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ pub const GRINDER_EXIST_ERROR_COUNT_COUNTER: &str = "cluster_grinder.exist_error
/// Measures processing time of the EXIST request
pub const GRINDER_EXIST_TIMER: &str = "cluster_grinder.exist_timer";

/// Counts number of DELETE requests, processed by Grinder
pub const GRINDER_DELETE_COUNTER: &str = "cluster_grinder.delete_count";
/// Counts number of DELETE requests return error, processed by Grinder
pub const GRINDER_DELETE_ERROR_COUNT_COUNTER: &str = "cluster_grinder.delete_error_count";
/// Measures processing time of the DELETE request
pub const GRINDER_DELETE_TIMER: &str = "cluster_grinder.delete_timer";

/// Counts number of PUT requests, processed by Client
pub const CLIENT_PUT_COUNTER: &str = "client.put_count";
/// Counts number of PUT requests return error, processed by Client
Expand Down Expand Up @@ -111,16 +118,51 @@ pub type Timer = Instant;
/// Structure contains put/get metrics for `BobClient`
#[derive(Debug, Clone)]
pub struct BobClient {
prefix: String,
prefixed_names: Arc<PrefixedNames>,
}

#[derive(Debug, Clone)]
struct PrefixedNames {
put_count: String,
put_timer: String,
put_error_count: String,
get_count: String,
get_timer: String,
get_error_count: String,
exist_count: String,
exist_timer: String,
exist_error_count: String,
delete_count: String,
delete_timer: String,
delete_error_count: String,
}

impl PrefixedNames {
fn new(prefix: &str) -> Self {
Self {
put_count: format!("{}.put_count", prefix),
put_timer: format!("{}.put_timer", prefix),
put_error_count: format!("{}.put_error_count", prefix),
get_count: format!("{}.get_count", prefix),
get_timer: format!("{}.get_timer", prefix),
get_error_count: format!("{}.get_error_count", prefix),
exist_count: format!("{}.exist_count", prefix),
exist_timer: format!("{}.exist_timer", prefix),
exist_error_count: format!("{}.exist_error_count", prefix),
delete_count: format!("{}.delete_count", prefix),
delete_timer: format!("{}.delete_timer", prefix),
delete_error_count: format!("{}.delete_error_count", prefix),
}
}
}

impl BobClient {
fn new(prefix: String) -> Self {
BobClient { prefix }
fn new(prefixed_names: Arc<PrefixedNames>) -> Self {
BobClient { prefixed_names }
}

pub(crate) fn put_count(&self) {
counter!(self.prefix.clone() + ".put_count", 1);
counter!(self.prefixed_names.put_count.clone(), 1);
}

pub(crate) fn start_timer() -> Timer {
Expand All @@ -130,68 +172,86 @@ impl BobClient {
#[allow(clippy::cast_possible_truncation)]
pub(crate) fn put_timer_stop(&self, timer: Timer) {
histogram!(
self.prefix.clone() + ".put_timer",
self.prefixed_names.put_timer.clone(),
timer.elapsed().as_nanos() as f64
);
}

pub(crate) fn put_error_count(&self) {
counter!(self.prefix.clone() + ".put_error_count", 1);
counter!(self.prefixed_names.put_error_count.clone(), 1);
}

pub(crate) fn get_count(&self) {
counter!(self.prefix.clone() + ".get_count", 1);
counter!(self.prefixed_names.get_count.clone(), 1);
}

pub(crate) fn get_timer_stop(&self, timer: Timer) {
histogram!(
self.prefix.clone() + ".get_timer",
self.prefixed_names.get_timer.clone(),
timer.elapsed().as_nanos() as f64
);
}

pub(crate) fn get_error_count(&self) {
counter!(self.prefix.clone() + ".get_error_count", 1);
counter!(self.prefixed_names.get_error_count.clone(), 1);
}

pub(crate) fn exist_count(&self) {
counter!(self.prefix.clone() + ".exist_count", 1);
counter!(self.prefixed_names.exist_count.clone(), 1);
}

pub(crate) fn exist_timer_stop(&self, timer: Timer) {
histogram!(
self.prefixed_names.exist_timer.clone(),
timer.elapsed().as_nanos() as f64
);
}

pub(crate) fn exist_error_count(&self) {
counter!(self.prefix.clone() + ".exist_error_count", 1);
counter!(self.prefixed_names.exist_error_count.clone(), 1);
}

pub(crate) fn exist_timer_stop(&self, timer: Timer) {
pub(crate) fn delete_count(&self) {
counter!(self.prefixed_names.delete_count.clone(), 1);
}

pub(crate) fn delete_timer_stop(&self, timer: Timer) {
histogram!(
self.prefix.clone() + ".exist_timer",
self.prefixed_names.delete_timer.clone(),
timer.elapsed().as_nanos() as f64
);
}

pub(crate) fn delete_error_count(&self) {
counter!(self.prefixed_names.delete_error_count.clone(), 1);
}
}

#[derive(Debug, Clone)]
struct MetricsContainer {
duration: Duration,
prefix: String,
prefixed_names: Arc<PrefixedNames>,
}

impl MetricsContainer {
pub(crate) fn new(duration: Duration, prefix: String) -> Self {
MetricsContainer { duration, prefix }
let prefixed_names = Arc::new(PrefixedNames::new(&prefix));
MetricsContainer {
duration,
prefixed_names,
}
}
}

/// A trait for generic metrics builders
pub trait ContainerBuilder {
/// Initializes `BobClient` container with given name
fn get_metrics(&self, name: &str) -> BobClient;
fn get_metrics(&self) -> BobClient;
}

impl ContainerBuilder for MetricsContainer {
fn get_metrics(&self, name: &str) -> BobClient {
let prefix = self.prefix.clone() + "." + name;
BobClient::new(prefix)
fn get_metrics(&self) -> BobClient {
BobClient::new(self.prefixed_names.clone())
}
}

Expand All @@ -207,7 +267,14 @@ pub async fn init_counters(
//install_prometheus();
//install_graphite(node_config, local_address);
let shared = install_global(node_config, local_address).await;
let container = MetricsContainer::new(Duration::from_secs(1), CLIENTS_METRICS_DIR.to_owned());
let container = MetricsContainer::new(
Duration::from_secs(1),
format!(
"{}.{}",
CLIENTS_METRICS_DIR,
local_address.replace(".", "_")
),
);
info!(
"metrics container initialized with update interval: {}ms",
container.duration.as_millis()
Expand Down
4 changes: 0 additions & 4 deletions bob-common/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ impl Node {
.expect("build uri")
}

pub fn counter_display(&self) -> String {
self.address.to_string().replace('.', "_")
}

pub async fn set_connection(&self, client: BobClient) {
*self.conn.write().await = Some(client);
}
Expand Down
12 changes: 12 additions & 0 deletions bob-grpc/proto/bob.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ service BobApi {
rpc Get (GetRequest) returns (Blob) {}
rpc Ping (Null) returns (Null) {}
rpc Exist (ExistRequest) returns (ExistResponse) {}
rpc Delete (DeleteRequest) returns (OpStatus) {}
}

message Null {};
Expand All @@ -31,6 +32,12 @@ message ExistRequest {
GetOptions options = 2; // Options, same as in get request
}

// Delete operation parameters
message DeleteRequest {
BlobKey key = 1; // Blob's key
DeleteOptions options = 2; // Options
}

// Blob id
message BlobKey {
bytes key = 1; //Inner id representation
Expand Down Expand Up @@ -79,3 +86,8 @@ message BobError {
int32 code = 1; // Error code in case of error
string desc = 2; // Error desription
}

// Delete operation options
message DeleteOptions {
bool force_node = 1; // Force operation to be served by node to which it comes
}
14 changes: 14 additions & 0 deletions bob-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ impl GetOptions {
}
}

impl DeleteOptions {
pub fn new_all() -> Self {
Self {
force_node: false,
}
}

pub fn new_local() -> Self {
Self {
force_node: true,
}
}
}

impl From<i32> for GetSource {
fn from(value: i32) -> Self {
match value {
Expand Down
Loading

0 comments on commit ad16767

Please sign in to comment.