Skip to content

Commit

Permalink
feat: Add SYNC_ENFORCE_QUOTA flag (#875)
Browse files Browse the repository at this point in the history
Adds `SYNC_ENFORCE_QUOTA` / `--enforce_quota` flag which will force an
error if a user exceeds quota.

So for the record:
    `SYNC_LIMITS__MAX_QUOTA_LIMIT` => Sets the quota limit (default 2GB)
    `SYNC_ENABLE_QUOTA` => Determine if quota calcs should be done
    `SYNC_ENFORCE_QUOTA` => Determine if over quota errors are returned
    `SYNC_LIMITS__MAX_RECORDS` => Upper max number of records we will attempt to fetch from the db.

Closes #870
  • Loading branch information
jrconlin authored Nov 9, 2020
1 parent ac763f0 commit 0e30801
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ commands:
name: quota test
command: cargo test --all --verbose
environment:
SYNC_ENABLE_QUOTA: 1
SYNC_ENFORCE_QUOTA: 1

run-e2e-tests:
steps:
Expand Down
2 changes: 1 addition & 1 deletion src/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl<'a> Db<'a> for MockDb {
}

#[cfg(test)]
fn set_quota(&mut self, _: bool, _: usize) {}
fn set_quota(&mut self, _: bool, _: usize, _: bool) {}
}

unsafe impl Send for MockDb {}
2 changes: 1 addition & 1 deletion src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ pub trait Db<'a>: Debug + 'a {
fn clear_coll_cache(&self) -> DbFuture<'_, ()>;

#[cfg(test)]
fn set_quota(&mut self, enabled: bool, limit: usize);
fn set_quota(&mut self, enabled: bool, limit: usize, enforce: bool);
}

