Skip to content

Commit

Permalink
feat: Add SYNC_ENFORCE_QUOTA flag
Browse files Browse the repository at this point in the history
Adds `SYNC_ENFORCE_QUOTA` / `--enforce_quota` flag which will force an
error if a user exceeds quota.

So for the record:
    `SYNC_LIMITS__MAX_QUOTA_LIMIT` => Sets the quota limit (default 2GB)
    `SYNC_ENABLE_QUOTA` => Determine if quota calcs should be done
    `SYNC_ENFORCE_QUOTA` => Determine if over quota errors are returned

Closes #870
  • Loading branch information
jrconlin committed Oct 26, 2020
1 parent c6ea474 commit f7979a7
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 63 deletions.
2 changes: 1 addition & 1 deletion src/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<'a> Db<'a> for MockDb {
fn clear_coll_cache(&self) {}

#[cfg(test)]
fn set_quota(&mut self, _: bool, _: usize) {}
fn set_quota(&mut self, _: bool, _: usize, _: bool) {}
}

unsafe impl Send for MockDb {}
2 changes: 1 addition & 1 deletion src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ pub trait Db<'a>: Debug + 'a {
fn clear_coll_cache(&self);

#[cfg(test)]
fn set_quota(&mut self, enabled: bool, limit: usize);
fn set_quota(&mut self, enabled: bool, limit: usize, enforce: bool);
}