impl<'a> Clone for Box<dyn Db<'a>> {
Expand Down
47 changes: 29 additions & 18 deletions src/db/mysql/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::db::{
Db, DbFuture, Sorting,
};
use crate::server::metrics::Metrics;
use crate::settings::{Quota, DEFAULT_MAX_TOTAL_RECORDS};
use crate::web::extractors::{BsoQueryParams, HawkIdentifier};
use crate::web::tags::Tags;

Expand All @@ -41,7 +42,8 @@ type Conn = PooledConnection<ConnectionManager<MysqlConnection>>;
/// We store the TTL as a SyncTimestamp, which is milliseconds, so remember
/// to multiply this by 1000.
pub const DEFAULT_BSO_TTL: u32 = 2_100_000_000;
pub const DEFAULT_LIMIT: i64 = 10000;
// this is the max number of records we will return.
pub static DEFAULT_LIMIT: u32 = DEFAULT_MAX_TOTAL_RECORDS;

pub const TOMBSTONE: i32 = 0;
/// SQL Variable remapping
Expand Down Expand Up @@ -89,8 +91,7 @@ pub struct MysqlDb {
coll_cache: Arc<CollectionCache>,

pub metrics: Metrics,
pub quota: usize,
pub quota_enabled: bool,
pub quota: Quota,
}

/// Despite the db conn structs being !Sync (see Arc<MysqlDbInner> above) we
Expand Down Expand Up @@ -126,8 +127,7 @@ impl MysqlDb {
conn: Conn,
coll_cache: Arc<CollectionCache>,
metrics: &Metrics,
quota: &usize,
quota_enabled: bool,
quota: &Quota,
) -> Self {
let inner = MysqlDbInner {
#[cfg(not(test))]
Expand All @@ -141,7 +141,6 @@ impl MysqlDb {
coll_cache,
metrics: metrics.clone(),
quota: *quota,
quota_enabled,
}
}

Expand Down Expand Up @@ -401,18 +400,23 @@ impl MysqlDb {
let collection_id = self.get_or_create_collection_id(&bso.collection)?;
let user_id: u64 = bso.user_id.legacy_id;
let timestamp = self.timestamp().as_i64();
if self.quota_enabled {
if self.quota.enabled {
let usage = self.get_quota_usage_sync(params::GetQuotaUsage {
user_id: HawkIdentifier::new_legacy(user_id),
collection: bso.collection.clone(),
collection_id,
})?;
if usage.total_bytes >= self.quota as usize {
if usage.total_bytes >= self.quota.size as usize {
let mut tags = Tags::default();
tags.tags.insert("collection".to_owned(), bso.collection);
tags.tags
.insert("collection".to_owned(), bso.collection.clone());
self.metrics
.incr_with_tags("storage.quota.at_limit", Some(tags));
return Err(DbErrorKind::Quota.into());
if self.quota.enforced {
return Err(DbErrorKind::Quota.into());
} else {
warn!("Quota at limit for user's collection ({} bytes)", usage.total_bytes; "collection"=>bso.collection.clone());
}
}
}

Expand Down Expand Up @@ -514,6 +518,10 @@ impl MysqlDb {
query = query.filter(bso::id.eq_any(ids));
}

// it's possible for two BSOs to be inserted with the same `modified` date,
// since there's no guarantee of order when doing a get, pagination can return
// an error. We "fudge" a bit here by taking the id order as a secondary, since
// that is guaranteed to be unique by the client.
query = match sort {
// issue559: Revert to previous sorting
/*
Expand All @@ -524,12 +532,12 @@ impl MysqlDb {
Sorting::Oldest => query.order(bso::id.asc()).order(bso::modified.asc()),
*/
Sorting::Index => query.order(bso::sortindex.desc()),
Sorting::Newest => query.order(bso::modified.desc()),
Sorting::Oldest => query.order(bso::modified.asc()),
Sorting::Newest => query.order((bso::modified.desc(), bso::id.desc())),
Sorting::Oldest => query.order((bso::modified.asc(), bso::id.asc())),
_ => query,
};

let limit = limit.map(i64::from).unwrap_or(DEFAULT_LIMIT).max(0);
let limit = limit.map(i64::from).unwrap_or(DEFAULT_LIMIT as i64).max(0);
// fetch an extra row to detect if there are more rows that
// match the query conditions
query = query.limit(if limit > 0 { limit + 1 } else { limit });
Expand Down Expand Up @@ -606,7 +614,7 @@ impl MysqlDb {
};

// negative limits are no longer allowed by mysql.
let limit = limit.map(i64::from).unwrap_or(DEFAULT_LIMIT).max(0);
let limit = limit.map(i64::from).unwrap_or(DEFAULT_LIMIT as i64).max(0);
// fetch an extra row to detect if there are more rows that
// match the query conditions. Negative limits will cause an error.
query = query.limit(if limit == 0 { limit } else { limit + 1 });
Expand Down Expand Up @@ -838,7 +846,7 @@ impl MysqlDb {
user_id: u32,
collection_id: i32,
) -> Result<SyncTimestamp> {
let quota = if self.quota_enabled {
let quota = if self.quota.enabled {
self.calc_quota_usage_sync(user_id, collection_id)?
} else {
results::GetQuotaUsage {
Expand Down Expand Up @@ -1130,9 +1138,12 @@ impl<'a> Db<'a> for MysqlDb {
}

#[cfg(test)]
fn set_quota(&mut self, enabled: bool, limit: usize) {
self.quota = limit;
self.quota_enabled = enabled;
fn set_quota(&mut self, enabled: bool, limit: usize, enforced: bool) {
self.quota = Quota {
size: limit,
enabled,
enforced,
}
}
}

Expand Down
13 changes: 7 additions & 6 deletions src/db/mysql/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::db::{
};
use crate::error::{ApiError, ApiResult};
use crate::server::metrics::Metrics;
use crate::settings::Settings;
use crate::settings::{Quota, Settings};

embed_migrations!();

Expand Down Expand Up @@ -54,8 +54,7 @@ pub struct MysqlDbPool {
coll_cache: Arc<CollectionCache>,

metrics: Metrics,
quota: usize,
quota_enabled: bool,
quota: Quota,
}

impl MysqlDbPool {
Expand Down Expand Up @@ -84,8 +83,11 @@ impl MysqlDbPool {
pool: builder.build(manager)?,
coll_cache: Default::default(),
metrics: metrics.clone(),
quota: settings.limits.max_quota_limit as usize,
quota_enabled: settings.enable_quota,
quota: Quota {
size: settings.limits.max_quota_limit as usize,
enabled: settings.enable_quota,
enforced: settings.enforce_quota,
},
})
}

Expand All @@ -95,7 +97,6 @@ impl MysqlDbPool {
Arc::clone(&self.coll_cache),
&self.metrics,
&self.quota,
self.quota_enabled,
))
}
}
Expand Down
14 changes: 9 additions & 5 deletions src/db/spanner/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ pub async fn commit_async(
.await?;
// XXX: returning results::PostBsos here isn't needed
// update the quotas for the user's collection
if db.quota_enabled {
if db.quota.enabled {
db.update_user_collection_quotas(&params.user_id, collection_id)
.await?;
}
Expand Down Expand Up @@ -396,10 +396,14 @@ pub async fn do_append_async(
);
}

if db.quota_enabled {
if db.quota.enabled {
if let Some(size) = batch.size {
if size + running_size >= (db.quota as usize) {
return Err(db.quota_error(collection));
if size + running_size >= db.quota.size {
if db.quota.enforced {
return Err(db.quota_error(collection));
} else {
warn!("Quota at limit for user's collection ({} bytes)", size + running_size; "collection"=>collection);
}
}
}
}
Expand Down Expand Up @@ -546,7 +550,7 @@ async fn pretouch_collection_async(
.await?;
if result.is_none() {
sqlparams.insert("modified".to_owned(), as_value(PRETOUCH_TS.to_owned()));
let sql = if db.quota_enabled {
let sql = if db.quota.enabled {
"INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified, count, total_bytes)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified, 0, 0)"
} else {
Expand Down
41 changes: 23 additions & 18 deletions src/db/spanner/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
Db, DbFuture, Sorting, FIRST_CUSTOM_COLLECTION_ID,
},
server::metrics::Metrics,
settings::Quota,
web::{
extractors::{BsoQueryParams, HawkIdentifier, Offset},
tags::Tags,
Expand Down Expand Up @@ -92,8 +93,7 @@ pub struct SpannerDb {
coll_cache: Arc<CollectionCache>,

pub metrics: Metrics,
pub quota: usize,
pub quota_enabled: bool,
pub quota: Quota,
}

pub struct SpannerDbInner {
Expand Down Expand Up @@ -121,8 +121,7 @@ impl SpannerDb {
conn: Conn,
coll_cache: Arc<CollectionCache>,
metrics: &Metrics,
quota: usize,
quota_enabled: bool,
quota: Quota,
) -> Self {
let inner = SpannerDbInner {
conn,
Expand All @@ -133,7 +132,6 @@ impl SpannerDb {
coll_cache,
metrics: metrics.clone(),
quota,
quota_enabled,
}
}

Expand Down Expand Up @@ -800,7 +798,7 @@ impl SpannerDb {
&self,
params: params::GetQuotaUsage,
) -> Result<results::GetQuotaUsage> {
if !self.quota_enabled {
if !self.quota.enabled {
return Ok(results::GetQuotaUsage::default());
}
let check_sql = "SELECT COALESCE(total_bytes,0), COALESCE(count,0)
Expand All @@ -819,7 +817,7 @@ impl SpannerDb {
.one_or_none()
.await?;
if let Some(result) = result {
let total_bytes = if self.quota_enabled {
let total_bytes = if self.quota.enabled {
result[0]
.get_string_value()
.parse::<usize>()
Expand Down Expand Up @@ -861,7 +859,7 @@ impl SpannerDb {
self.metrics
.clone()
.start_timer("storage.quota.update_existing_totals", None);
let calc_sql = if self.quota_enabled {
let calc_sql = if self.quota.enabled {
"SELECT SUM(BYTE_LENGTH(payload)), COUNT(*)
FROM bsos
WHERE fxa_uid = @fxa_uid
Expand Down Expand Up @@ -890,7 +888,7 @@ impl SpannerDb {
// Update the user_collections table to reflect current numbers.
// If there are BSOs, there are user_collections (or else something
// really bad already happened.)
if self.quota_enabled {
if self.quota.enabled {
sqlparams.insert(
"total_bytes".to_owned(),
as_value(result[0].take_string_value()),
Expand Down Expand Up @@ -933,7 +931,7 @@ impl SpannerDb {
.await?;
if result.is_none() {
// No collections, so insert what we've got.
if self.quota_enabled {
if self.quota.enabled {
"INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified, total_bytes, count)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified, 0, 0)"
} else {
Expand All @@ -943,7 +941,7 @@ impl SpannerDb {
} else {
// there are collections, best modify what's there.
// NOTE, tombstone is a single collection id, it would have been created above.
if self.quota_enabled {
if self.quota.enabled {
"UPDATE user_collections SET modified=@modified, total_bytes=0, count=0
WHERE fxa_uid=@fxa_uid AND fxa_kid=@fxa_kid AND collection_id=@collection_id"
} else {
Expand Down Expand Up @@ -1118,7 +1116,7 @@ impl SpannerDb {
self.metrics
.clone()
.start_timer("storage.quota.init_totals", Some(tags));
let update_sql = if self.quota_enabled {
let update_sql = if self.quota.enabled {
"INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified, count, total_bytes)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified, 0, 0)"
} else {
Expand Down Expand Up @@ -1634,7 +1632,7 @@ impl SpannerDb {
collection_id: i32,
) -> Result<Option<usize>> {
// duplicate quota trap in test func below.
if !self.quota_enabled {
if !self.quota.enabled {
return Ok(None);
}
let usage = self
Expand All @@ -1644,8 +1642,12 @@ impl SpannerDb {
collection_id,
})
.await?;
if usage.total_bytes >= self.quota {
return Err(self.quota_error(collection));
if usage.total_bytes >= self.quota.size {
if self.quota.enforced {
return Err(self.quota_error(collection));
} else {
warn!("Quota at limit for user's collection: ({} bytes)", usage.total_bytes; "collection"=>collection);
}
}
Ok(Some(usage.total_bytes as usize))
}
Expand Down Expand Up @@ -2104,8 +2106,11 @@ impl<'a> Db<'a> for SpannerDb {
}

#[cfg(test)]
fn set_quota(&mut self, enabled: bool, limit: usize) {
self.quota_enabled = enabled;
self.quota = limit;
fn set_quota(&mut self, enabled: bool, limit: usize, enforced: bool) {
self.quota = Quota {
size: limit,
enabled,
enforced,
};
}
}
Loading

0 comments on commit 0e30801

Please sign in to comment.