impl<'a> Clone for Box<dyn Db<'a>> {
Expand Down
32 changes: 19 additions & 13 deletions src/db/mysql/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::db::{
Db, DbFuture, Sorting,
};
use crate::server::metrics::Metrics;
use crate::settings::Quota;
use crate::web::extractors::{BsoQueryParams, HawkIdentifier};
use crate::web::tags::Tags;

Expand Down Expand Up @@ -86,8 +87,7 @@ pub struct MysqlDb {
coll_cache: Arc<CollectionCache>,

pub metrics: Metrics,
pub quota: usize,
pub quota_enabled: bool,
pub quota: Quota,
}

/// Despite the db conn structs being !Sync (see Arc<MysqlDbInner> above) we
Expand Down Expand Up @@ -123,8 +123,7 @@ impl MysqlDb {
conn: Conn,
coll_cache: Arc<CollectionCache>,
metrics: &Metrics,
quota: &usize,
quota_enabled: bool,
quota: &Quota,
) -> Self {
let inner = MysqlDbInner {
#[cfg(not(test))]
Expand All @@ -138,7 +137,6 @@ impl MysqlDb {
coll_cache,
metrics: metrics.clone(),
quota: *quota,
quota_enabled,
}
}

Expand Down Expand Up @@ -398,18 +396,23 @@ impl MysqlDb {
let collection_id = self.get_or_create_collection_id(&bso.collection)?;
let user_id: u64 = bso.user_id.legacy_id;
let timestamp = self.timestamp().as_i64();
if self.quota_enabled {
if self.quota.enabled {
let usage = self.get_quota_usage_sync(params::GetQuotaUsage {
user_id: HawkIdentifier::new_legacy(user_id),
collection: bso.collection.clone(),
collection_id,
})?;
if usage.total_bytes >= self.quota as usize {
if usage.total_bytes >= self.quota.size as usize {
let mut tags = Tags::default();
tags.tags.insert("collection".to_owned(), bso.collection);
tags.tags
.insert("collection".to_owned(), bso.collection.clone());
self.metrics
.incr_with_tags("storage.quota.at_limit", Some(tags));
return Err(DbErrorKind::Quota.into());
if self.quota.enforced {
return Err(DbErrorKind::Quota.into());
} else {
warn!("Quota at limit for user's collection ({} bytes)", usage.total_bytes; "collection"=>bso.collection.clone());
}
}
}

Expand Down Expand Up @@ -829,7 +832,7 @@ impl MysqlDb {
user_id: u32,
collection_id: i32,
) -> Result<SyncTimestamp> {
let quota = if self.quota_enabled {
let quota = if self.quota.enabled {
self.calc_quota_usage_sync(user_id, collection_id)?
} else {
results::GetQuotaUsage {
Expand Down Expand Up @@ -1114,9 +1117,12 @@ impl<'a> Db<'a> for MysqlDb {
}

#[cfg(test)]
fn set_quota(&mut self, enabled: bool, limit: usize) {
self.quota = limit;
self.quota_enabled = enabled;
fn set_quota(&mut self, enabled: bool, limit: usize, enforced: bool) {
self.quota = Quota {
size: limit,
enabled,
enforced,
}
}
}

Expand Down
13 changes: 7 additions & 6 deletions src/db/mysql/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::db::{
};
use crate::error::{ApiError, ApiResult};
use crate::server::metrics::Metrics;
use crate::settings::Settings;
use crate::settings::{Quota, Settings};

embed_migrations!();

Expand Down Expand Up @@ -54,8 +54,7 @@ pub struct MysqlDbPool {
coll_cache: Arc<CollectionCache>,

metrics: Metrics,
quota: usize,
quota_enabled: bool,
quota: Quota,
}

impl MysqlDbPool {
Expand Down Expand Up @@ -84,8 +83,11 @@ impl MysqlDbPool {
pool: builder.build(manager)?,
coll_cache: Default::default(),
metrics: metrics.clone(),
quota: settings.limits.max_quota_limit as usize,
quota_enabled: settings.enable_quota,
quota: Quota {
size: settings.limits.max_quota_limit as usize,
enabled: settings.enable_quota,
enforced: settings.enforce_quota,
},
})
}

Expand All @@ -95,7 +97,6 @@ impl MysqlDbPool {
Arc::clone(&self.coll_cache),
&self.metrics,
&self.quota,
self.quota_enabled,
))
}
}
Expand Down
12 changes: 8 additions & 4 deletions src/db/spanner/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,14 @@ pub async fn do_append_async(
};
}

if db.quota_enabled {
if db.quota.enabled {
if let Some(size) = batch.size {
if size + running_size >= (db.quota as usize) {
return Err(db.quota_error(collection));
if size + running_size >= db.quota.size {
if db.quota.enforced {
return Err(db.quota_error(collection));
} else {
warn!("Quota at limit for user's collection ({} bytes)", size + running_size; "collection"=>collection);
}
}
}
}
Expand Down Expand Up @@ -510,7 +514,7 @@ async fn pretouch_collection_async(
.await?;
if result.is_none() {
sqlparams.insert("modified".to_owned(), as_value(PRETOUCH_TS.to_owned()));
let sql = if db.quota_enabled {
let sql = if db.quota.enabled {
"INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified, count, total_bytes)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified, 0, 0)"
} else {
Expand Down
41 changes: 23 additions & 18 deletions src/db/spanner/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
Db, DbFuture, Sorting, FIRST_CUSTOM_COLLECTION_ID,
},
server::metrics::Metrics,
settings::Quota,
web::{
extractors::{BsoQueryParams, HawkIdentifier, Offset},
tags::Tags,
Expand Down Expand Up @@ -92,8 +93,7 @@ pub struct SpannerDb {
coll_cache: Arc<CollectionCache>,

pub metrics: Metrics,
pub quota: usize,
pub quota_enabled: bool,
pub quota: Quota,
}

pub struct SpannerDbInner {
Expand Down Expand Up @@ -121,8 +121,7 @@ impl SpannerDb {
conn: Conn,
coll_cache: Arc<CollectionCache>,
metrics: &Metrics,
quota: usize,
quota_enabled: bool,
quota: Quota,
) -> Self {
let inner = SpannerDbInner {
conn,
Expand All @@ -133,7 +132,6 @@ impl SpannerDb {
coll_cache,
metrics: metrics.clone(),
quota,
quota_enabled,
}
}

Expand Down Expand Up @@ -801,7 +799,7 @@ impl SpannerDb {
&self,
params: params::GetQuotaUsage,
) -> Result<results::GetQuotaUsage> {
if !self.quota_enabled {
if !self.quota.enabled {
return Ok(results::GetQuotaUsage::default());
}
let check_sql = "SELECT COALESCE(total_bytes,0), COALESCE(count,0)
Expand All @@ -820,7 +818,7 @@ impl SpannerDb {
.one_or_none()
.await?;
if let Some(result) = result {
let total_bytes = if self.quota_enabled {
let total_bytes = if self.quota.enabled {
result[0]
.get_string_value()
.parse::<usize>()
Expand Down Expand Up @@ -862,7 +860,7 @@ impl SpannerDb {
self.metrics
.clone()
.start_timer("storage.quota.update_existing_totals", None);
let calc_sql = if self.quota_enabled {
let calc_sql = if self.quota.enabled {
"SELECT SUM(BYTE_LENGTH(payload)), COUNT(*)
FROM bsos
WHERE fxa_uid = @fxa_uid
Expand Down Expand Up @@ -891,7 +889,7 @@ impl SpannerDb {
// Update the user_collections table to reflect current numbers.
// If there are BSOs, there are user_collections (or else something
// really bad already happened.)
if self.quota_enabled {
if self.quota.enabled {
sqlparams.insert(
"total_bytes".to_owned(),
as_value(result[0].take_string_value()),
Expand Down Expand Up @@ -934,7 +932,7 @@ impl SpannerDb {
.await?;
if result.is_none() {
// No collections, so insert what we've got.
if self.quota_enabled {
if self.quota.enabled {
"INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified, total_bytes, count)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified, 0, 0)"
} else {
Expand All @@ -944,7 +942,7 @@ impl SpannerDb {
} else {
// there are collections, best modify what's there.
// NOTE, tombstone is a single collection id, it would have been created above.
if self.quota_enabled {
if self.quota.enabled {
"UPDATE user_collections SET modified=@modified, total_bytes=0, count=0
WHERE fxa_uid=@fxa_uid AND fxa_kid=@fxa_kid AND collection_id=@collection_id"
} else {
Expand Down Expand Up @@ -1119,7 +1117,7 @@ impl SpannerDb {
self.metrics
.clone()
.start_timer("storage.quota.init_totals", Some(tags));
let update_sql = if self.quota_enabled {
let update_sql = if self.quota.enabled {
"INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified, count, total_bytes)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified, 0, 0)"
} else {
Expand Down Expand Up @@ -1630,7 +1628,7 @@ impl SpannerDb {
collection_id: i32,
) -> Result<Option<usize>> {
// duplicate quota trap in test func below.
if !self.quota_enabled {
if !self.quota.enabled {
return Ok(None);
}
let usage = self
Expand All @@ -1640,8 +1638,12 @@ impl SpannerDb {
collection_id,
})
.await?;
if usage.total_bytes >= self.quota {
return Err(self.quota_error(collection));
if usage.total_bytes >= self.quota.size {
if self.quota.enforced {
return Err(self.quota_error(collection));
} else {
warn!("Quota at limit for user's collection: ({} bytes)", usage.total_bytes; "collection"=>collection);
}
}
Ok(Some(usage.total_bytes as usize))
}
Expand Down Expand Up @@ -2096,8 +2098,11 @@ impl<'a> Db<'a> for SpannerDb {
}

#[cfg(test)]
fn set_quota(&mut self, enabled: bool, limit: usize) {
self.quota_enabled = enabled;
self.quota = limit;
fn set_quota(&mut self, enabled: bool, limit: usize, enforced: bool) {
self.quota = Quota {
size: limit,
enabled,
enforced,
};
}
}
13 changes: 7 additions & 6 deletions src/db/spanner/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use super::models::Result;
use crate::db::{error::DbError, results, Db, DbPool, STD_COLLS};
use crate::server::metrics::Metrics;
use crate::settings::Settings;
use crate::settings::{Quota, Settings};

use super::manager::{SpannerSession, SpannerSessionManager};
use super::models::SpannerDb;
Expand All @@ -37,8 +37,7 @@ pub struct SpannerDbPool {
coll_cache: Arc<CollectionCache>,

metrics: Metrics,
quota: usize,
quota_enabled: bool,
quota: Quota,
}

impl SpannerDbPool {
Expand All @@ -58,8 +57,11 @@ impl SpannerDbPool {
pool,
coll_cache: Default::default(),
metrics: metrics.clone(),
quota: settings.limits.max_quota_limit as usize,
quota_enabled: settings.enable_quota,
quota: Quota {
size: settings.limits.max_quota_limit as usize,
enabled: settings.enable_quota,
enforced: settings.enforce_quota,
},
})
}

Expand All @@ -75,7 +77,6 @@ impl SpannerDbPool {
Arc::clone(&self.coll_cache),
&self.metrics,
self.quota,
self.quota_enabled,
))
}
}
Expand Down
Loading

0 comments on commit f7979a7

Please sign in to comment.