diff --git a/.circleci/config.yml b/.circleci/config.yml index 52cf766ac3..575a9fbc60 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -43,7 +43,7 @@ commands: - run: name: Core Python Checks command: | - flake8 src/tokenserver + flake8 syncstorage/src/tokenserver flake8 tools/integration_tests flake8 tools/tokenserver rust-clippy: diff --git a/Cargo.lock b/Cargo.lock index 60ec085d78..0ff652d5f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -277,17 +277,6 @@ dependencies = [ "syn", ] -[[package]] -name = "actix-web-httpauth" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c3b11a07a3df3f7970fd8bd38cc66998b5549f507c54cc64c6e843bc82d6358" -dependencies = [ - "actix-web", - "base64 0.13.0", - "futures-util", -] - [[package]] name = "addr2line" version = "0.17.0" @@ -3018,7 +3007,6 @@ dependencies = [ "actix-http", "actix-rt", "actix-web", - "actix-web-httpauth", "async-trait", "backtrace", "base64 0.13.0", @@ -3067,8 +3055,11 @@ dependencies = [ "slog-scope", "slog-stdlog", "slog-term", + "syncstorage-common", + "syncstorage-db-common", "thiserror", "time 0.3.9", + "tokenserver-common", "tokio", "url 2.2.2", "urlencoding", @@ -3078,6 +3069,31 @@ dependencies = [ "woothee", ] +[[package]] +name = "syncstorage-common" +version = "0.10.2" + +[[package]] +name = "syncstorage-db-common" +version = "0.10.2" +dependencies = [ + "async-trait", + "backtrace", + "chrono", + "deadpool", + "diesel", + "diesel_migrations", + "futures 0.3.19", + "grpcio", + "hostname", + "http", + "lazy_static", + "serde 1.0.135", + "serde_json", + "syncstorage-common", + "thiserror", +] + [[package]] name = "synstructure" version = "0.12.6" @@ -3249,6 +3265,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +[[package]] +name = "tokenserver-common" +version = "0.10.2" +dependencies = [ + "actix-web", + "serde 1.0.135", + "serde_json", + "thiserror", +] + [[package]] name = "tokio" version = "0.2.25" diff --git a/Cargo.toml b/Cargo.toml index 4c172ea0d1..92d47af867 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,94 +1,13 @@ -[package] -name = "syncstorage" -version = "0.10.2" -license = "MPL-2.0" -authors = [ - "Ben Bangert ", - "Phil Jenvey ", - "Mozilla Services Engineering " +[workspace] +resolver = "2" +members = [ + "syncstorage-db-common", + "syncstorage-common", + "tokenserver-common", + "syncstorage", ] -edition = "2018" -default-run = "syncstorage" +default-members = ["syncstorage"] [profile.release] -# Enables line numbers in Sentry +# Enables line numbers in Sentry reporting debug = 1 - -[dependencies] -actix-http = "2" -actix-web = "3" -actix-web-httpauth = "0.5" -actix-rt = "1" # Pin to 1.0, due to dependencies on Tokio -actix-cors = "0.5" -async-trait = "0.1.40" -backtrace = "0.3.61" -base64 = "0.13" -bb8 = "0.4.1" # pin to 0.4 due to dependencies on Tokio -bytes = "1.0" -cadence = "0.26" -chrono = "0.4" -config = "0.11" -# Pin to 0.5 for now, to keep it under tokio 0.2 (issue977). 
-# Fix for #803 (deadpool#92) points to our fork for now -#deadpool = "0.5" # pin to 0.5 -deadpool = { git = "https://github.com/mozilla-services/deadpool", branch = "deadpool-v0.5.2-issue92" } -diesel = { version = "1.4", features = ["mysql", "r2d2"] } -diesel_logger = "0.1.1" -diesel_migrations = { version = "1.4.0", features = ["mysql"] } -docopt = "1.1.0" -dyn-clone = "1.0.4" -env_logger = "0.9" -futures = { version = "0.3", features = ["compat"] } -google-cloud-rust-raw = "0.11.0" -# Some versions of OpenSSL 1.1.1 conflict with grpcio's built-in boringssl which can cause -# syncserver to either fail to either compile, or start. In those cases, try -# `cargo build --features grpcio/openssl ...` -grpcio = { version = "0.9" } -lazy_static = "1.4.0" -hawk = "3.2" -hex = "0.4.3" -hostname = "0.3.1" -hkdf = "0.11" -hmac = "0.11" -http = "0.2.5" -log = { version = "0.4", features = ["max_level_debug", "release_max_level_info"] } -mime = "0.3" -mockito = "0.30.0" -num_cpus = "1" -# must match what's used by googleapis-raw -protobuf = "2.20.0" -pyo3 = { version = "0.14", features = ["auto-initialize"] } -rand = "0.8" -regex = "1.4" -reqwest = { version = "0.10.10", features = ["json", "rustls-tls"] } -# pin to 0.19: https://github.com/getsentry/sentry-rust/issues/277 -sentry = { version = "0.19", features = ["with_curl_transport"] }# pin to 0.19 until on-prem sentry server is updated -sentry-backtrace = "0.19" -serde = "1.0" -serde_derive = "1.0" -serde_json = { version = "1.0", features = ["arbitrary_precision"] } -scheduled-thread-pool = "0.2" -sha2 = "0.9" -slog = { version = "2.5", features = ["max_level_info", "release_max_level_info", "dynamic-keys"] } -slog-async = "2.5" -slog-envlogger = "2.2.0" -slog-mozlog-json = "0.1" -slog-scope = "4.3" -slog-stdlog = "4.1" -slog-term = "2.6" -thiserror = "1.0.26" -time = "^0.3" -# pinning to 0.2.4 due to high number of dependencies (actix, bb8, deadpool, etc.) -tokio = { version = "0.2.4", features = ["macros", "sync"] } -url = "2.1" -urlencoding = "2.1" -uuid = { version = "0.8.2", features = ["serde", "v4"] } -validator = "0.14" -validator_derive = "0.14" -woothee = "0.11" - -[features] -no_auth = [] - -[[bin]] -name = "purge_ttl" diff --git a/Dockerfile b/Dockerfile index 991a2ecdc5..88eb610737 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,8 +11,8 @@ RUN apt-get -q update && \ RUN \ cargo --version && \ rustc --version && \ - cargo install --path . --locked --root /app && \ - cargo install --path . 
--locked --root /app --bin purge_ttl + cargo install --path ./syncstorage --locked --root /app && \ + cargo install --path ./syncstorage --locked --root /app --bin purge_ttl FROM debian:buster-slim WORKDIR /app @@ -40,7 +40,7 @@ COPY --from=builder /app/spanner_config.ini /app COPY --from=builder /app/tools/spanner /app/tools/spanner COPY --from=builder /app/tools/integration_tests /app/tools/integration_tests COPY --from=builder /app/scripts/prepare-spanner.sh /app/scripts/prepare-spanner.sh -COPY --from=builder /app/src/db/spanner/schema.ddl /app/schema.ddl +COPY --from=builder /app/syncstorage/src/db/spanner/schema.ddl /app/schema.ddl RUN chmod +x /app/scripts/prepare-spanner.sh RUN pip3 install -r /app/tools/integration_tests/requirements.txt diff --git a/syncstorage-common/Cargo.toml b/syncstorage-common/Cargo.toml new file mode 100644 index 0000000000..c25f0cbdf7 --- /dev/null +++ b/syncstorage-common/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "syncstorage-common" +version = "0.10.2" +edition = "2021" + +[dependencies] diff --git a/syncstorage-common/src/lib.rs b/syncstorage-common/src/lib.rs new file mode 100644 index 0000000000..3ca04e8513 --- /dev/null +++ b/syncstorage-common/src/lib.rs @@ -0,0 +1,21 @@ +#[macro_export] +macro_rules! from_error { + ($from:ty, $to:ty, $to_kind:expr) => { + impl From<$from> for $to { + fn from(inner: $from) -> $to { + $to_kind(inner).into() + } + } + }; +} + +#[macro_export] +macro_rules! impl_fmt_display { + ($error:ty, $kind:ty) => { + impl fmt::Display for $error { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.kind, formatter) + } + } + }; +} diff --git a/syncstorage-db-common/Cargo.toml b/syncstorage-db-common/Cargo.toml new file mode 100644 index 0000000000..a5b1530262 --- /dev/null +++ b/syncstorage-db-common/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "syncstorage-db-common" +version = "0.10.2" +edition = "2021" + +[dependencies] +async-trait = "0.1.40" +backtrace = "0.3.61" +chrono = "0.4" +# Pin to 0.5 for now, to keep it under tokio 0.2 (issue977). +# Fix for #803 (deadpool#92) points to our fork for now +#deadpool = "0.5" # pin to 0.5 +deadpool = { git = "https://github.com/mozilla-services/deadpool", branch = "deadpool-v0.5.2-issue92" } +diesel = { version = "1.4", features = ["mysql", "r2d2"] } +diesel_migrations = { version = "1.4.0", features = ["mysql"] } +# Some versions of OpenSSL 1.1.1 conflict with grpcio's built-in boringssl which can cause +# syncserver to either fail to either compile, or start. 
In those cases, try +# `cargo build --features grpcio/openssl ...` +grpcio = { version = "0.9" } +hostname = "0.3.1" +http = "0.2.6" +futures = { version = "0.3", features = ["compat"] } +lazy_static = "1.4.0" +serde = "1.0" +serde_json = { version = "1.0", features = ["arbitrary_precision"] } +syncstorage-common = { path = "../syncstorage-common" } +thiserror = "1.0.26" diff --git a/src/db/error.rs b/syncstorage-db-common/src/error.rs similarity index 81% rename from src/db/error.rs rename to syncstorage-db-common/src/error.rs index 595309818f..f1fe90fc6f 100644 --- a/src/db/error.rs +++ b/syncstorage-db-common/src/error.rs @@ -1,12 +1,15 @@ use std::fmt; -use actix_web::http::StatusCode; +use backtrace::Backtrace; +use http::StatusCode; +use syncstorage_common::{from_error, impl_fmt_display}; use thiserror::Error; -#[derive(Error, Debug)] +#[derive(Debug)] pub struct DbError { kind: DbErrorKind, pub status: StatusCode, + pub backtrace: Backtrace, } #[derive(Debug, Error)] @@ -38,9 +41,6 @@ pub enum DbErrorKind { #[error("Specified batch does not exist")] BatchNotFound, - #[error("Tokenserver user retired")] - TokenserverUserRetired, - #[error("An attempt at a conflicting write")] Conflict, @@ -61,10 +61,6 @@ pub enum DbErrorKind { } impl DbError { - pub fn kind(&self) -> &DbErrorKind { - &self.kind - } - pub fn internal(msg: &str) -> Self { DbErrorKind::Internal(msg.to_owned()).into() } @@ -79,6 +75,26 @@ impl DbError { _ => None, } } + + pub fn is_collection_not_found(&self) -> bool { + matches!(self.kind, DbErrorKind::CollectionNotFound) + } + + pub fn is_conflict(&self) -> bool { + matches!(self.kind, DbErrorKind::Conflict) + } + + pub fn is_quota(&self) -> bool { + matches!(self.kind, DbErrorKind::Quota) + } + + pub fn is_bso_not_found(&self) -> bool { + matches!(self.kind, DbErrorKind::BsoNotFound) + } + + pub fn is_batch_not_found(&self) -> bool { + matches!(self.kind, DbErrorKind::BatchNotFound) + } } impl From for DbError { @@ -94,12 +110,14 @@ impl From for DbError { // * android bug: https://bugzilla.mozilla.org/show_bug.cgi?id=959032 DbErrorKind::Conflict => StatusCode::SERVICE_UNAVAILABLE, DbErrorKind::Quota => StatusCode::FORBIDDEN, - // NOTE: TokenserverUserRetired is an internal service error for compatibility reasons - // (the legacy Tokenserver returned an internal service error in this situation) _ => StatusCode::INTERNAL_SERVER_ERROR, }; - Self { kind, status } + Self { + kind, + status, + backtrace: Backtrace::new(), + } } } diff --git a/src/db/mod.rs b/syncstorage-db-common/src/lib.rs similarity index 75% rename from src/db/mod.rs rename to syncstorage-db-common/src/lib.rs index f8b240f91a..82cf4aa3bb 100644 --- a/src/db/mod.rs +++ b/syncstorage-db-common/src/lib.rs @@ -1,38 +1,24 @@ -//! Generic db abstration. - pub mod error; -pub mod mock; -pub mod mysql; pub mod params; pub mod results; -pub mod spanner; -#[cfg(test)] -mod tests; -pub mod transaction; pub mod util; -use std::{fmt::Debug, time::Duration}; +use std::fmt::Debug; use async_trait::async_trait; -use cadence::{Gauged, StatsdClient}; use futures::future::{self, LocalBoxFuture, TryFutureExt}; use lazy_static::lazy_static; use serde::Deserialize; -use url::Url; -pub use self::error::{DbError, DbErrorKind}; -use self::util::SyncTimestamp; -use crate::error::{ApiError, ApiResult}; -use crate::server::metrics::Metrics; -use crate::settings::Settings; -use crate::web::extractors::HawkIdentifier; +use error::DbError; +use util::SyncTimestamp; lazy_static! 
{ /// For efficiency, it's possible to use fixed pre-determined IDs for /// common collection names. This is the canonical list of such /// names. Non-standard collections will be allocated IDs starting /// from the highest ID in this collection. - static ref STD_COLLS: Vec<(i32, &'static str)> = { + pub static ref STD_COLLS: Vec<(i32, &'static str)> = { vec![ (1, "clients"), (2, "crypto"), @@ -51,17 +37,20 @@ lazy_static! { }; } -/// Non-standard collections will be allocated IDs beginning with this value -pub const FIRST_CUSTOM_COLLECTION_ID: i32 = 101; - /// Rough guesstimate of the maximum reasonable life span of a batch pub const BATCH_LIFETIME: i64 = 2 * 60 * 60 * 1000; // 2 hours, in milliseconds -type DbFuture<'a, T> = LocalBoxFuture<'a, Result>; +/// The ttl to use for rows that are never supposed to expire (in seconds) +pub const DEFAULT_BSO_TTL: u32 = 2_100_000_000; + +/// Non-standard collections will be allocated IDs beginning with this value +pub const FIRST_CUSTOM_COLLECTION_ID: i32 = 101; + +pub type DbFuture<'a, T> = LocalBoxFuture<'a, Result>; #[async_trait] pub trait DbPool: Sync + Send + Debug + GetPoolState { - async fn get(&self) -> ApiResult>>; + async fn get(&self) -> Result>, DbError>; fn validate_batch_id(&self, params: params::ValidateBatchId) -> Result<(), DbError>; @@ -91,6 +80,23 @@ pub struct PoolState { pub idle_connections: u32, } +impl From for PoolState { + fn from(state: diesel::r2d2::State) -> PoolState { + PoolState { + connections: state.connections, + idle_connections: state.idle_connections, + } + } +} +impl From for PoolState { + fn from(status: deadpool::Status) -> PoolState { + PoolState { + connections: status.size as u32, + idle_connections: status.available.max(0) as u32, + } + } +} + pub trait Db<'a>: Debug + 'a { fn lock_for_read(&self, params: params::LockCollection) -> DbFuture<'_, ()>; @@ -189,7 +195,7 @@ pub trait Db<'a>: Debug + 'a { /// Modeled on the Python `get_resource_timestamp` function. 
fn extract_resource( &self, - user_id: HawkIdentifier, + user_id: UserIdentifier, collection: Option, bso: Option, ) -> DbFuture<'_, SyncTimestamp> { @@ -237,25 +243,18 @@ pub trait Db<'a>: Debug + 'a { fn get_collection_id(&self, name: String) -> DbFuture<'_, i32>; - #[cfg(test)] fn create_collection(&self, name: String) -> DbFuture<'_, i32>; - #[cfg(test)] fn update_collection(&self, params: params::UpdateCollection) -> DbFuture<'_, SyncTimestamp>; - #[cfg(test)] fn timestamp(&self) -> SyncTimestamp; - #[cfg(test)] fn set_timestamp(&self, timestamp: SyncTimestamp); - #[cfg(test)] fn delete_batch(&self, params: params::DeleteBatch) -> DbFuture<'_, ()>; - #[cfg(test)] fn clear_coll_cache(&self) -> DbFuture<'_, ()>; - #[cfg(test)] fn set_quota(&mut self, enabled: bool, limit: usize, enforce: bool); } @@ -280,49 +279,30 @@ impl Default for Sorting { } } -/// Create/initialize a pool of managed Db connections -pub async fn pool_from_settings( - settings: &Settings, - metrics: &Metrics, -) -> Result, DbError> { - let url = - Url::parse(&settings.database_url).map_err(|e| DbErrorKind::InvalidUrl(e.to_string()))?; - Ok(match url.scheme() { - "mysql" => Box::new(mysql::pool::MysqlDbPool::new(settings, metrics)?), - "spanner" => Box::new(spanner::pool::SpannerDbPool::new(settings, metrics).await?), - _ => Err(DbErrorKind::InvalidUrl(settings.database_url.to_owned()))?, - }) +#[derive(Clone, Debug, Default, Eq, Hash, PartialEq)] +pub struct UserIdentifier { + /// For MySQL database backends as the primary key + pub legacy_id: u64, + /// For NoSQL database backends that require randomly distributed primary keys + pub fxa_uid: String, + pub fxa_kid: String, } -/// Emit DbPool metrics periodically -pub fn spawn_pool_periodic_reporter( - interval: Duration, - metrics: StatsdClient, - pool: T, -) -> Result<(), DbError> { - let hostname = hostname::get() - .expect("Couldn't get hostname") - .into_string() - .expect("Couldn't get hostname"); - actix_rt::spawn(async move { - loop { - let PoolState { - connections, - idle_connections, - } = pool.state(); - metrics - .gauge_with_tags( - "storage.pool.connections.active", - (connections - idle_connections) as u64, - ) - .with_tag("hostname", &hostname) - .send(); - metrics - .gauge_with_tags("storage.pool.connections.idle", idle_connections as u64) - .with_tag("hostname", &hostname) - .send(); - actix_rt::time::delay_for(interval).await; +impl UserIdentifier { + /// Create a new legacy id user identifier + pub fn new_legacy(user_id: u64) -> Self { + Self { + legacy_id: user_id, + ..Default::default() } - }); - Ok(()) + } +} + +impl From for UserIdentifier { + fn from(val: u32) -> Self { + Self { + legacy_id: val.into(), + ..Default::default() + } + } } diff --git a/src/db/params.rs b/syncstorage-db-common/src/params.rs similarity index 56% rename from src/db/params.rs rename to syncstorage-db-common/src/params.rs index 8619fd5c41..abd142079b 100644 --- a/src/db/params.rs +++ b/syncstorage-db-common/src/params.rs @@ -1,10 +1,10 @@ //! Parameter types for database methods. -use std::collections::HashMap; +use std::{collections::HashMap, num::ParseIntError, str::FromStr}; +use diesel::Queryable; use serde::{Deserialize, Serialize}; -use crate::db::results; -use crate::web::extractors::{BatchBsoBody, BsoQueryParams, HawkIdentifier}; +use crate::{results, util::SyncTimestamp, Sorting, UserIdentifier}; macro_rules! data { ($name:ident {$($property:ident: $type:ty,)*}) => { @@ -17,7 +17,7 @@ macro_rules! data { macro_rules! 
uid_data { ($($name:ident,)+) => ($( - pub type $name = HawkIdentifier; + pub type $name = UserIdentifier; )+) } @@ -25,7 +25,7 @@ macro_rules! collection_data { ($($name:ident {$($property:ident: $type:ty,)*},)+) => ($( data! { $name { - user_id: HawkIdentifier, + user_id: UserIdentifier, collection: String, $($property: $type,)* } @@ -37,7 +37,7 @@ macro_rules! bso_data { ($($name:ident {$($property:ident: $type:ty,)*},)+) => ($( data! { $name { - user_id: HawkIdentifier, + user_id: UserIdentifier, collection: String, id: String, $($property: $type,)* @@ -55,6 +55,56 @@ uid_data! { DeleteStorage, } +#[derive(Debug, Default, Clone)] +pub struct Offset { + pub timestamp: Option, + pub offset: u64, +} + +impl ToString for Offset { + fn to_string(&self) -> String { + // issue559: Disable ':' support for now. + self.offset.to_string() + /* + match self.timestamp { + None => self.offset.to_string(), + Some(ts) => format!("{}:{}", ts.as_i64(), self.offset), + } + */ + } +} + +impl FromStr for Offset { + type Err = ParseIntError; + fn from_str(s: &str) -> Result { + // issue559: Disable ':' support for now: simply parse as i64 as + // previously (it was u64 previously but i64's close enough) + let result = Offset { + timestamp: None, + offset: s.parse::()?, + }; + /* + let result = match s.chars().position(|c| c == ':') { + None => Offset { + timestamp: None, + offset: s.parse::()?, + }, + Some(_colon_position) => { + let mut parts = s.split(':'); + let timestamp_string = parts.next().unwrap_or("0"); + let timestamp = SyncTimestamp::from_milliseconds(timestamp_string.parse::()?); + let offset = parts.next().unwrap_or("0").parse::()?; + Offset { + timestamp: Some(timestamp), + offset, + } + } + }; + */ + Ok(result) + } +} + collection_data! { LockCollection {}, DeleteCollection {}, @@ -63,7 +113,13 @@ collection_data! { ids: Vec, }, GetBsos { - params: BsoQueryParams, + newer: Option, + older: Option, + sort: Sorting, + limit: Option, + offset: Option, + ids: Vec, + full: bool, }, PostBsos { bsos: Vec, @@ -120,7 +176,7 @@ pub struct Batch { } pub struct PutBso { - pub user_id: HawkIdentifier, + pub user_id: UserIdentifier, pub collection: String, pub id: String, pub sortindex: Option, @@ -138,26 +194,13 @@ pub struct PostCollectionBso { pub ttl: Option, } -impl From for PostCollectionBso { - fn from(b: BatchBsoBody) -> PostCollectionBso { - PostCollectionBso { - id: b.id, - sortindex: b.sortindex, - payload: b.payload, - ttl: b.ttl, - } - } -} - pub type GetCollectionId = String; -#[cfg(test)] pub type CreateCollection = String; -#[cfg(test)] data! { UpdateCollection { - user_id: HawkIdentifier, + user_id: UserIdentifier, collection_id: i32, collection: String, } diff --git a/src/db/results.rs b/syncstorage-db-common/src/results.rs similarity index 94% rename from src/db/results.rs rename to syncstorage-db-common/src/results.rs index 9bcefb0994..8fca2cd44c 100644 --- a/src/db/results.rs +++ b/syncstorage-db-common/src/results.rs @@ -1,11 +1,14 @@ //! Result types for database methods. 
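The `Offset` code above moves query-offset parsing out of the web extractors and into the db-common crate. A minimal sketch of the behaviour that is currently live (plain integer offsets only, with the `timestamp:offset` form still disabled per issue559); the crate path is an assumption based on the new workspace layout:

```rust
use syncstorage_db_common::params::Offset;

fn offset_round_trip() {
    // A bare integer parses into an offset with no timestamp component.
    let offset: Offset = "100".parse().expect("plain integers parse");
    assert_eq!(offset.offset, 100);
    assert!(offset.timestamp.is_none());
    assert_eq!(offset.to_string(), "100");

    // With the ':' support commented out, "timestamp:offset" is rejected:
    // the whole string is handed to u64 parsing and fails.
    assert!("1234567890:100".parse::<Offset>().is_err());
}
```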
use std::collections::HashMap; -use diesel::sql_types::{BigInt, Integer, Nullable, Text}; +use diesel::{ + sql_types::{BigInt, Integer, Nullable, Text}, + Queryable, QueryableByName, +}; use serde::{Deserialize, Serialize}; use super::params; -use crate::db::util::SyncTimestamp; +use crate::util::SyncTimestamp; pub type LockCollection = (); pub type GetBsoTimestamp = SyncTimestamp; @@ -89,8 +92,6 @@ pub struct ConnectionInfo { pub type GetCollectionId = i32; -#[cfg(test)] pub type CreateCollection = i32; -#[cfg(test)] pub type UpdateCollection = SyncTimestamp; diff --git a/src/db/util.rs b/syncstorage-db-common/src/util.rs similarity index 92% rename from src/db/util.rs rename to syncstorage-db-common/src/util.rs index c542b43c46..85f35c11dc 100644 --- a/src/db/util.rs +++ b/syncstorage-db-common/src/util.rs @@ -8,13 +8,14 @@ use diesel::{ backend::Backend, deserialize::{self, FromSql}, sql_types::BigInt, + FromSqlRow, }; use serde::{ser, Deserialize, Deserializer, Serialize, Serializer}; -use super::{DbError, DbErrorKind}; +use super::error::{DbError, DbErrorKind}; /// Get the time since the UNIX epoch in milliseconds -pub fn ms_since_epoch() -> i64 { +fn ms_since_epoch() -> i64 { Utc::now().timestamp_millis() } @@ -52,19 +53,14 @@ impl SyncTimestamp { } /// Create a `SyncTimestamp` from an i64 - /// - /// Only called from the db module - pub(super) fn from_i64(val: i64) -> Result { + pub fn from_i64(val: i64) -> Result { if val < 0 { - Err(DbErrorKind::Integrity( - "Invalid modified i64 (< 0)".to_owned(), - ))?; + return Err(DbErrorKind::Integrity("Invalid modified i64 (< 0)".to_owned()).into()); } Ok(SyncTimestamp::from_milliseconds(val as u64)) } /// Exposed separately for db tests - #[cfg(test)] pub fn _from_i64(val: i64) -> Result { SyncTimestamp::from_i64(val) } @@ -92,7 +88,7 @@ impl SyncTimestamp { fn from_datetime(val: DateTime) -> Result { let millis = val.timestamp_millis(); if millis < 0 { - Err(DbErrorKind::Integrity("Invalid DateTime (< 0)".to_owned()))?; + return Err(DbErrorKind::Integrity("Invalid DateTime (< 0)".to_owned()).into()); } Ok(SyncTimestamp::from_milliseconds(millis as u64)) } @@ -149,7 +145,7 @@ fn format_ts(val: u64) -> String { format!("{:.*}", 2, val as f64 / 1000.0) } -pub fn deserialize_ts<'de, D>(d: D) -> Result +fn deserialize_ts<'de, D>(d: D) -> Result where D: Deserializer<'de>, { diff --git a/syncstorage/Cargo.toml b/syncstorage/Cargo.toml new file mode 100644 index 0000000000..2a4dd2f9b3 --- /dev/null +++ b/syncstorage/Cargo.toml @@ -0,0 +1,103 @@ +[package] +name = "syncstorage" +version = "0.10.2" +license = "MPL-2.0" +authors = [ + "Ben Bangert ", + "Phil Jenvey ", + "Mozilla Services Engineering ", +] +edition = "2018" +default-run = "syncstorage" + +[dependencies] +actix-http = "2" +actix-web = "3" +actix-rt = "1" # Pin to 1.0, due to dependencies on Tokio +actix-cors = "0.5" +async-trait = "0.1.40" +backtrace = "0.3.61" +base64 = "0.13" +bb8 = "0.4.1" # pin to 0.4 due to dependencies on Tokio +bytes = "1.0" +cadence = "0.26" +chrono = "0.4" +config = "0.11" +# Pin to 0.5 for now, to keep it under tokio 0.2 (issue977). 
+# Fix for #803 (deadpool#92) points to our fork for now
+#deadpool = "0.5" # pin to 0.5
+deadpool = { git = "https://github.com/mozilla-services/deadpool", branch = "deadpool-v0.5.2-issue92" }
+diesel = { version = "1.4", features = ["mysql", "r2d2"] }
+diesel_logger = "0.1.1"
+diesel_migrations = { version = "1.4.0", features = ["mysql"] }
+docopt = "1.1.0"
+dyn-clone = "1.0.4"
+env_logger = "0.9"
+futures = { version = "0.3", features = ["compat"] }
+google-cloud-rust-raw = "0.11.0"
+# Some versions of OpenSSL 1.1.1 conflict with grpcio's built-in boringssl, which can cause
+# syncserver to fail to either compile or start. In those cases, try
+# `cargo build --features grpcio/openssl ...`
+grpcio = { version = "0.9" }
+lazy_static = "1.4.0"
+hawk = "3.2"
+hex = "0.4.3"
+hostname = "0.3.1"
+hkdf = "0.11"
+hmac = "0.11"
+http = "0.2.5"
+log = { version = "0.4", features = [
+    "max_level_debug",
+    "release_max_level_info",
+] }
+mime = "0.3"
+num_cpus = "1"
+# must match what's used by googleapis-raw
+protobuf = "2.20.0"
+pyo3 = { version = "0.14", features = ["auto-initialize"] }
+rand = "0.8"
+regex = "1.4"
+reqwest = { version = "0.10.10", features = ["json", "rustls-tls"] }
+# pin to 0.19: https://github.com/getsentry/sentry-rust/issues/277
+sentry = { version = "0.19", features = [
+    "with_curl_transport",
+] } # pin to 0.19 until on-prem sentry server is updated
+sentry-backtrace = "0.19"
+serde = "1.0"
+serde_derive = "1.0"
+serde_json = { version = "1.0", features = ["arbitrary_precision"] }
+scheduled-thread-pool = "0.2"
+sha2 = "0.9"
+slog = { version = "2.5", features = [
+    "max_level_info",
+    "release_max_level_info",
+    "dynamic-keys",
+] }
+slog-async = "2.5"
+slog-envlogger = "2.2.0"
+slog-mozlog-json = "0.1"
+slog-scope = "4.3"
+slog-stdlog = "4.1"
+slog-term = "2.6"
+syncstorage-db-common = { path = "../syncstorage-db-common" }
+syncstorage-common = { path = "../syncstorage-common" }
+time = "^0.3"
+thiserror = "1.0.26"
+tokenserver-common = { path = "../tokenserver-common" }
+# pinning to 0.2.4 due to high number of dependencies (actix, bb8, deadpool, etc.)
+tokio = { version = "0.2.4", features = ["macros", "sync"] }
+url = "2.1"
+urlencoding = "2.1"
+uuid = { version = "0.8.2", features = ["serde", "v4"] }
+validator = "0.14"
+validator_derive = "0.14"
+woothee = "0.11"
+
+[dev-dependencies]
+mockito = "0.30.0"
+
+[features]
+no_auth = []
+
+[[bin]]
+name = "purge_ttl"
diff --git a/src/bin/purge_ttl.rs b/syncstorage/src/bin/purge_ttl.rs
similarity index 100%
rename from src/bin/purge_ttl.rs
rename to syncstorage/src/bin/purge_ttl.rs
diff --git a/src/db/mock.rs b/syncstorage/src/db/mock.rs
similarity index 94%
rename from src/db/mock.rs
rename to syncstorage/src/db/mock.rs
index 08691106bc..c786165aca 100644
--- a/src/db/mock.rs
+++ b/syncstorage/src/db/mock.rs
@@ -2,8 +2,10 @@
 #![allow(clippy::new_without_default)]
 use async_trait::async_trait;
 use futures::future;
-
-use super::*;
+use syncstorage_db_common::{
+    error::DbError, params, results, util::SyncTimestamp, Db, DbFuture, DbPool, GetPoolState,
+    PoolState,
+};
 
 #[derive(Clone, Debug)]
 pub struct MockDbPool;
@@ -16,7 +18,7 @@ impl MockDbPool {
 
 #[async_trait]
 impl DbPool for MockDbPool {
-    async fn get<'a>(&'a self) -> ApiResult<Box<dyn Db<'a>>> {
+    async fn get<'a>(&'a self) -> Result<Box<dyn Db<'a>>, DbError> {
         Ok(Box::new(MockDb::new()) as Box<dyn Db<'a>>)
     }
 
@@ -107,28 +109,21 @@ impl<'a> Db<'a> for MockDb {
     }
 
     mock_db_method!(get_collection_id, GetCollectionId);
-    #[cfg(test)]
     mock_db_method!(create_collection, CreateCollection);
-    #[cfg(test)]
     mock_db_method!(update_collection, UpdateCollection);
 
-    #[cfg(test)]
     fn timestamp(&self) -> SyncTimestamp {
         Default::default()
     }
 
-    #[cfg(test)]
     fn set_timestamp(&self, _: SyncTimestamp) {}
 
-    #[cfg(test)]
     mock_db_method!(delete_batch, DeleteBatch);
 
-    #[cfg(test)]
     fn clear_coll_cache(&self) -> DbFuture<'_, ()> {
         Box::pin(future::ok(()))
    }
 
-    #[cfg(test)]
     fn set_quota(&mut self, _: bool, _: usize, _: bool) {}
 }
 
diff --git a/syncstorage/src/db/mod.rs b/syncstorage/src/db/mod.rs
new file mode 100644
index 0000000000..df61d4d3b4
--- /dev/null
+++ b/syncstorage/src/db/mod.rs
@@ -0,0 +1,88 @@
+//! Generic db abstraction.
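The error types in this module (and in the new crates) lean on the two macros that `syncstorage-common` now exports, `from_error!` and `impl_fmt_display!`, defined earlier in this patch. A rough usage sketch with a hypothetical error type, assuming `thiserror` provides the inner kind's `Display`:

```rust
use std::fmt;

use syncstorage_common::{from_error, impl_fmt_display};
use thiserror::Error;

// Hypothetical error type, shaped like DbError: a struct wrapping a kind enum.
#[derive(Debug)]
pub struct ExampleError {
    kind: ExampleErrorKind,
}

#[derive(Debug, Error)]
pub enum ExampleErrorKind {
    #[error("backend error: {0}")]
    Backend(String),
}

impl From<ExampleErrorKind> for ExampleError {
    fn from(kind: ExampleErrorKind) -> Self {
        Self { kind }
    }
}

// Display delegates to the kind enum's Display (supplied by thiserror).
impl_fmt_display!(ExampleError, ExampleErrorKind);

// A String converts into ExampleError by wrapping it in ExampleErrorKind::Backend.
from_error!(String, ExampleError, ExampleErrorKind::Backend);
```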
+ +pub mod mock; +pub mod mysql; +pub mod spanner; +#[cfg(test)] +mod tests; +pub mod transaction; + +use std::time::Duration; + +use cadence::{Gauged, StatsdClient}; +use futures::TryFutureExt; +use syncstorage_db_common::{ + error::{DbError, DbErrorKind}, + results, DbPool, GetPoolState, PoolState, +}; +use tokio::{self, task, time}; +use url::Url; + +use crate::server::metrics::Metrics; +use crate::settings::Settings; + +/// Create/initialize a pool of managed Db connections +pub async fn pool_from_settings( + settings: &Settings, + metrics: &Metrics, +) -> Result, DbError> { + let url = + Url::parse(&settings.database_url).map_err(|e| DbErrorKind::InvalidUrl(e.to_string()))?; + Ok(match url.scheme() { + "mysql" => Box::new(mysql::pool::MysqlDbPool::new(settings, metrics)?), + "spanner" => Box::new(spanner::pool::SpannerDbPool::new(settings, metrics).await?), + _ => Err(DbErrorKind::InvalidUrl(settings.database_url.to_owned()))?, + }) +} + +/// Emit DbPool metrics periodically +pub fn spawn_pool_periodic_reporter( + interval: Duration, + metrics: StatsdClient, + pool: T, +) -> Result<(), DbError> { + let hostname = hostname::get() + .expect("Couldn't get hostname") + .into_string() + .expect("Couldn't get hostname"); + tokio::spawn(async move { + loop { + let PoolState { + connections, + idle_connections, + } = pool.state(); + metrics + .gauge_with_tags( + "storage.pool.connections.active", + (connections - idle_connections) as u64, + ) + .with_tag("hostname", &hostname) + .send(); + metrics + .gauge_with_tags("storage.pool.connections.idle", idle_connections as u64) + .with_tag("hostname", &hostname) + .send(); + time::delay_for(interval).await; + } + }); + + Ok(()) +} + +pub async fn run_on_blocking_threadpool(f: F) -> Result +where + F: FnOnce() -> Result + Send + 'static, + T: Send + 'static, +{ + task::spawn_blocking(f) + .map_err(|err| { + if err.is_cancelled() { + DbError::internal("Db threadpool operation cancelled") + } else if err.is_panic() { + DbError::internal("Db threadpool operation panicked") + } else { + DbError::internal("Db threadpool operation failed for unknown reason") + } + }) + .await? 
+} diff --git a/src/db/mysql/batch.rs b/syncstorage/src/db/mysql/batch.rs similarity index 98% rename from src/db/mysql/batch.rs rename to syncstorage/src/db/mysql/batch.rs index 4719375f49..ef7ab50991 100644 --- a/src/db/mysql/batch.rs +++ b/syncstorage/src/db/mysql/batch.rs @@ -9,17 +9,16 @@ use diesel::{ sql_types::{BigInt, Integer}, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, }; +use syncstorage_db_common::{ + error::{DbError, DbErrorKind}, + params, results, UserIdentifier, BATCH_LIFETIME, +}; use super::{ models::{MysqlDb, Result}, schema::{batch_upload_items, batch_uploads}, }; -use crate::{ - db::{params, results, DbError, DbErrorKind, BATCH_LIFETIME}, - web::extractors::HawkIdentifier, -}; - const MAXTTL: i32 = 2_100_000_000; pub fn create(db: &MysqlDb, params: params::CreateBatch) -> Result { @@ -167,7 +166,7 @@ pub fn commit(db: &MysqlDb, params: params::CommitBatch) -> Result, ) -> Result<()> { diff --git a/src/db/mysql/batch_commit.sql b/syncstorage/src/db/mysql/batch_commit.sql similarity index 100% rename from src/db/mysql/batch_commit.sql rename to syncstorage/src/db/mysql/batch_commit.sql diff --git a/src/db/mysql/diesel_ext.rs b/syncstorage/src/db/mysql/diesel_ext.rs similarity index 100% rename from src/db/mysql/diesel_ext.rs rename to syncstorage/src/db/mysql/diesel_ext.rs diff --git a/src/db/mysql/mod.rs b/syncstorage/src/db/mysql/mod.rs similarity index 100% rename from src/db/mysql/mod.rs rename to syncstorage/src/db/mysql/mod.rs diff --git a/src/db/mysql/models.rs b/syncstorage/src/db/mysql/models.rs similarity index 91% rename from src/db/mysql/models.rs rename to syncstorage/src/db/mysql/models.rs index 02851332d6..e842006e86 100644 --- a/src/db/mysql/models.rs +++ b/syncstorage/src/db/mysql/models.rs @@ -1,5 +1,3 @@ -use actix_web::web::block; - use futures::future::TryFutureExt; use std::{self, cell::RefCell, collections::HashMap, fmt, ops::Deref, sync::Arc}; @@ -17,6 +15,12 @@ use diesel::{ }; #[cfg(test)] use diesel_logger::LoggingConnection; +use syncstorage_db_common::{ + error::{DbError, DbErrorKind}, + params, results, + util::SyncTimestamp, + Db, DbFuture, Sorting, UserIdentifier, DEFAULT_BSO_TTL, +}; use super::{ batch, @@ -24,24 +28,14 @@ use super::{ pool::CollectionCache, schema::{bso, collections, user_collections}, }; -use crate::db::{ - error::{DbError, DbErrorKind}, - params, results, - util::SyncTimestamp, - Db, DbFuture, Sorting, -}; +use crate::db; use crate::server::metrics::Metrics; use crate::settings::{Quota, DEFAULT_MAX_TOTAL_RECORDS}; -use crate::web::extractors::{BsoQueryParams, HawkIdentifier}; use crate::web::tags::Tags; pub type Result = std::result::Result; type Conn = PooledConnection>; -/// The ttl to use for rows that are never supposed to expire (in seconds) -/// We store the TTL as a SyncTimestamp, which is milliseconds, so remember -/// to multiply this by 1000. -pub const DEFAULT_BSO_TTL: u32 = 2_100_000_000; // this is the max number of records we will return. pub static DEFAULT_LIMIT: u32 = DEFAULT_MAX_TOTAL_RECORDS; @@ -78,7 +72,7 @@ struct MysqlDbSession { #[derive(Clone, Debug)] pub struct MysqlDb { - /// Synchronous Diesel calls are executed in actix_web::web::block to satisfy + /// Synchronous Diesel calls are executed in tokio::task::spawn_blocking to satisfy /// the Db trait's asynchronous interface. /// /// Arc provides a Clone impl utilized for safely moving to @@ -155,14 +149,15 @@ impl MysqlDb { /// the efficiency of that approach at scale. 
pub fn lock_for_read_sync(&self, params: params::LockCollection) -> Result<()> { let user_id = params.user_id.legacy_id as i64; - let collection_id = - self.get_collection_id(¶ms.collection) - .or_else(|e| match e.kind() { - // If the collection doesn't exist, we still want to start a - // transaction so it will continue to not exist. - DbErrorKind::CollectionNotFound => Ok(0), - _ => Err(e), - })?; + let collection_id = self.get_collection_id(¶ms.collection).or_else(|e| { + if e.is_collection_not_found() { + // If the collection doesn't exist, we still want to start a + // transaction so it will continue to not exist. + Ok(0) + } else { + Err(e) + } + })?; // If we already have a read or write lock then it's safe to // use it as-is. if self @@ -288,7 +283,7 @@ impl MysqlDb { Ok(()) } - pub fn delete_storage_sync(&self, user_id: HawkIdentifier) -> Result<()> { + pub fn delete_storage_sync(&self, user_id: UserIdentifier) -> Result<()> { let user_id = user_id.legacy_id as i64; // Delete user data. delete(bso::table) @@ -402,7 +397,7 @@ impl MysqlDb { let timestamp = self.timestamp().as_i64(); if self.quota.enabled { let usage = self.get_quota_usage_sync(params::GetQuotaUsage { - user_id: HawkIdentifier::new_legacy(user_id), + user_id: UserIdentifier::new_legacy(user_id), collection: bso.collection.clone(), collection_id, })?; @@ -484,15 +479,6 @@ impl MysqlDb { pub fn get_bsos_sync(&self, params: params::GetBsos) -> Result { let user_id = params.user_id.legacy_id as i64; let collection_id = self.get_collection_id(¶ms.collection)?; - let BsoQueryParams { - newer, - older, - sort, - limit, - offset, - ids, - .. - } = params.params; let now = self.timestamp().as_i64(); let mut query = bso::table .select(( @@ -507,22 +493,22 @@ impl MysqlDb { .filter(bso::expiry.gt(now)) .into_boxed(); - if let Some(older) = older { + if let Some(older) = params.older { query = query.filter(bso::modified.lt(older.as_i64())); } - if let Some(newer) = newer { + if let Some(newer) = params.newer { query = query.filter(bso::modified.gt(newer.as_i64())); } - if !ids.is_empty() { - query = query.filter(bso::id.eq_any(ids)); + if !params.ids.is_empty() { + query = query.filter(bso::id.eq_any(params.ids)); } // it's possible for two BSOs to be inserted with the same `modified` date, // since there's no guarantee of order when doing a get, pagination can return // an error. We "fudge" a bit here by taking the id order as a secondary, since // that is guaranteed to be unique by the client. - query = match sort { + query = match params.sort { // issue559: Revert to previous sorting /* Sorting::Index => query.order(bso::id.desc()).order(bso::sortindex.desc()), @@ -537,12 +523,16 @@ impl MysqlDb { _ => query, }; - let limit = limit.map(i64::from).unwrap_or(DEFAULT_LIMIT as i64).max(0); + let limit = params + .limit + .map(i64::from) + .unwrap_or(DEFAULT_LIMIT as i64) + .max(0); // fetch an extra row to detect if there are more rows that // match the query conditions query = query.limit(if limit > 0 { limit + 1 } else { limit }); - let numeric_offset = offset.map_or(0, |offset| offset.offset as i64); + let numeric_offset = params.offset.map_or(0, |offset| offset.offset as i64); if numeric_offset > 0 { // XXX: copy over this optimization: @@ -579,15 +569,6 @@ impl MysqlDb { pub fn get_bso_ids_sync(&self, params: params::GetBsos) -> Result { let user_id = params.user_id.legacy_id as i64; let collection_id = self.get_collection_id(¶ms.collection)?; - let BsoQueryParams { - newer, - older, - sort, - limit, - offset, - ids, - .. 
- } = params.params; let mut query = bso::table .select(bso::id) .filter(bso::user_id.eq(user_id)) @@ -595,18 +576,18 @@ impl MysqlDb { .filter(bso::expiry.gt(self.timestamp().as_i64())) .into_boxed(); - if let Some(older) = older { + if let Some(older) = params.older { query = query.filter(bso::modified.lt(older.as_i64())); } - if let Some(newer) = newer { + if let Some(newer) = params.newer { query = query.filter(bso::modified.gt(newer.as_i64())); } - if !ids.is_empty() { - query = query.filter(bso::id.eq_any(ids)); + if !params.ids.is_empty() { + query = query.filter(bso::id.eq_any(params.ids)); } - query = match sort { + query = match params.sort { Sorting::Index => query.order(bso::sortindex.desc()), Sorting::Newest => query.order(bso::modified.desc()), Sorting::Oldest => query.order(bso::modified.asc()), @@ -614,11 +595,15 @@ impl MysqlDb { }; // negative limits are no longer allowed by mysql. - let limit = limit.map(i64::from).unwrap_or(DEFAULT_LIMIT as i64).max(0); + let limit = params + .limit + .map(i64::from) + .unwrap_or(DEFAULT_LIMIT as i64) + .max(0); // fetch an extra row to detect if there are more rows that // match the query conditions. Negative limits will cause an error. query = query.limit(if limit == 0 { limit } else { limit + 1 }); - let numeric_offset = offset.map_or(0, |offset| offset.offset as i64); + let numeric_offset = params.offset.map_or(0, |offset| offset.offset as i64); if numeric_offset != 0 { // XXX: copy over this optimization: // https://github.com/mozilla-services/server-syncstorage/blob/a0f8117/syncstorage/storage/sql/__init__.py#L404 @@ -722,7 +707,7 @@ impl MysqlDb { Ok(result) } - pub fn get_storage_timestamp_sync(&self, user_id: HawkIdentifier) -> Result { + pub fn get_storage_timestamp_sync(&self, user_id: UserIdentifier) -> Result { let user_id = user_id.legacy_id as i64; let modified = user_collections::table .select(max(user_collections::modified)) @@ -771,7 +756,7 @@ impl MysqlDb { pub fn get_collection_timestamps_sync( &self, - user_id: HawkIdentifier, + user_id: UserIdentifier, ) -> Result { let modifieds = sql_query(format!( "SELECT {collection_id}, {modified} @@ -886,7 +871,7 @@ impl MysqlDb { // Perform a lighter weight "read only" storage size check pub fn get_storage_usage_sync( &self, - user_id: HawkIdentifier, + user_id: UserIdentifier, ) -> Result { let uid = user_id.legacy_id as i64; let total_bytes = bso::table @@ -944,7 +929,7 @@ impl MysqlDb { pub fn get_collection_usage_sync( &self, - user_id: HawkIdentifier, + user_id: UserIdentifier, ) -> Result { let counts = bso::table .select((bso::collection_id, sql::("SUM(LENGTH(payload))"))) @@ -959,7 +944,7 @@ impl MysqlDb { pub fn get_collection_counts_sync( &self, - user_id: HawkIdentifier, + user_id: UserIdentifier, ) -> Result { let counts = bso::table .select(( @@ -982,7 +967,6 @@ impl MysqlDb { batch_db_method!(validate_batch_sync, validate, ValidateBatch); batch_db_method!(append_to_batch_sync, append, AppendToBatch); batch_db_method!(commit_batch_sync, commit, CommitBatch); - #[cfg(test)] batch_db_method!(delete_batch_sync, delete, DeleteBatch); pub fn get_batch_sync(&self, params: params::GetBatch) -> Result> { @@ -993,7 +977,6 @@ impl MysqlDb { self.session.borrow().timestamp } } - #[macro_export] macro_rules! sync_db_method { ($name:ident, $sync_name:ident, $type:ident) => { @@ -1002,7 +985,9 @@ macro_rules! 
sync_db_method { ($name:ident, $sync_name:ident, $type:ident, $result:ty) => { fn $name(&self, params: params::$type) -> DbFuture<'_, $result> { let db = self.clone(); - Box::pin(block(move || db.$sync_name(params).map_err(Into::into)).map_err(Into::into)) + Box::pin(db::run_on_blocking_threadpool(move || { + db.$sync_name(params) + })) } }; } @@ -1010,12 +995,12 @@ macro_rules! sync_db_method { impl<'a> Db<'a> for MysqlDb { fn commit(&self) -> DbFuture<'_, ()> { let db = self.clone(); - Box::pin(block(move || db.commit_sync().map_err(Into::into)).map_err(Into::into)) + Box::pin(db::run_on_blocking_threadpool(move || db.commit_sync())) } fn rollback(&self) -> DbFuture<'_, ()> { let db = self.clone(); - Box::pin(block(move || db.rollback_sync().map_err(Into::into)).map_err(Into::into)) + Box::pin(db::run_on_blocking_threadpool(move || db.rollback_sync())) } fn begin(&self, for_write: bool) -> DbFuture<'_, ()> { @@ -1029,7 +1014,7 @@ impl<'a> Db<'a> for MysqlDb { fn check(&self) -> DbFuture<'_, results::Check> { let db = self.clone(); - Box::pin(block(move || db.check_sync().map_err(Into::into)).map_err(Into::into)) + Box::pin(db::run_on_blocking_threadpool(move || db.check_sync())) } sync_db_method!(lock_for_read, lock_for_read_sync, LockCollection); @@ -1089,60 +1074,47 @@ impl<'a> Db<'a> for MysqlDb { fn get_collection_id(&self, name: String) -> DbFuture<'_, i32> { let db = self.clone(); - Box::pin(block(move || db.get_collection_id(&name).map_err(Into::into)).map_err(Into::into)) + Box::pin(db::run_on_blocking_threadpool(move || { + db.get_collection_id(&name) + })) } fn get_connection_info(&self) -> results::ConnectionInfo { results::ConnectionInfo::default() } - #[cfg(test)] fn create_collection(&self, name: String) -> DbFuture<'_, i32> { let db = self.clone(); - Box::pin( - block(move || db.get_or_create_collection_id(&name).map_err(Into::into)) - .map_err(Into::into), - ) + Box::pin(db::run_on_blocking_threadpool(move || { + db.get_or_create_collection_id(&name) + })) } - #[cfg(test)] fn update_collection(&self, param: params::UpdateCollection) -> DbFuture<'_, SyncTimestamp> { let db = self.clone(); - Box::pin( - block(move || { - db.update_collection(param.user_id.legacy_id as u32, param.collection_id) - .map_err(Into::into) - }) - .map_err(Into::into), - ) + Box::pin(db::run_on_blocking_threadpool(move || { + db.update_collection(param.user_id.legacy_id as u32, param.collection_id) + })) } - #[cfg(test)] fn timestamp(&self) -> SyncTimestamp { self.timestamp() } - #[cfg(test)] fn set_timestamp(&self, timestamp: SyncTimestamp) { self.session.borrow_mut().timestamp = timestamp; } - #[cfg(test)] sync_db_method!(delete_batch, delete_batch_sync, DeleteBatch); - #[cfg(test)] fn clear_coll_cache(&self) -> DbFuture<'_, ()> { let db = self.clone(); - Box::pin( - block(move || { - db.coll_cache.clear(); - Ok(()) - }) - .map_err(Into::into), - ) + Box::pin(db::run_on_blocking_threadpool(move || { + db.coll_cache.clear(); + Ok(()) + })) } - #[cfg(test)] fn set_quota(&mut self, enabled: bool, limit: usize, enforced: bool) { self.quota = Quota { size: limit, diff --git a/src/db/mysql/pool.rs b/syncstorage/src/db/mysql/pool.rs similarity index 90% rename from src/db/mysql/pool.rs rename to syncstorage/src/db/mysql/pool.rs index ed17ae5dde..c3995bb979 100644 --- a/src/db/mysql/pool.rs +++ b/syncstorage/src/db/mysql/pool.rs @@ -1,5 +1,3 @@ -use actix_web::web::block; - use async_trait::async_trait; use std::{ @@ -16,12 +14,12 @@ use diesel::{ }; #[cfg(test)] use diesel_logger::LoggingConnection; 
+use syncstorage_db_common::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS}; use super::models::{MysqlDb, Result}; #[cfg(test)] use super::test::TestTransactionCustomizer; -use crate::db::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS}; -use crate::error::{ApiError, ApiResult}; +use crate::db; use crate::server::metrics::Metrics; use crate::settings::{Quota, Settings}; @@ -103,9 +101,9 @@ impl MysqlDbPool { #[async_trait] impl DbPool for MysqlDbPool { - async fn get<'a>(&'a self) -> ApiResult>> { + async fn get<'a>(&'a self) -> Result>> { let pool = self.clone(); - let db = block(move || pool.get_sync().map_err(ApiError::from)).await?; + let db = db::run_on_blocking_threadpool(move || pool.get_sync()).await?; Ok(Box::new(db) as Box>) } @@ -173,7 +171,6 @@ impl CollectionCache { .cloned()) } - #[cfg(test)] pub fn clear(&self) { self.by_name.write().expect("by_name write").clear(); self.by_id.write().expect("by_id write").clear(); @@ -198,12 +195,3 @@ impl Default for CollectionCache { } } } - -impl From for PoolState { - fn from(state: diesel::r2d2::State) -> PoolState { - PoolState { - connections: state.connections, - idle_connections: state.idle_connections, - } - } -} diff --git a/src/db/mysql/schema.rs b/syncstorage/src/db/mysql/schema.rs similarity index 100% rename from src/db/mysql/schema.rs rename to syncstorage/src/db/mysql/schema.rs diff --git a/src/db/mysql/test.rs b/syncstorage/src/db/mysql/test.rs similarity index 100% rename from src/db/mysql/test.rs rename to syncstorage/src/db/mysql/test.rs diff --git a/src/db/spanner/BATCH_COMMIT.txt b/syncstorage/src/db/spanner/BATCH_COMMIT.txt similarity index 100% rename from src/db/spanner/BATCH_COMMIT.txt rename to syncstorage/src/db/spanner/BATCH_COMMIT.txt diff --git a/src/db/spanner/batch.rs b/syncstorage/src/db/spanner/batch.rs similarity index 98% rename from src/db/spanner/batch.rs rename to syncstorage/src/db/spanner/batch.rs index 3031158a92..c13af9da51 100644 --- a/src/db/spanner/batch.rs +++ b/syncstorage/src/db/spanner/batch.rs @@ -8,14 +8,17 @@ use protobuf::{ well_known_types::{ListValue, Value}, RepeatedField, }; +use syncstorage_db_common::{ + error::{DbError, DbErrorKind}, + params, results, + util::to_rfc3339, + UserIdentifier, BATCH_LIFETIME, DEFAULT_BSO_TTL, +}; use uuid::Uuid; -use super::models::{Result, SpannerDb, DEFAULT_BSO_TTL, PRETOUCH_TS}; +use super::models::{Result, SpannerDb, PRETOUCH_TS}; use super::support::{as_type, null_value, struct_type_field, IntoSpannerValue}; -use crate::{ - db::{params, results, util::to_rfc3339, DbError, DbErrorKind, BATCH_LIFETIME}, - web::{extractors::HawkIdentifier, tags::Tags}, -}; +use crate::web::tags::Tags; pub async fn create_async( db: &SpannerDb, @@ -242,7 +245,7 @@ pub async fn commit_async( // Append a collection to an existing, pending batch. pub async fn do_append_async( db: &SpannerDb, - user_id: HawkIdentifier, + user_id: UserIdentifier, collection_id: i32, batch: results::CreateBatch, bsos: Vec, @@ -524,7 +527,7 @@ pub async fn do_append_async( /// prior data. async fn pretouch_collection_async( db: &SpannerDb, - user_id: &HawkIdentifier, + user_id: &UserIdentifier, collection_id: i32, ) -> Result<()> { let (mut sqlparams, mut sqlparam_types) = params! 
{ diff --git a/src/db/spanner/batch_commit_insert.sql b/syncstorage/src/db/spanner/batch_commit_insert.sql similarity index 100% rename from src/db/spanner/batch_commit_insert.sql rename to syncstorage/src/db/spanner/batch_commit_insert.sql diff --git a/src/db/spanner/batch_commit_update.sql b/syncstorage/src/db/spanner/batch_commit_update.sql similarity index 100% rename from src/db/spanner/batch_commit_update.sql rename to syncstorage/src/db/spanner/batch_commit_update.sql diff --git a/src/db/spanner/batch_index.sql b/syncstorage/src/db/spanner/batch_index.sql similarity index 100% rename from src/db/spanner/batch_index.sql rename to syncstorage/src/db/spanner/batch_index.sql diff --git a/src/db/spanner/insert_standard_collections.sql b/syncstorage/src/db/spanner/insert_standard_collections.sql similarity index 100% rename from src/db/spanner/insert_standard_collections.sql rename to syncstorage/src/db/spanner/insert_standard_collections.sql diff --git a/src/db/spanner/macros.rs b/syncstorage/src/db/spanner/macros.rs similarity index 100% rename from src/db/spanner/macros.rs rename to syncstorage/src/db/spanner/macros.rs diff --git a/src/db/spanner/manager/bb8.rs b/syncstorage/src/db/spanner/manager/bb8.rs similarity index 100% rename from src/db/spanner/manager/bb8.rs rename to syncstorage/src/db/spanner/manager/bb8.rs diff --git a/src/db/spanner/manager/deadpool.rs b/syncstorage/src/db/spanner/manager/deadpool.rs similarity index 86% rename from src/db/spanner/manager/deadpool.rs rename to syncstorage/src/db/spanner/manager/deadpool.rs index 74eef40c9d..b2455ec95f 100644 --- a/src/db/spanner/manager/deadpool.rs +++ b/syncstorage/src/db/spanner/manager/deadpool.rs @@ -3,15 +3,9 @@ use std::{fmt, sync::Arc}; use async_trait::async_trait; use deadpool::managed::{Manager, RecycleError, RecycleResult}; use grpcio::{EnvBuilder, Environment}; +use syncstorage_db_common::error::{DbError, DbErrorKind}; -use crate::{ - db::{ - error::{DbError, DbErrorKind}, - PoolState, - }, - server::metrics::Metrics, - settings::Settings, -}; +use crate::{server::metrics::Metrics, settings::Settings}; use super::session::{create_spanner_session, recycle_spanner_session, SpannerSession}; @@ -88,12 +82,3 @@ impl Manager for SpannerSessionManager { .map_err(RecycleError::Backend) } } - -impl From for PoolState { - fn from(status: deadpool::Status) -> PoolState { - PoolState { - connections: status.size as u32, - idle_connections: status.available.max(0) as u32, - } - } -} diff --git a/src/db/spanner/manager/mod.rs b/syncstorage/src/db/spanner/manager/mod.rs similarity index 100% rename from src/db/spanner/manager/mod.rs rename to syncstorage/src/db/spanner/manager/mod.rs diff --git a/src/db/spanner/manager/session.rs b/syncstorage/src/db/spanner/manager/session.rs similarity index 93% rename from src/db/spanner/manager/session.rs rename to syncstorage/src/db/spanner/manager/session.rs index 216eafb8ba..b7767d55e1 100644 --- a/src/db/spanner/manager/session.rs +++ b/syncstorage/src/db/spanner/manager/session.rs @@ -1,16 +1,13 @@ -use actix_web::web::block; use google_cloud_rust_raw::spanner::v1::{ spanner::{CreateSessionRequest, GetSessionRequest, Session}, spanner_grpc::SpannerClient, }; use grpcio::{CallOption, ChannelBuilder, ChannelCredentials, Environment, MetadataBuilder}; use std::sync::Arc; +use syncstorage_db_common::error::{DbError, DbErrorKind}; -use crate::db::spanner::now; -use crate::{ - db::error::{DbError, DbErrorKind}, - server::metrics::Metrics, -}; +use crate::db::{self, spanner::now}; +use 
crate::server::metrics::Metrics; const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443"; @@ -42,7 +39,7 @@ pub async fn create_spanner_session( emulator_host: Option, ) -> Result { let using_spanner_emulator = emulator_host.is_some(); - let chan = block(move || -> Result { + let chan = db::run_on_blocking_threadpool(move || -> Result { if let Some(spanner_emulator_address) = emulator_host { Ok(ChannelBuilder::new(env) .max_send_message_len(100 << 20) @@ -62,13 +59,7 @@ pub async fn create_spanner_session( .secure_connect(SPANNER_ADDRESS, creds)) } }) - .await - .map_err(|e| match e { - actix_web::error::BlockingError::Error(e) => e.into(), - actix_web::error::BlockingError::Canceled => { - DbError::internal("web::block Manager operation canceled") - } - })?; + .await?; let client = SpannerClient::new(chan); // Connect to the instance and create a Spanner session. diff --git a/src/db/spanner/mod.rs b/syncstorage/src/db/spanner/mod.rs similarity index 100% rename from src/db/spanner/mod.rs rename to syncstorage/src/db/spanner/mod.rs diff --git a/src/db/spanner/models.rs b/syncstorage/src/db/spanner/models.rs similarity index 96% rename from src/db/spanner/models.rs rename to syncstorage/src/db/spanner/models.rs index 083a02a027..91dc87dd23 100644 --- a/src/db/spanner/models.rs +++ b/syncstorage/src/db/spanner/models.rs @@ -22,23 +22,15 @@ use protobuf::{ well_known_types::{ListValue, Value}, Message, RepeatedField, }; - -use crate::{ - db::{ - error::{DbError, DbErrorKind}, - params, results, - spanner::now, - util::SyncTimestamp, - Db, DbFuture, Sorting, FIRST_CUSTOM_COLLECTION_ID, - }, - server::metrics::Metrics, - settings::Quota, - web::{ - extractors::{BsoQueryParams, HawkIdentifier, Offset}, - tags::Tags, - }, +use syncstorage_db_common::{ + error::{DbError, DbErrorKind}, + params, results, + util::SyncTimestamp, + Db, DbFuture, Sorting, UserIdentifier, DEFAULT_BSO_TTL, FIRST_CUSTOM_COLLECTION_ID, }; +use crate::{db::spanner::now, server::metrics::Metrics, settings::Quota, web::tags::Tags}; + use super::{ batch, pool::{CollectionCache, Conn}, @@ -56,9 +48,6 @@ pub enum CollectionLock { pub type Result = std::result::Result; -/// The ttl to use for rows that are never supposed to expire (in seconds) -pub const DEFAULT_BSO_TTL: u32 = 2_100_000_000; - pub const TOMBSTONE: i32 = 0; pub const PRETOUCH_TS: &str = "0001-01-01T00:00:00.00Z"; @@ -73,9 +62,9 @@ struct SpannerDbSession { /// operations timestamp: Option, /// Cache of collection modified timestamps per (HawkIdentifier, collection_id) - coll_modified_cache: HashMap<(HawkIdentifier, i32), SyncTimestamp>, + coll_modified_cache: HashMap<(UserIdentifier, i32), SyncTimestamp>, /// Currently locked collections - coll_locks: HashMap<(HawkIdentifier, i32), CollectionLock>, + coll_locks: HashMap<(UserIdentifier, i32), CollectionLock>, transaction: Option, /// Behind Vec so commit can take() it (maybe commit() should consume self /// instead?) 
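Since `SpannerDbSession` now keys its caches by the shared `UserIdentifier` (replacing the web-layer `HawkIdentifier`), a small sketch of how the identifier defined in `syncstorage-db-common` is built for the two backends; the field values are made up for illustration:

```rust
use syncstorage_db_common::UserIdentifier;

fn build_user_ids() {
    // MySQL paths only need the integer primary key.
    let mysql_user = UserIdentifier::new_legacy(42);
    assert_eq!(mysql_user.legacy_id, 42);

    // Plain u32 ids (e.g. in test fixtures) convert via the From impl.
    let from_u32: UserIdentifier = 7u32.into();
    assert_eq!(from_u32.legacy_id, 7);

    // Spanner keys rows by the FxA identifiers instead.
    let spanner_user = UserIdentifier {
        fxa_uid: "0123456789abcdef".to_owned(),   // hypothetical value
        fxa_kid: "1111111111111-aaaa".to_owned(), // hypothetical value
        ..Default::default()
    };
    assert_eq!(spanner_user.legacy_id, 0);
}
```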
@@ -204,14 +193,9 @@ impl SpannerDb { } async fn get_or_create_collection_id_async(&self, name: &str) -> Result { - let result = self.get_collection_id_async(name).await; - if let Err(err) = result { - match err.kind() { - DbErrorKind::CollectionNotFound => self.create_collection_async(name).await, - _ => Err(err), - } - } else { - result + match self.get_collection_id_async(name).await { + Err(err) if err.is_collection_not_found() => self.create_collection_async(name).await, + result => result, } } @@ -222,11 +206,14 @@ impl SpannerDb { let collection_id = self .get_collection_id_async(¶ms.collection) .await - .or_else(|e| match e.kind() { - // If the collection doesn't exist, we still want to start a - // transaction so it will continue to not exist. - DbErrorKind::CollectionNotFound => Ok(0), - _ => Err(e), + .or_else(|e| { + if e.is_collection_not_found() { + // If the collection doesn't exist, we still want to start a + // transaction so it will continue to not exist. + Ok(0) + } else { + Err(e) + } })?; // If we already have a read or write lock then it's safe to // use it as-is. @@ -852,7 +839,7 @@ impl SpannerDb { pub async fn update_user_collection_quotas( &self, - user: &HawkIdentifier, + user: &UserIdentifier, collection_id: i32, ) -> Result { // This will also update the counts in user_collections, since `update_collection_sync` @@ -979,7 +966,7 @@ impl SpannerDb { Ok(timestamp) } - async fn erect_tombstone(&self, user_id: &HawkIdentifier) -> Result { + async fn erect_tombstone(&self, user_id: &UserIdentifier) -> Result { // Delete the old tombstone (if it exists) let (params, mut param_types) = params! { "fxa_uid" => user_id.fxa_uid.clone(), @@ -1071,7 +1058,7 @@ impl SpannerDb { pub(super) async fn update_collection_async( &self, - user_id: &HawkIdentifier, + user_id: &UserIdentifier, collection_id: i32, collection: &str, ) -> Result { @@ -1221,20 +1208,11 @@ impl SpannerDb { "fxa_kid" => params.user_id.fxa_kid, "collection_id" => self.get_collection_id_async(¶ms.collection).await?, }; - let BsoQueryParams { - newer, - older, - sort, - limit, - offset, - ids, - .. 
- } = params.params; - - if !ids.is_empty() { + + if !params.ids.is_empty() { query = format!("{} AND bso_id IN UNNEST(@ids)", query); - sqlparam_types.insert("ids".to_owned(), ids.spanner_type()); - sqlparams.insert("ids".to_owned(), ids.into_spanner_value()); + sqlparam_types.insert("ids".to_owned(), params.ids.spanner_type()); + sqlparams.insert("ids".to_owned(), params.ids.into_spanner_value()); } // issue559: Dead code (timestamp always None) @@ -1261,7 +1239,7 @@ impl SpannerDb { }; } */ - if let Some(older) = older { + if let Some(older) = params.older { query = format!("{} AND modified < @older", query); sqlparams.insert( "older".to_string(), @@ -1269,7 +1247,7 @@ impl SpannerDb { ); sqlparam_types.insert("older".to_string(), as_type(TypeCode::TIMESTAMP)); } - if let Some(newer) = newer { + if let Some(newer) = params.newer { query = format!("{} AND modified > @newer", query); sqlparams.insert( "newer".to_string(), @@ -1279,7 +1257,7 @@ impl SpannerDb { } if self.stabilize_bsos_sort_order() { - query = match sort { + query = match params.sort { Sorting::Index => format!("{} ORDER BY sortindex DESC, bso_id DESC", query), Sorting::Newest | Sorting::None => { format!("{} ORDER BY modified DESC, bso_id DESC", query) @@ -1287,7 +1265,7 @@ impl SpannerDb { Sorting::Oldest => format!("{} ORDER BY modified ASC, bso_id ASC", query), }; } else { - query = match sort { + query = match params.sort { Sorting::Index => format!("{} ORDER BY sortindex DESC", query), Sorting::Newest => format!("{} ORDER BY modified DESC", query), Sorting::Oldest => format!("{} ORDER BY modified ASC", query), @@ -1295,11 +1273,11 @@ impl SpannerDb { }; } - if let Some(limit) = limit { + if let Some(limit) = params.limit { // fetch an extra row to detect if there are more rows that match // the query conditions query = format!("{} LIMIT {}", query, i64::from(limit) + 1); - } else if let Some(ref offset) = offset { + } else if let Some(ref offset) = params.offset { // Special case no limit specified but still required for an // offset. Spanner doesn't accept a simpler limit of -1 (common in // most databases) so we specify a max value with offset subtracted @@ -1312,7 +1290,7 @@ impl SpannerDb { ); }; - if let Some(offset) = offset { + if let Some(offset) = params.offset { query = format!("{} OFFSET {}", query, offset.offset); } self.sql(&query)? @@ -1337,7 +1315,7 @@ impl SpannerDb { // now: was previously a value of "limit + offset", modifieds.len() // always equals limit Some( - Offset { + params::Offset { offset: offset + modifieds.len() as u64, timestamp: None, } @@ -1351,7 +1329,7 @@ impl SpannerDb { Sorting::Index => { // Use a simple numeric offset for sortindex ordering. 
return Some( - Offset { + params::Offset { offset: offset + modifieds.len() as u64, timestamp: None, } @@ -1387,9 +1365,9 @@ impl SpannerDb { AND fxa_kid = @fxa_kid AND collection_id = @collection_id AND expiry > CURRENT_TIMESTAMP()"; - let limit = params.params.limit.map(i64::from).unwrap_or(-1); - let Offset { offset, timestamp } = params.params.offset.clone().unwrap_or_default(); - let sort = params.params.sort; + let limit = params.limit.map(i64::from).unwrap_or(-1); + let params::Offset { offset, timestamp } = params.offset.clone().unwrap_or_default(); + let sort = params.sort; let mut streaming = self.bsos_query_async(query, params).await?; let mut bsos = vec![]; @@ -1420,9 +1398,9 @@ impl SpannerDb { } pub async fn get_bso_ids_async(&self, params: params::GetBsos) -> Result { - let limit = params.params.limit.map(i64::from).unwrap_or(-1); - let Offset { offset, timestamp } = params.params.offset.clone().unwrap_or_default(); - let sort = params.params.sort; + let limit = params.limit.map(i64::from).unwrap_or(-1); + let params::Offset { offset, timestamp } = params.offset.clone().unwrap_or_default(); + let sort = params.sort; let query = "\ SELECT bso_id, modified @@ -1668,7 +1646,7 @@ impl SpannerDb { pub async fn check_quota( &self, - user_id: &HawkIdentifier, + user_id: &UserIdentifier, collection: &str, collection_id: i32, ) -> Result> { @@ -1695,9 +1673,8 @@ impl SpannerDb { // NOTE: Currently this put_bso_async_test impl. is only used during db tests, // see above for the non-tests version - #[cfg(test)] pub async fn put_bso_async_test(&self, bso: params::PutBso) -> Result { - use crate::db::util::to_rfc3339; + use syncstorage_db_common::util::to_rfc3339; let collection_id = self .get_or_create_collection_id_async(&bso.collection) .await?; @@ -1870,7 +1847,6 @@ impl SpannerDb { // NOTE: Currently this post_bso_async_test impl. 
is only used during db tests, // see above for the non-tests version - #[cfg(test)] pub async fn post_bsos_async_test(&self, input: params::PostBsos) -> Result { let collection_id = self .get_or_create_collection_id_async(&input.collection) @@ -2128,13 +2104,11 @@ impl<'a> Db<'a> for SpannerDb { } } - #[cfg(test)] fn create_collection(&self, name: String) -> DbFuture<'_, i32> { let db = self.clone(); Box::pin(async move { db.create_collection_async(&name).map_err(Into::into).await }) } - #[cfg(test)] fn update_collection(&self, param: params::UpdateCollection) -> DbFuture<'_, SyncTimestamp> { let db = self.clone(); Box::pin(async move { @@ -2144,24 +2118,20 @@ impl<'a> Db<'a> for SpannerDb { }) } - #[cfg(test)] fn timestamp(&self) -> SyncTimestamp { self.timestamp() .expect("set_timestamp() not called yet for SpannerDb") } - #[cfg(test)] fn set_timestamp(&self, timestamp: SyncTimestamp) { SpannerDb::set_timestamp(self, timestamp) } - #[cfg(test)] fn delete_batch(&self, param: params::DeleteBatch) -> DbFuture<'_, results::DeleteBatch> { let db = self.clone(); Box::pin(async move { batch::delete_async(&db, param).map_err(Into::into).await }) } - #[cfg(test)] fn clear_coll_cache(&self) -> DbFuture<'_, ()> { let db = self.clone(); Box::pin(async move { @@ -2170,7 +2140,6 @@ impl<'a> Db<'a> for SpannerDb { }) } - #[cfg(test)] fn set_quota(&mut self, enabled: bool, limit: usize, enforced: bool) { self.quota = Quota { size: limit, diff --git a/src/db/spanner/pool.rs b/syncstorage/src/db/spanner/pool.rs similarity index 97% rename from src/db/spanner/pool.rs rename to syncstorage/src/db/spanner/pool.rs index 77b6149cfd..d7f3160adb 100644 --- a/src/db/spanner/pool.rs +++ b/syncstorage/src/db/spanner/pool.rs @@ -2,11 +2,10 @@ use std::{collections::HashMap, fmt, sync::Arc, time::Duration}; use async_trait::async_trait; use bb8::ErrorSink; +use syncstorage_db_common::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS}; use tokio::sync::RwLock; use crate::{ - db::{error::DbError, Db, DbPool, GetPoolState, PoolState, STD_COLLS}, - error::ApiResult, server::metrics::Metrics, settings::{Quota, Settings}, }; @@ -90,7 +89,7 @@ impl SpannerDbPool { #[async_trait] impl DbPool for SpannerDbPool { - async fn get<'a>(&'a self) -> ApiResult>> { + async fn get<'a>(&'a self) -> Result>> { let mut metrics = self.metrics.clone(); metrics.start_timer("storage.spanner.get_pool", None); @@ -164,7 +163,6 @@ impl CollectionCache { (names, missing) } - #[cfg(test)] pub async fn clear(&self) { self.by_name.write().await.clear(); self.by_id.write().await.clear(); diff --git a/src/db/spanner/schema.ddl b/syncstorage/src/db/spanner/schema.ddl similarity index 100% rename from src/db/spanner/schema.ddl rename to syncstorage/src/db/spanner/schema.ddl diff --git a/src/db/spanner/support.rs b/syncstorage/src/db/spanner/support.rs similarity index 98% rename from src/db/spanner/support.rs rename to syncstorage/src/db/spanner/support.rs index 7cb777d5e3..01d7d37adf 100644 --- a/src/db/spanner/support.rs +++ b/syncstorage/src/db/spanner/support.rs @@ -15,13 +15,12 @@ use protobuf::{ well_known_types::{ListValue, NullValue, Struct, Value}, RepeatedField, }; - -use crate::{ - db::{ - params, results, spanner::models::DEFAULT_BSO_TTL, util::to_rfc3339, util::SyncTimestamp, - DbError, DbErrorKind, - }, - web::extractors::HawkIdentifier, +use syncstorage_db_common::{ + error::{DbError, DbErrorKind}, + params, results, + util::to_rfc3339, + util::SyncTimestamp, + UserIdentifier, DEFAULT_BSO_TTL, }; use super::{models::Result, 
pool::Conn}; @@ -383,7 +382,7 @@ pub fn bso_from_row(mut row: Vec) -> Result { } pub fn bso_to_insert_row( - user_id: &HawkIdentifier, + user_id: &UserIdentifier, collection_id: i32, bso: params::PostCollectionBso, now: SyncTimestamp, @@ -410,7 +409,7 @@ pub fn bso_to_insert_row( } pub fn bso_to_update_row( - user_id: &HawkIdentifier, + user_id: &UserIdentifier, collection_id: i32, bso: params::PostCollectionBso, now: SyncTimestamp, diff --git a/src/db/tests/batch.rs b/syncstorage/src/db/tests/batch.rs similarity index 96% rename from src/db/tests/batch.rs rename to syncstorage/src/db/tests/batch.rs index 12ebb0d001..795f0f5825 100644 --- a/src/db/tests/batch.rs +++ b/syncstorage/src/db/tests/batch.rs @@ -1,10 +1,7 @@ use log::debug; +use syncstorage_db_common::{params, results, util::SyncTimestamp, BATCH_LIFETIME}; use super::support::{db_pool, gbso, hid, pbso, postbso, test_db, Result}; -use crate::{ - db::{error::DbErrorKind, params, results, util::SyncTimestamp, BATCH_LIFETIME}, - error::ApiErrorKind, -}; fn cb(user_id: u32, coll: &str, bsos: Vec) -> params::CreateBatch { params::CreateBatch { @@ -86,10 +83,7 @@ async fn expiry() -> Result<()> { let bsos = vec![postbso("b0", Some("payload 0"), Some(10), None)]; let result = db.append_to_batch(ab(uid, coll, new_batch, bsos)).await; - let is_batch_not_found = match result.unwrap_err().kind() { - ApiErrorKind::Db(dbe) => matches!(dbe.kind(), DbErrorKind::BatchNotFound), - _ => false, - }; + let is_batch_not_found = result.unwrap_err().is_batch_not_found(); assert!(is_batch_not_found, "Expected BatchNotFound"); Ok(()) } diff --git a/src/db/tests/db.rs b/syncstorage/src/db/tests/db.rs similarity index 99% rename from src/db/tests/db.rs rename to syncstorage/src/db/tests/db.rs index 730eb5a1c9..0f811d7fd8 100644 --- a/src/db/tests/db.rs +++ b/syncstorage/src/db/tests/db.rs @@ -3,11 +3,12 @@ use std::collections::HashMap; use lazy_static::lazy_static; use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use syncstorage_db_common::{ + params, util::SyncTimestamp, Sorting, UserIdentifier, DEFAULT_BSO_TTL, +}; use super::support::{db_pool, dbso, dbsos, gbso, gbsos, hid, pbso, postbso, test_db, Result}; -use crate::db::{mysql::models::DEFAULT_BSO_TTL, params, util::SyncTimestamp, Sorting}; use crate::settings::test_settings; -use crate::web::extractors::HawkIdentifier; // distant future (year 2099) timestamp for tests const MAX_TIMESTAMP: u64 = 4_070_937_600_000; @@ -466,7 +467,7 @@ async fn delete_collection() -> Result<()> { let uid = *UID; let coll = "NewCollection"; - for bid in 1..=3 { + for bid in 1u8..=3 { db.put_bso(pbso(uid, coll, &bid.to_string(), Some("test"), None, None)) .await?; } @@ -480,7 +481,7 @@ async fn delete_collection() -> Result<()> { assert_eq!(ts2, ts); // make sure BSOs are deleted - for bid in 1..=3 { + for bid in 1u8..=3 { let result = db.get_bso(gbso(uid, coll, &bid.to_string())).await?; assert!(result.is_none()); } @@ -507,7 +508,7 @@ async fn delete_collection_tombstone() -> Result<()> { let ts1 = with_delta!(db, -100, { db.put_bso(pbso(uid, coll, bid1, Some("test"), None, None)) .await?; - for bid2 in 1..=3 { + for bid2 in 1u8..=3 { db.put_bso(pbso( uid, "test2", @@ -545,7 +546,7 @@ async fn delete_collection_tombstone() -> Result<()> { assert_eq!(ts2, ts_storage); // make sure coll2 BSOs were deleted - for bid2 in 1..=3 { + for bid2 in 1u8..=3 { let result = db.get_bso(gbso(uid, coll2, &bid2.to_string())).await?; assert!(result.is_none()); } @@ -646,7 +647,7 @@ async fn get_collection_usage() -> Result<()> 
{ let collection_id = db.get_collection_id("bookmarks".to_owned()).await?; let quota = db .get_quota_usage(params::GetQuotaUsage { - user_id: HawkIdentifier::new_legacy(uid as u64), + user_id: UserIdentifier::new_legacy(uid as u64), collection: "ignored".to_owned(), collection_id, }) diff --git a/src/db/tests/mod.rs b/syncstorage/src/db/tests/mod.rs similarity index 100% rename from src/db/tests/mod.rs rename to syncstorage/src/db/tests/mod.rs diff --git a/src/db/tests/support.rs b/syncstorage/src/db/tests/support.rs similarity index 82% rename from src/db/tests/support.rs rename to syncstorage/src/db/tests/support.rs index 135044b91b..71d1bd4702 100644 --- a/src/db/tests/support.rs +++ b/syncstorage/src/db/tests/support.rs @@ -1,13 +1,14 @@ use std::str::FromStr; +use syncstorage_db_common::{params, util::SyncTimestamp, Db, Sorting, UserIdentifier}; + use crate::db::DbPool; use crate::error::ApiResult; use crate::{ - db::{params, pool_from_settings, util::SyncTimestamp, Db, Sorting}, + db::pool_from_settings, error::ApiError, server::metrics, settings::{test_settings, Settings}, - web::extractors::{BsoQueryParams, HawkIdentifier, Offset}, }; pub type Result = std::result::Result; @@ -59,7 +60,7 @@ pub fn pbso( ttl: Option, ) -> params::PutBso { params::PutBso { - user_id: HawkIdentifier::new_legacy(u64::from(user_id)), + user_id: UserIdentifier::new_legacy(u64::from(user_id)), collection: coll.to_owned(), id: bid.to_owned(), payload: payload.map(|payload| payload.to_owned()), @@ -104,15 +105,13 @@ pub fn gbsos( params::GetBsos { user_id: hid(user_id), collection: coll.to_owned(), - params: BsoQueryParams { - ids: bids.iter().map(|id| id.to_owned().into()).collect(), - older: Some(SyncTimestamp::from_milliseconds(older)), - newer: Some(SyncTimestamp::from_milliseconds(newer)), - sort, - limit: Some(limit as u32), - offset: Some(Offset::from_str(offset).unwrap_or_default()), - full: true, - }, + ids: bids.iter().map(|id| id.to_owned().into()).collect(), + older: Some(SyncTimestamp::from_milliseconds(older)), + newer: Some(SyncTimestamp::from_milliseconds(newer)), + sort, + limit: Some(limit as u32), + offset: Some(params::Offset::from_str(offset).unwrap_or_default()), + full: true, } } @@ -132,6 +131,6 @@ pub fn dbsos(user_id: u32, coll: &str, bids: &[&str]) -> params::DeleteBsos { } } -pub fn hid(user_id: u32) -> HawkIdentifier { - HawkIdentifier::new_legacy(u64::from(user_id)) +pub fn hid(user_id: u32) -> UserIdentifier { + UserIdentifier::new_legacy(u64::from(user_id)) } diff --git a/src/db/transaction.rs b/syncstorage/src/db/transaction.rs similarity index 94% rename from src/db/transaction.rs rename to syncstorage/src/db/transaction.rs index 7975e532b4..9763d0e680 100644 --- a/src/db/transaction.rs +++ b/syncstorage/src/db/transaction.rs @@ -1,14 +1,5 @@ -use crate::db::results::ConnectionInfo; -use crate::db::{params, Db, DbPool}; -use crate::error::{ApiError, ApiErrorKind}; -use crate::server::metrics::Metrics; -use crate::server::ServerState; -use crate::web::extractors::{ - BsoParam, CollectionParam, HawkIdentifier, PreConditionHeader, PreConditionHeaderOpt, -}; -use crate::web::middleware::SyncServerRequest; -use crate::web::tags::Tags; -use crate::web::X_LAST_MODIFIED; +use std::cell::RefMut; +use std::future::Future; use actix_http::http::{HeaderValue, Method, StatusCode}; use actix_http::{Error, Extensions}; @@ -18,14 +9,24 @@ use actix_web::web::Data; use actix_web::{FromRequest, HttpRequest, HttpResponse}; use futures::future::LocalBoxFuture; use futures::FutureExt; -use 
std::cell::RefMut; -use std::future::Future; +use syncstorage_db_common::{params, Db, DbPool, UserIdentifier}; + +use crate::db::results::ConnectionInfo; +use crate::error::{ApiError, ApiErrorKind}; +use crate::server::metrics::Metrics; +use crate::server::ServerState; +use crate::web::extractors::{ + BsoParam, CollectionParam, PreConditionHeader, PreConditionHeaderOpt, +}; +use crate::web::middleware::SyncServerRequest; +use crate::web::tags::Tags; +use crate::web::X_LAST_MODIFIED; #[derive(Clone)] pub struct DbTransactionPool { pool: Box, is_read: bool, - user_id: HawkIdentifier, + user_id: UserIdentifier, collection: Option, bso_opt: Option, precondition: PreConditionHeaderOpt, @@ -54,10 +55,10 @@ impl DbTransactionPool { &'a self, request: HttpRequest, action: A, - ) -> Result<(R, Box>), Error> + ) -> Result<(R, Box>), ApiError> where A: FnOnce(Box>) -> F, - F: Future> + 'a, + F: Future> + 'a, { // Get connection from pool let db = self.pool.get().await?; @@ -100,10 +101,10 @@ impl DbTransactionPool { &'a self, request: HttpRequest, action: A, - ) -> Result + ) -> Result where A: FnOnce(Box>) -> F, - F: Future> + 'a, + F: Future> + 'a, { let (resp, db) = self.transaction_internal(request, action).await?; @@ -118,10 +119,10 @@ impl DbTransactionPool { &'a self, request: HttpRequest, action: A, - ) -> Result + ) -> Result where A: FnOnce(Box>) -> F, - F: Future> + 'a, + F: Future> + 'a, { let mreq = request.clone(); let check_precondition = move |db: Box>| { @@ -255,7 +256,7 @@ impl FromRequest for DbTransactionPool { let pool = Self { pool: state.db_pool.clone(), is_read, - user_id, + user_id: user_id.into(), collection, bso_opt, precondition, diff --git a/src/error.rs b/syncstorage/src/error.rs similarity index 66% rename from src/error.rs rename to syncstorage/src/error.rs index 458442737c..11c402e5a5 100644 --- a/src/error.rs +++ b/syncstorage/src/error.rs @@ -21,18 +21,18 @@ use serde::{ Serialize, }; +use syncstorage_common::{from_error, impl_fmt_display}; +use syncstorage_db_common::error::DbError; use thiserror::Error; -use crate::db::error::{DbError, DbErrorKind}; -use crate::web::error::{HawkError, ValidationError, ValidationErrorKind}; -use crate::web::extractors::RequestErrorLocation; +use crate::web::error::{HawkError, ValidationError}; use std::error::Error; /// Legacy Sync 1.1 error codes, which Sync 1.5 also returns by replacing the descriptive JSON /// information and replacing it with one of these error codes. #[allow(dead_code)] #[derive(Serialize)] -enum WeaveError { +pub enum WeaveError { /// Unknown error UnknownError = 0, /// Illegal method/protocol @@ -96,91 +96,19 @@ impl ApiErrorKind { } impl ApiError { - pub fn kind(&self) -> &ApiErrorKind { - &self.kind - } - - pub fn is_collection_not_found(&self) -> bool { - match self.kind() { - ApiErrorKind::Db(dbe) => match dbe.kind() { - DbErrorKind::CollectionNotFound => return true, - _ => (), - }, - _ => (), - } - false - } - - pub fn is_bso_not_found(&self) -> bool { - match self.kind() { - ApiErrorKind::Db(dbe) => match dbe.kind() { - DbErrorKind::BsoNotFound => return true, - _ => (), - }, - _ => (), - } - false - } - - pub fn is_conflict(&self) -> bool { - // Is this error a record conflict? - match self.kind() { - ApiErrorKind::Db(dbe) => match dbe.kind() { - DbErrorKind::Conflict => return true, - _ => (), - }, - _ => (), - } - false - } - pub fn is_reportable(&self) -> bool { // Should we report this error to sentry? 
self.status.is_server_error() - && match self.kind() { + && match &self.kind { ApiErrorKind::Db(dbe) => dbe.is_reportable(), - _ => self.kind().metric_label().is_none(), + _ => self.kind.metric_label().is_none(), } } fn weave_error_code(&self) -> WeaveError { - match self.kind() { - ApiErrorKind::Validation(ver) => match ver.kind() { - ValidationErrorKind::FromDetails( - ref description, - ref location, - name, - ref _metric_label, - ) => { - match description.as_ref() { - "over-quota" => return WeaveError::OverQuota, - "size-limit-exceeded" => return WeaveError::SizeLimitExceeded, - _ => {} - } - let name = name.clone().unwrap_or_else(|| "".to_owned()); - if *location == RequestErrorLocation::Body - && ["bso", "bsos"].contains(&name.as_str()) - { - return WeaveError::InvalidWbo; - } - WeaveError::UnknownError - } - ValidationErrorKind::FromValidationErrors( - ref _err, - ref location, - _metric_label, - ) => { - if *location == RequestErrorLocation::Body { - WeaveError::InvalidWbo - } else { - WeaveError::UnknownError - } - } - }, - ApiErrorKind::Db(dber) => match dber.kind() { - DbErrorKind::Quota => WeaveError::OverQuota, - _ => WeaveError::UnknownError, - }, + match &self.kind { + ApiErrorKind::Validation(ver) => ver.weave_error_code(), + ApiErrorKind::Db(dber) if dber.is_quota() => WeaveError::OverQuota, _ => WeaveError::UnknownError, } } @@ -199,18 +127,28 @@ impl ApiError { ))) } } -} -impl From> for ApiError { - fn from(inner: actix_web::error::BlockingError) -> Self { - match inner { - actix_web::error::BlockingError::Error(e) => e, - actix_web::error::BlockingError::Canceled => { - ApiErrorKind::Internal("Db threadpool operation canceled".to_owned()).into() - } - } + pub fn is_collection_not_found(&self) -> bool { + matches!(&self.kind, ApiErrorKind::Db(dbe) if dbe.is_collection_not_found()) + } + + pub fn is_conflict(&self) -> bool { + matches!(&self.kind, ApiErrorKind::Db(dbe) if dbe.is_conflict()) + } + + pub fn is_quota(&self) -> bool { + matches!(&self.kind, ApiErrorKind::Db(dbe) if dbe.is_quota()) + } + + pub fn is_bso_not_found(&self) -> bool { + matches!(&self.kind, ApiErrorKind::Db(dbe) if dbe.is_bso_not_found()) + } + + pub fn metric_label(&self) -> Option { + self.kind.metric_label() } } + impl Error for ApiError { fn source(&self) -> Option<&(dyn Error + 'static)> { self.kind.source() @@ -322,34 +260,17 @@ where seq.end() } -macro_rules! impl_fmt_display { - ($error:ty, $kind:ty) => { - impl fmt::Display for $error { - fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt(&self.kind, formatter) - } - } - }; -} - impl_fmt_display!(ApiError, ApiErrorKind); -macro_rules! from_error { - ($from:ty, $to:ty, $to_kind:expr) => { - impl From<$from> for $to { - fn from(inner: $from) -> $to { - $to_kind(inner).into() - } +impl From for ApiError { + fn from(db_error: DbError) -> Self { + Self { + status: db_error.status, + backtrace: db_error.backtrace.clone(), + kind: ApiErrorKind::Db(db_error), } - }; + } } -from_error!(DbError, ApiError, ApiErrorKind::Db); from_error!(HawkError, ApiError, ApiErrorKind::Hawk); from_error!(ValidationError, ApiError, ApiErrorKind::Validation); - -macro_rules! 
label { - ($string:expr) => { - Some($string.to_string()) - }; -} diff --git a/src/lib.rs b/syncstorage/src/lib.rs similarity index 100% rename from src/lib.rs rename to syncstorage/src/lib.rs diff --git a/src/logging.rs b/syncstorage/src/logging.rs similarity index 100% rename from src/logging.rs rename to syncstorage/src/logging.rs diff --git a/src/main.rs b/syncstorage/src/main.rs similarity index 100% rename from src/main.rs rename to syncstorage/src/main.rs diff --git a/src/server/metrics.rs b/syncstorage/src/server/metrics.rs similarity index 100% rename from src/server/metrics.rs rename to syncstorage/src/server/metrics.rs diff --git a/src/server/mod.rs b/syncstorage/src/server/mod.rs similarity index 99% rename from src/server/mod.rs rename to syncstorage/src/server/mod.rs index cfd50b035b..b897db2342 100644 --- a/src/server/mod.rs +++ b/syncstorage/src/server/mod.rs @@ -7,9 +7,10 @@ use actix_web::{ App, HttpRequest, HttpResponse, HttpServer, }; use cadence::StatsdClient; +use syncstorage_db_common::DbPool; use tokio::sync::RwLock; -use crate::db::{pool_from_settings, spawn_pool_periodic_reporter, DbPool}; +use crate::db::{pool_from_settings, spawn_pool_periodic_reporter}; use crate::error::ApiError; use crate::server::metrics::Metrics; use crate::settings::{Deadman, ServerLimits, Settings}; diff --git a/src/server/test.rs b/syncstorage/src/server/test.rs similarity index 99% rename from src/server/test.rs rename to syncstorage/src/server/test.rs index 6da66c9650..a550eee372 100644 --- a/src/server/test.rs +++ b/syncstorage/src/server/test.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::str::FromStr; use actix_web::{ dev::Service, @@ -15,14 +16,15 @@ use rand::{thread_rng, Rng}; use serde::de::DeserializeOwned; use serde_json::json; use sha2::Sha256; -use std::str::FromStr; +use syncstorage_db_common::{ + params, + results::{DeleteBso, GetBso, PostBsos, PutBso}, + util::SyncTimestamp, +}; use super::*; use crate::build_app; -use crate::db::params; use crate::db::pool_from_settings; -use crate::db::results::{DeleteBso, GetBso, PostBsos, PutBso}; -use crate::db::util::SyncTimestamp; use crate::settings::{test_settings, Secrets, ServerLimits}; use crate::tokenserver; use crate::web::{auth::HawkPayload, extractors::BsoBody, X_LAST_MODIFIED}; diff --git a/src/server/user_agent.rs b/syncstorage/src/server/user_agent.rs similarity index 100% rename from src/server/user_agent.rs rename to syncstorage/src/server/user_agent.rs diff --git a/src/settings.rs b/syncstorage/src/settings.rs similarity index 100% rename from src/settings.rs rename to syncstorage/src/settings.rs diff --git a/src/tokenserver/README.md b/syncstorage/src/tokenserver/README.md similarity index 100% rename from src/tokenserver/README.md rename to syncstorage/src/tokenserver/README.md diff --git a/src/tokenserver/auth/browserid.rs b/syncstorage/src/tokenserver/auth/browserid.rs similarity index 99% rename from src/tokenserver/auth/browserid.rs rename to syncstorage/src/tokenserver/auth/browserid.rs index f235270144..ce43c64365 100644 --- a/src/tokenserver/auth/browserid.rs +++ b/syncstorage/src/tokenserver/auth/browserid.rs @@ -1,12 +1,10 @@ use async_trait::async_trait; use reqwest::{Client as ReqwestClient, StatusCode}; use serde::{de::Deserializer, Deserialize, Serialize}; +use tokenserver_common::error::{ErrorLocation, TokenserverError}; use super::VerifyToken; -use crate::tokenserver::{ - error::{ErrorLocation, TokenserverError}, - settings::Settings, -}; +use crate::tokenserver::settings::Settings; use 
std::{convert::TryFrom, time::Duration}; diff --git a/src/tokenserver/auth/mod.rs b/syncstorage/src/tokenserver/auth/mod.rs similarity index 98% rename from src/tokenserver/auth/mod.rs rename to syncstorage/src/tokenserver/auth/mod.rs index 2e623252de..ed6e23d466 100644 --- a/src/tokenserver/auth/mod.rs +++ b/syncstorage/src/tokenserver/auth/mod.rs @@ -11,8 +11,8 @@ use pyo3::{ types::IntoPyDict, }; use serde::{Deserialize, Serialize}; +use tokenserver_common::error::TokenserverError; -use super::error::TokenserverError; use crate::error::{ApiError, ApiErrorKind}; /// Represents the origin of the token used by Sync clients to access their data. diff --git a/src/tokenserver/auth/oauth.rs b/syncstorage/src/tokenserver/auth/oauth.rs similarity index 84% rename from src/tokenserver/auth/oauth.rs rename to syncstorage/src/tokenserver/auth/oauth.rs index ddf21fb9f6..78f2e33514 100644 --- a/src/tokenserver/auth/oauth.rs +++ b/syncstorage/src/tokenserver/auth/oauth.rs @@ -1,15 +1,17 @@ -use actix_web::{web, Error}; +use actix_web::Error; use async_trait::async_trait; +use futures::TryFutureExt; use pyo3::{ prelude::{Py, PyAny, PyErr, PyModule, Python}, types::{IntoPyDict, PyString}, }; use serde::{Deserialize, Serialize}; use serde_json; -use tokio::time; +use tokenserver_common::error::TokenserverError; +use tokio::{task, time}; use super::VerifyToken; -use crate::tokenserver::{error::TokenserverError, settings::Settings}; +use crate::tokenserver::settings::Settings; use core::time::Duration; use std::convert::TryFrom; @@ -72,7 +74,7 @@ impl VerifyToken for RemoteVerifier { async fn verify(&self, token: String) -> Result { let verifier = self.clone(); - let fut = web::block(move || { + let fut = task::spawn_blocking(move || { let maybe_verify_output_string = Python::with_gil(|py| { let client = verifier.inner.as_ref(py); // `client.verify_token(token)` @@ -110,6 +112,20 @@ impl VerifyToken for RemoteVerifier { ..TokenserverError::invalid_credentials("Unauthorized") }), } + }) + .map_err(|err| { + let context = if err.is_cancelled() { + "Tokenserver threadpool operation cancelled" + } else if err.is_panic() { + "Tokenserver threadpool operation panicked" + } else { + "Tokenserver threadpool operation failed for unknown reason" + }; + + TokenserverError { + context: context.to_owned(), + ..TokenserverError::internal_error() + } }); // The PyFxA OAuth client does not offer a way to set a request timeout, so we set one here @@ -121,6 +137,6 @@ impl VerifyToken for RemoteVerifier { context: "OAuth verification timeout".to_owned(), ..TokenserverError::resource_unavailable() })? - .map_err(Into::into) + .map_err(|_| TokenserverError::resource_unavailable())? 
} } diff --git a/src/tokenserver/auth/verify.py b/syncstorage/src/tokenserver/auth/verify.py similarity index 100% rename from src/tokenserver/auth/verify.py rename to syncstorage/src/tokenserver/auth/verify.py diff --git a/src/tokenserver/db/mock.rs b/syncstorage/src/tokenserver/db/mock.rs similarity index 98% rename from src/tokenserver/db/mock.rs rename to syncstorage/src/tokenserver/db/mock.rs index d702b05ecb..ae6441074f 100644 --- a/src/tokenserver/db/mock.rs +++ b/syncstorage/src/tokenserver/db/mock.rs @@ -2,12 +2,12 @@ use async_trait::async_trait; use futures::future; +use syncstorage_db_common::{error::DbError, GetPoolState, PoolState}; use super::models::{Db, DbFuture}; use super::params; use super::pool::DbPool; use super::results; -use crate::db::{error::DbError, GetPoolState, PoolState}; #[derive(Clone, Debug)] pub struct MockDbPool; diff --git a/src/tokenserver/db/mod.rs b/syncstorage/src/tokenserver/db/mod.rs similarity index 100% rename from src/tokenserver/db/mod.rs rename to syncstorage/src/tokenserver/db/mod.rs diff --git a/src/tokenserver/db/models.rs b/syncstorage/src/tokenserver/db/models.rs similarity index 98% rename from src/tokenserver/db/models.rs rename to syncstorage/src/tokenserver/db/models.rs index ff8163d538..19fa862d05 100644 --- a/src/tokenserver/db/models.rs +++ b/syncstorage/src/tokenserver/db/models.rs @@ -1,4 +1,4 @@ -use actix_web::{http::StatusCode, web::block}; +use actix_web::http::StatusCode; use diesel::{ mysql::MysqlConnection, r2d2::{ConnectionManager, PooledConnection}, @@ -8,7 +8,7 @@ use diesel::{ #[cfg(test)] use diesel_logger::LoggingConnection; use futures::future::LocalBoxFuture; -use futures::TryFutureExt; +use syncstorage_db_common::error::DbError; use std::{ result, @@ -17,8 +17,7 @@ use std::{ }; use super::{params, results}; -use crate::db::error::{DbError, DbErrorKind}; -use crate::error::ApiError; +use crate::db; use crate::server::metrics::Metrics; use crate::sync_db_method; @@ -26,7 +25,7 @@ use crate::sync_db_method; /// "retired" from the db. const MAX_GENERATION: i64 = i64::MAX; -pub type DbFuture<'a, T> = LocalBoxFuture<'a, Result>; +pub type DbFuture<'a, T> = LocalBoxFuture<'a, Result>; pub type DbResult = result::Result; type Conn = PooledConnection>; @@ -246,7 +245,7 @@ impl TokenserverDb { } } - let mut db_error: DbError = DbErrorKind::Internal("unable to get a node".to_owned()).into(); + let mut db_error = DbError::internal("unable to get a node"); db_error.status = StatusCode::SERVICE_UNAVAILABLE; Err(db_error) } @@ -398,8 +397,10 @@ impl TokenserverDb { first_seen_at, old_client_states, }), - // The most up-to-date user doesn't have a node and is retired. - (_, None) => Err(DbError::from(DbErrorKind::TokenserverUserRetired)), + // The most up-to-date user doesn't have a node and is retired. This is an internal + // service error for compatibility reasons (the legacy Tokenserver returned an + // internal service error in this situation). 
+ (_, None) => Err(DbError::internal("Tokenserver user retired")), } } } @@ -616,7 +617,7 @@ impl Db for TokenserverDb { fn check(&self) -> DbFuture<'_, results::Check> { let db = self.clone(); - Box::pin(block(move || db.check_sync().map_err(Into::into)).map_err(Into::into)) + Box::pin(db::run_on_blocking_threadpool(move || db.check_sync())) } #[cfg(test)] @@ -719,10 +720,8 @@ mod tests { use crate::settings::test_settings; use crate::tokenserver::db::pool::{DbPool, TokenserverPool}; - type Result = std::result::Result; - #[tokio::test] - async fn test_update_generation() -> Result<()> { + async fn test_update_generation() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get().await?; @@ -796,7 +795,7 @@ mod tests { } #[tokio::test] - async fn test_update_keys_changed_at() -> Result<()> { + async fn test_update_keys_changed_at() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get().await?; @@ -870,7 +869,7 @@ mod tests { } #[tokio::test] - async fn replace_users() -> Result<()> { + async fn replace_users() -> DbResult<()> { const MILLISECONDS_IN_A_MINUTE: i64 = 60 * 1000; const MILLISECONDS_IN_AN_HOUR: i64 = MILLISECONDS_IN_A_MINUTE * 60; @@ -1054,7 +1053,7 @@ mod tests { } #[tokio::test] - async fn post_user() -> Result<()> { + async fn post_user() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get().await?; @@ -1120,7 +1119,7 @@ mod tests { } #[tokio::test] - async fn get_node_id() -> Result<()> { + async fn get_node_id() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get().await?; @@ -1167,7 +1166,7 @@ mod tests { } #[tokio::test] - async fn test_node_allocation() -> Result<()> { + async fn test_node_allocation() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get_tokenserver_db().await?; @@ -1212,7 +1211,7 @@ mod tests { } #[tokio::test] - async fn test_allocation_to_least_loaded_node() -> Result<()> { + async fn test_allocation_to_least_loaded_node() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get_tokenserver_db().await?; @@ -1273,7 +1272,7 @@ mod tests { } #[tokio::test] - async fn test_allocation_is_not_allowed_to_downed_nodes() -> Result<()> { + async fn test_allocation_is_not_allowed_to_downed_nodes() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get_tokenserver_db().await?; @@ -1314,7 +1313,7 @@ mod tests { } #[tokio::test] - async fn test_allocation_is_not_allowed_to_backoff_nodes() -> Result<()> { + async fn test_allocation_is_not_allowed_to_backoff_nodes() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get_tokenserver_db().await?; @@ -1355,7 +1354,7 @@ mod tests { } #[tokio::test] - async fn test_node_reassignment_when_records_are_replaced() -> Result<()> { + async fn test_node_reassignment_when_records_are_replaced() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get_tokenserver_db().await?; @@ -1427,7 +1426,7 @@ mod tests { } #[tokio::test] - async fn test_node_reassignment_not_done_for_retired_users() -> Result<()> { + async fn test_node_reassignment_not_done_for_retired_users() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get().await?; @@ -1483,7 +1482,7 @@ mod tests { } #[tokio::test] - async fn test_node_reassignment_and_removal() -> Result<()> { + async fn test_node_reassignment_and_removal() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get().await?; @@ -1634,7 +1633,7 @@ mod tests { } #[tokio::test] - async fn test_gradual_release_of_node_capacity() -> Result<()> { + async fn 
test_gradual_release_of_node_capacity() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get().await?; @@ -1800,7 +1799,7 @@ mod tests { } #[tokio::test] - async fn test_correct_created_at_used_during_node_reassignment() -> Result<()> { + async fn test_correct_created_at_used_during_node_reassignment() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get().await?; @@ -1864,7 +1863,7 @@ mod tests { } #[tokio::test] - async fn test_correct_created_at_used_during_user_retrieval() -> Result<()> { + async fn test_correct_created_at_used_during_user_retrieval() -> DbResult<()> { let pool = db_pool().await?; let db = pool.get().await?; diff --git a/src/tokenserver/db/params.rs b/syncstorage/src/tokenserver/db/params.rs similarity index 100% rename from src/tokenserver/db/params.rs rename to syncstorage/src/tokenserver/db/params.rs diff --git a/src/tokenserver/db/pool.rs b/syncstorage/src/tokenserver/db/pool.rs similarity index 83% rename from src/tokenserver/db/pool.rs rename to syncstorage/src/tokenserver/db/pool.rs index a4cf97db19..9cfda64e88 100644 --- a/src/tokenserver/db/pool.rs +++ b/syncstorage/src/tokenserver/db/pool.rs @@ -1,4 +1,3 @@ -use actix_web::web::block; use async_trait::async_trait; use diesel::{ mysql::MysqlConnection, @@ -6,9 +5,10 @@ use diesel::{ }; use diesel_logger::LoggingConnection; use std::time::Duration; +use syncstorage_db_common::{error::DbError, GetPoolState, PoolState}; use super::models::{Db, DbResult, TokenserverDb}; -use crate::db::{error::DbError, DbErrorKind, GetPoolState, PoolState}; +use crate::db; use crate::diesel::Connection; use crate::server::metrics::Metrics; use crate::tokenserver::settings::Settings; @@ -77,23 +77,13 @@ impl TokenserverPool { #[cfg(test)] pub async fn get_tokenserver_db(&self) -> Result { let pool = self.clone(); - let conn = block(move || pool.inner.get().map_err(DbError::from)).await?; + let conn = + db::run_on_blocking_threadpool(move || pool.inner.get().map_err(DbError::from)).await?; Ok(TokenserverDb::new(conn, &self.metrics)) } } -impl From> for DbError { - fn from(inner: actix_web::error::BlockingError) -> Self { - match inner { - actix_web::error::BlockingError::Error(e) => e, - actix_web::error::BlockingError::Canceled => { - DbErrorKind::Internal("Db threadpool operation canceled".to_owned()).into() - } - } - } -} - #[async_trait] impl DbPool for TokenserverPool { async fn get(&self) -> Result, DbError> { @@ -101,7 +91,8 @@ impl DbPool for TokenserverPool { metrics.start_timer("storage.get_pool", None); let pool = self.clone(); - let conn = block(move || pool.inner.get().map_err(DbError::from)).await?; + let conn = + db::run_on_blocking_threadpool(move || pool.inner.get().map_err(DbError::from)).await?; Ok(Box::new(TokenserverDb::new(conn, &self.metrics)) as Box) } diff --git a/src/tokenserver/db/results.rs b/syncstorage/src/tokenserver/db/results.rs similarity index 100% rename from src/tokenserver/db/results.rs rename to syncstorage/src/tokenserver/db/results.rs diff --git a/src/tokenserver/extractors.rs b/syncstorage/src/tokenserver/extractors.rs similarity index 99% rename from src/tokenserver/extractors.rs rename to syncstorage/src/tokenserver/extractors.rs index 095e1b7867..c77ee70faf 100644 --- a/src/tokenserver/extractors.rs +++ b/syncstorage/src/tokenserver/extractors.rs @@ -19,13 +19,13 @@ use lazy_static::lazy_static; use regex::Regex; use serde::Deserialize; use sha2::Sha256; +use tokenserver_common::error::{ErrorLocation, TokenserverError}; use super::{ db::{models::Db, params, 
pool::DbPool, results}, - error::{ErrorLocation, TokenserverError}, LogItemsMutator, NodeType, ServerState, TokenserverMetrics, }; -use crate::{server::metrics, settings::Secrets}; +use crate::{error::ApiError, server::metrics, settings::Secrets}; lazy_static! { static ref CLIENT_STATE_REGEX: Regex = Regex::new("^[a-zA-Z0-9._-]{1,32}$").unwrap(); @@ -202,7 +202,8 @@ impl FromRequest for TokenserverRequest { db.get_service_id(params::GetServiceId { service: SYNC_SERVICE_NAME.to_owned(), }) - .await? + .await + .map_err(ApiError::from)? .id, ) } else { @@ -234,7 +235,8 @@ impl FromRequest for TokenserverRequest { keys_changed_at: auth_data.keys_changed_at, capacity_release_rate: state.node_capacity_release_rate, }) - .await?; + .await + .map_err(ApiError::from)?; log_items_mutator.insert("first_seen_at".to_owned(), user.first_seen_at.to_string()); let duration = { diff --git a/src/tokenserver/handlers.rs b/syncstorage/src/tokenserver/handlers.rs similarity index 98% rename from src/tokenserver/handlers.rs rename to syncstorage/src/tokenserver/handlers.rs index 61beeeaf9b..7a8e0fb871 100644 --- a/src/tokenserver/handlers.rs +++ b/syncstorage/src/tokenserver/handlers.rs @@ -7,6 +7,7 @@ use std::{ use actix_web::{http::StatusCode, Error, HttpResponse}; use serde::Serialize; use serde_json::Value; +use tokenserver_common::error::TokenserverError; use super::{ auth::{MakeTokenPlaintext, Tokenlib, TokenserverOrigin}, @@ -14,10 +15,10 @@ use super::{ models::Db, params::{GetNodeId, PostUser, PutUser, ReplaceUsers}, }, - error::TokenserverError, extractors::TokenserverRequest, NodeType, TokenserverMetrics, }; +use crate::error::ApiError; #[derive(Debug, Serialize)] pub struct TokenserverResult { @@ -112,7 +113,7 @@ struct UserUpdates { uid: i64, } -async fn update_user(req: &TokenserverRequest, db: Box) -> Result { +async fn update_user(req: &TokenserverRequest, db: Box) -> Result { // If the keys_changed_at in the request is larger than that stored on the user record, // update to the value in the request. 
let keys_changed_at = diff --git a/src/tokenserver/logging.rs b/syncstorage/src/tokenserver/logging.rs similarity index 100% rename from src/tokenserver/logging.rs rename to syncstorage/src/tokenserver/logging.rs diff --git a/src/tokenserver/migrations/2021-07-16-001122_init/down.sql b/syncstorage/src/tokenserver/migrations/2021-07-16-001122_init/down.sql similarity index 100% rename from src/tokenserver/migrations/2021-07-16-001122_init/down.sql rename to syncstorage/src/tokenserver/migrations/2021-07-16-001122_init/down.sql diff --git a/src/tokenserver/migrations/2021-07-16-001122_init/up.sql b/syncstorage/src/tokenserver/migrations/2021-07-16-001122_init/up.sql similarity index 100% rename from src/tokenserver/migrations/2021-07-16-001122_init/up.sql rename to syncstorage/src/tokenserver/migrations/2021-07-16-001122_init/up.sql diff --git a/src/tokenserver/migrations/2021-08-03-234845_populate_services/down.sql b/syncstorage/src/tokenserver/migrations/2021-08-03-234845_populate_services/down.sql similarity index 100% rename from src/tokenserver/migrations/2021-08-03-234845_populate_services/down.sql rename to syncstorage/src/tokenserver/migrations/2021-08-03-234845_populate_services/down.sql diff --git a/src/tokenserver/migrations/2021-08-03-234845_populate_services/up.sql b/syncstorage/src/tokenserver/migrations/2021-08-03-234845_populate_services/up.sql similarity index 100% rename from src/tokenserver/migrations/2021-08-03-234845_populate_services/up.sql rename to syncstorage/src/tokenserver/migrations/2021-08-03-234845_populate_services/up.sql diff --git a/src/tokenserver/migrations/2021-09-30-142643_remove_foreign_key_constraints/down.sql b/syncstorage/src/tokenserver/migrations/2021-09-30-142643_remove_foreign_key_constraints/down.sql similarity index 100% rename from src/tokenserver/migrations/2021-09-30-142643_remove_foreign_key_constraints/down.sql rename to syncstorage/src/tokenserver/migrations/2021-09-30-142643_remove_foreign_key_constraints/down.sql diff --git a/src/tokenserver/migrations/2021-09-30-142643_remove_foreign_key_constraints/up.sql b/syncstorage/src/tokenserver/migrations/2021-09-30-142643_remove_foreign_key_constraints/up.sql similarity index 100% rename from src/tokenserver/migrations/2021-09-30-142643_remove_foreign_key_constraints/up.sql rename to syncstorage/src/tokenserver/migrations/2021-09-30-142643_remove_foreign_key_constraints/up.sql diff --git a/src/tokenserver/migrations/2021-09-30-142654_remove_node_defaults/down.sql b/syncstorage/src/tokenserver/migrations/2021-09-30-142654_remove_node_defaults/down.sql similarity index 100% rename from src/tokenserver/migrations/2021-09-30-142654_remove_node_defaults/down.sql rename to syncstorage/src/tokenserver/migrations/2021-09-30-142654_remove_node_defaults/down.sql diff --git a/src/tokenserver/migrations/2021-09-30-142654_remove_node_defaults/up.sql b/syncstorage/src/tokenserver/migrations/2021-09-30-142654_remove_node_defaults/up.sql similarity index 100% rename from src/tokenserver/migrations/2021-09-30-142654_remove_node_defaults/up.sql rename to syncstorage/src/tokenserver/migrations/2021-09-30-142654_remove_node_defaults/up.sql diff --git a/src/tokenserver/migrations/2021-09-30-142746_add_indexes/down.sql b/syncstorage/src/tokenserver/migrations/2021-09-30-142746_add_indexes/down.sql similarity index 100% rename from src/tokenserver/migrations/2021-09-30-142746_add_indexes/down.sql rename to syncstorage/src/tokenserver/migrations/2021-09-30-142746_add_indexes/down.sql diff --git 
a/src/tokenserver/migrations/2021-09-30-142746_add_indexes/up.sql b/syncstorage/src/tokenserver/migrations/2021-09-30-142746_add_indexes/up.sql similarity index 100% rename from src/tokenserver/migrations/2021-09-30-142746_add_indexes/up.sql rename to syncstorage/src/tokenserver/migrations/2021-09-30-142746_add_indexes/up.sql diff --git a/src/tokenserver/migrations/2021-09-30-144043_remove_nodes_service_key/down.sql b/syncstorage/src/tokenserver/migrations/2021-09-30-144043_remove_nodes_service_key/down.sql similarity index 100% rename from src/tokenserver/migrations/2021-09-30-144043_remove_nodes_service_key/down.sql rename to syncstorage/src/tokenserver/migrations/2021-09-30-144043_remove_nodes_service_key/down.sql diff --git a/src/tokenserver/migrations/2021-09-30-144043_remove_nodes_service_key/up.sql b/syncstorage/src/tokenserver/migrations/2021-09-30-144043_remove_nodes_service_key/up.sql similarity index 100% rename from src/tokenserver/migrations/2021-09-30-144043_remove_nodes_service_key/up.sql rename to syncstorage/src/tokenserver/migrations/2021-09-30-144043_remove_nodes_service_key/up.sql diff --git a/src/tokenserver/migrations/2021-09-30-144225_remove_users_nodeid_key/down.sql b/syncstorage/src/tokenserver/migrations/2021-09-30-144225_remove_users_nodeid_key/down.sql similarity index 100% rename from src/tokenserver/migrations/2021-09-30-144225_remove_users_nodeid_key/down.sql rename to syncstorage/src/tokenserver/migrations/2021-09-30-144225_remove_users_nodeid_key/down.sql diff --git a/src/tokenserver/migrations/2021-09-30-144225_remove_users_nodeid_key/up.sql b/syncstorage/src/tokenserver/migrations/2021-09-30-144225_remove_users_nodeid_key/up.sql similarity index 100% rename from src/tokenserver/migrations/2021-09-30-144225_remove_users_nodeid_key/up.sql rename to syncstorage/src/tokenserver/migrations/2021-09-30-144225_remove_users_nodeid_key/up.sql diff --git a/src/tokenserver/migrations/2021-12-22-160451_remove_services/down.sql b/syncstorage/src/tokenserver/migrations/2021-12-22-160451_remove_services/down.sql similarity index 100% rename from src/tokenserver/migrations/2021-12-22-160451_remove_services/down.sql rename to syncstorage/src/tokenserver/migrations/2021-12-22-160451_remove_services/down.sql diff --git a/src/tokenserver/migrations/2021-12-22-160451_remove_services/up.sql b/syncstorage/src/tokenserver/migrations/2021-12-22-160451_remove_services/up.sql similarity index 100% rename from src/tokenserver/migrations/2021-12-22-160451_remove_services/up.sql rename to syncstorage/src/tokenserver/migrations/2021-12-22-160451_remove_services/up.sql diff --git a/src/tokenserver/mod.rs b/syncstorage/src/tokenserver/mod.rs similarity index 99% rename from src/tokenserver/mod.rs rename to syncstorage/src/tokenserver/mod.rs index 84b6bed153..1a2a4b8500 100644 --- a/src/tokenserver/mod.rs +++ b/syncstorage/src/tokenserver/mod.rs @@ -1,6 +1,5 @@ pub mod auth; pub mod db; -pub mod error; pub mod extractors; pub mod handlers; pub mod logging; diff --git a/src/tokenserver/settings.rs b/syncstorage/src/tokenserver/settings.rs similarity index 100% rename from src/tokenserver/settings.rs rename to syncstorage/src/tokenserver/settings.rs diff --git a/src/web/auth.rs b/syncstorage/src/web/auth.rs similarity index 99% rename from src/web/auth.rs rename to syncstorage/src/web/auth.rs index 2fa160d355..31f160182b 100644 --- a/src/web/auth.rs +++ b/syncstorage/src/web/auth.rs @@ -24,6 +24,7 @@ use super::{ extractors::RequestErrorLocation, }; use crate::error::{ApiErrorKind, 
ApiResult}; +use crate::label; use crate::settings::Secrets; use crate::tokenserver::auth::TokenserverOrigin; diff --git a/src/web/error.rs b/syncstorage/src/web/error.rs similarity index 85% rename from src/web/error.rs rename to syncstorage/src/web/error.rs index 4e8ed0a918..5f64321e78 100644 --- a/src/web/error.rs +++ b/syncstorage/src/web/error.rs @@ -13,9 +13,10 @@ use serde::{ Serialize, }; use serde_json::{Error as JsonError, Value}; +use syncstorage_common::{from_error, impl_fmt_display}; use super::extractors::RequestErrorLocation; -use crate::error::ApiError; +use crate::error::{ApiError, WeaveError}; use thiserror::Error; @@ -100,12 +101,8 @@ pub struct ValidationError { } impl ValidationError { - pub fn kind(&self) -> &ValidationErrorKind { - &self.kind - } - pub fn metric_label(&self) -> Option { - match self.kind() { + match &self.kind { ValidationErrorKind::FromDetails( _description, ref _location, @@ -118,6 +115,37 @@ impl ValidationError { _ => None, } } + + pub fn weave_error_code(&self) -> WeaveError { + match &self.kind { + ValidationErrorKind::FromDetails( + ref description, + ref location, + name, + ref _metric_label, + ) => { + match description.as_ref() { + "over-quota" => return WeaveError::OverQuota, + "size-limit-exceeded" => return WeaveError::SizeLimitExceeded, + _ => {} + } + let name = name.clone().unwrap_or_else(|| "".to_owned()); + if *location == RequestErrorLocation::Body + && ["bso", "bsos"].contains(&name.as_str()) + { + return WeaveError::InvalidWbo; + } + WeaveError::UnknownError + } + ValidationErrorKind::FromValidationErrors(ref _err, ref location, _metric_label) => { + if *location == RequestErrorLocation::Body { + WeaveError::InvalidWbo + } else { + WeaveError::UnknownError + } + } + } + } } /// Causes of extractor errors. diff --git a/src/web/extractors.rs b/syncstorage/src/web/extractors.rs similarity index 97% rename from src/web/extractors.rs rename to syncstorage/src/web/extractors.rs index 5a0abfae99..9d00378f60 100644 --- a/src/web/extractors.rs +++ b/syncstorage/src/web/extractors.rs @@ -26,11 +26,16 @@ use serde::{ Deserialize, Serialize, }; use serde_json::Value; +use syncstorage_db_common::{ + params::{self, PostCollectionBso}, + util::SyncTimestamp, + DbPool, Sorting, UserIdentifier, +}; use validator::{Validate, ValidationError}; use crate::db::transaction::DbTransactionPool; -use crate::db::{util::SyncTimestamp, DbPool, Sorting}; use crate::error::{ApiError, ApiErrorKind}; +use crate::label; use crate::server::{metrics, ServerState, BSO_ID_REGEX, COLLECTION_ID_REGEX}; use crate::settings::Secrets; use crate::tokenserver::auth::TokenserverOrigin; @@ -112,6 +117,17 @@ impl BatchBsoBody { } } +impl From for PostCollectionBso { + fn from(b: BatchBsoBody) -> PostCollectionBso { + PostCollectionBso { + id: b.id, + sortindex: b.sortindex, + payload: b.payload, + ttl: b.ttl, + } + } +} + // This tries to do the right thing to get the Accepted header according to // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept, but some corners can absolutely be cut. // This will pull the first accepted content type listed, or the highest rated non-accepted type. 
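The hunk just above adds an `impl From<BatchBsoBody> for PostCollectionBso` so the actix-web extractor types can be handed to the extracted `syncstorage-db-common` crate without the db layer ever importing web types (the later `HawkIdentifier` -> `UserIdentifier` conversion follows the same pattern). A minimal sketch of that boundary-conversion idea, using hypothetical stand-in structs rather than the real `BatchBsoBody`/`PostCollectionBso` definitions:

```rust
// Hypothetical, trimmed-down stand-ins for the web-layer body and the
// db-common parameter type; the real structs carry more fields.
struct WebBsoBody {
    id: String,
    payload: Option<String>,
    sortindex: Option<i32>,
    ttl: Option<u32>,
}

struct DbPostBso {
    id: String,
    payload: Option<String>,
    sortindex: Option<i32>,
    ttl: Option<u32>,
}

// A field-by-field From impl keeps the dependency pointing one way:
// the web crate depends on the db-common crate, never the reverse.
impl From<WebBsoBody> for DbPostBso {
    fn from(b: WebBsoBody) -> Self {
        Self {
            id: b.id,
            payload: b.payload,
            sortindex: b.sortindex,
            ttl: b.ttl,
        }
    }
}

fn main() {
    let body = WebBsoBody {
        id: "b0".to_owned(),
        payload: Some("payload 0".to_owned()),
        sortindex: Some(10),
        ttl: None,
    };
    let db_param: DbPostBso = body.into();
    assert_eq!(db_param.id, "b0");
}
```

The `.into()` call is what the request handlers use (`user_id: user_id.into()`) once the extractor-specific type is no longer accepted by the db API.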
@@ -616,7 +632,8 @@ impl FromRequest for CollectionParam { /// Only the database and user identifier is required for information /// requests: https://mozilla-services.readthedocs.io/en/latest/storage/apis-1.5.html#general-info pub struct MetaRequest { - pub user_id: HawkIdentifier, + pub user_id: UserIdentifier, + pub tokenserver_origin: TokenserverOrigin, pub metrics: metrics::Metrics, } @@ -633,7 +650,8 @@ impl FromRequest for MetaRequest { let user_id = HawkIdentifier::from_request(&req, &mut payload).await?; Ok(MetaRequest { - user_id, + tokenserver_origin: user_id.tokenserver_origin, + user_id: user_id.into(), metrics: metrics::Metrics::extract(&req).await?, }) } @@ -653,7 +671,8 @@ pub enum ReplyFormat { /// Extracts/validates information needed for collection delete/get requests. pub struct CollectionRequest { pub collection: String, - pub user_id: HawkIdentifier, + pub user_id: UserIdentifier, + pub tokenserver_origin: TokenserverOrigin, pub query: BsoQueryParams, pub reply: ReplyFormat, pub metrics: metrics::Metrics, @@ -693,7 +712,8 @@ impl FromRequest for CollectionRequest { Ok(CollectionRequest { collection, - user_id, + tokenserver_origin: user_id.tokenserver_origin, + user_id: user_id.into(), query, reply, metrics: metrics::Metrics::extract(&req).await?, @@ -710,7 +730,8 @@ impl FromRequest for CollectionRequest { /// Extracts/validates information needed for batch collection POST requests. pub struct CollectionPostRequest { pub collection: String, - pub user_id: HawkIdentifier, + pub user_id: UserIdentifier, + pub tokenserver_origin: TokenserverOrigin, pub query: BsoQueryParams, pub bsos: BsoBodies, pub batch: Option, @@ -788,7 +809,8 @@ impl FromRequest for CollectionPostRequest { let batch = BatchRequestOpt::extract(&req).await?; Ok(CollectionPostRequest { collection, - user_id, + tokenserver_origin: user_id.tokenserver_origin, + user_id: user_id.into(), query, bsos, batch: batch.opt, @@ -805,7 +827,8 @@ impl FromRequest for CollectionPostRequest { #[derive(Debug)] pub struct BsoRequest { pub collection: String, - pub user_id: HawkIdentifier, + pub user_id: UserIdentifier, + pub tokenserver_origin: TokenserverOrigin, pub query: BsoQueryParams, pub bso: String, pub metrics: metrics::Metrics, @@ -830,7 +853,8 @@ impl FromRequest for BsoRequest { Ok(BsoRequest { collection, - user_id, + tokenserver_origin: user_id.tokenserver_origin, + user_id: user_id.into(), query, bso: bso.bso, metrics: metrics::Metrics::extract(&req).await?, @@ -844,7 +868,8 @@ impl FromRequest for BsoRequest { /// Extracts/validates information needed for BSO put requests. 
pub struct BsoPutRequest { pub collection: String, - pub user_id: HawkIdentifier, + pub user_id: UserIdentifier, + pub tokenserver_origin: TokenserverOrigin, pub query: BsoQueryParams, pub bso: String, pub body: BsoBody, @@ -889,7 +914,8 @@ impl FromRequest for BsoPutRequest { } Ok(BsoPutRequest { collection, - user_id, + tokenserver_origin: user_id.tokenserver_origin, + user_id: user_id.into(), query, bso: bso.bso, body, @@ -984,14 +1010,6 @@ pub struct HawkIdentifier { } impl HawkIdentifier { - /// Create a new legacy id user identifier - pub fn new_legacy(user_id: u64) -> HawkIdentifier { - HawkIdentifier { - legacy_id: user_id, - ..Default::default() - } - } - pub fn cmd_dummy() -> Self { // Create a "dummy" HawkID for use by DockerFlow commands Self { @@ -1106,6 +1124,16 @@ impl HawkIdentifier { } } +impl From for UserIdentifier { + fn from(hawk_id: HawkIdentifier) -> Self { + Self { + legacy_id: hawk_id.legacy_id, + fxa_uid: hawk_id.fxa_uid, + fxa_kid: hawk_id.fxa_kid, + } + } +} + impl FromRequest for HawkIdentifier { type Config = (); type Error = Error; @@ -1138,32 +1166,19 @@ impl FromRequest for HawkIdentifier { } } -impl From for HawkIdentifier { - fn from(val: u32) -> Self { - HawkIdentifier { - legacy_id: val.into(), - ..Default::default() - } - } -} - -#[derive(Debug, Default, Clone, Deserialize, Validate, PartialEq)] +#[derive(Debug, Default, Clone, Copy, Deserialize, Validate, PartialEq)] #[serde(default)] pub struct Offset { pub timestamp: Option, pub offset: u64, } -impl ToString for Offset { - fn to_string(&self) -> String { - // issue559: Disable ':' support for now. - self.offset.to_string() - /* - match self.timestamp { - None => self.offset.to_string(), - Some(ts) => format!("{}:{}", ts.as_i64(), self.offset), +impl From for params::Offset { + fn from(offset: Offset) -> Self { + Self { + timestamp: offset.timestamp, + offset: offset.offset, } - */ } } @@ -1691,7 +1706,7 @@ macro_rules! 
impl_emit_api_metric { self.metrics.incr_with_tag( label, "tokenserver_origin", - &self.user_id.tokenserver_origin.to_string(), + &self.tokenserver_origin.to_string(), ); } } @@ -1728,12 +1743,10 @@ mod tests { use rand::{thread_rng, Rng}; use serde_json::{self, json}; use sha2::Sha256; + use syncstorage_db_common::Db; use tokio::sync::RwLock; - use crate::db::{ - mock::{MockDb, MockDbPool}, - Db, - }; + use crate::db::mock::{MockDb, MockDbPool}; use crate::server::{metrics, ServerState}; use crate::settings::{Deadman, Secrets, ServerLimits, Settings}; @@ -2379,7 +2392,7 @@ mod tests { #[actix_rt::test] async fn test_offset() { - let sample_offset = Offset { + let sample_offset = params::Offset { timestamp: Some(SyncTimestamp::default()), offset: 1234, }; diff --git a/src/web/handlers.rs b/syncstorage/src/web/handlers.rs similarity index 91% rename from src/web/handlers.rs rename to syncstorage/src/web/handlers.rs index 7a972eec97..4e7514e96a 100644 --- a/src/web/handlers.rs +++ b/syncstorage/src/web/handlers.rs @@ -2,21 +2,19 @@ use std::collections::HashMap; use std::convert::Into; -use actix_web::{ - dev::HttpResponseBuilder, http::StatusCode, web::Data, Error, HttpRequest, HttpResponse, -}; +use actix_web::{dev::HttpResponseBuilder, http::StatusCode, web::Data, HttpRequest, HttpResponse}; use serde::Serialize; use serde_json::{json, Map, Value}; +use syncstorage_db_common::{ + error::{DbError, DbErrorKind}, + params, + results::{CreateBatch, Paginated}, + Db, +}; use crate::{ - db::{ - params, - results::{CreateBatch, Paginated}, - transaction::DbTransactionPool, - util::SyncTimestamp, - Db, DbError, DbErrorKind, - }, - error::{ApiError, ApiErrorKind, ApiResult}, + db::transaction::DbTransactionPool, + error::{ApiError, ApiErrorKind}, server::ServerState, tokenserver, web::{ @@ -34,7 +32,7 @@ pub async fn get_collections( meta: MetaRequest, db_pool: DbTransactionPool, request: HttpRequest, -) -> Result { +) -> Result { db_pool .transaction_http(request, |db| async move { meta.emit_api_metric("request.get_collections"); @@ -51,7 +49,7 @@ pub async fn get_collection_counts( meta: MetaRequest, db_pool: DbTransactionPool, request: HttpRequest, -) -> Result { +) -> Result { db_pool .transaction_http(request, |db| async move { meta.emit_api_metric("request.get_collection_counts"); @@ -68,7 +66,7 @@ pub async fn get_collection_usage( meta: MetaRequest, db_pool: DbTransactionPool, request: HttpRequest, -) -> Result { +) -> Result { db_pool .transaction_http(request, |db| async move { meta.emit_api_metric("request.get_collection_usage"); @@ -90,7 +88,7 @@ pub async fn get_quota( meta: MetaRequest, db_pool: DbTransactionPool, request: HttpRequest, -) -> Result { +) -> Result { db_pool .transaction_http(request, |db| async move { meta.emit_api_metric("request.get_quota"); @@ -104,7 +102,7 @@ pub async fn delete_all( meta: MetaRequest, db_pool: DbTransactionPool, request: HttpRequest, -) -> Result { +) -> Result { db_pool .transaction_http(request, |db| async move { meta.emit_api_metric("request.delete_all"); @@ -117,11 +115,11 @@ pub async fn delete_collection( coll: CollectionRequest, db_pool: DbTransactionPool, request: HttpRequest, -) -> Result { +) -> Result { db_pool .transaction_http(request, |db| async move { let delete_bsos = !coll.query.ids.is_empty(); - let timestamp: ApiResult = if delete_bsos { + let timestamp = if delete_bsos { coll.emit_api_metric("request.delete_bsos"); db.delete_bsos(params::DeleteBsos { user_id: coll.user_id.clone(), @@ -163,13 +161,19 @@ pub async fn 
get_collection( coll: CollectionRequest, db_pool: DbTransactionPool, request: HttpRequest, -) -> Result { +) -> Result { db_pool .transaction_http(request, |db| async move { coll.emit_api_metric("request.get_collection"); let params = params::GetBsos { user_id: coll.user_id.clone(), - params: coll.query.clone(), + newer: coll.query.newer, + older: coll.query.older, + sort: coll.query.sort, + limit: coll.query.limit, + offset: coll.query.offset.map(Into::into), + ids: coll.query.ids.clone(), + full: coll.query.full, collection: coll.collection.clone(), }; let response = if coll.query.full { @@ -188,8 +192,8 @@ pub async fn get_collection( async fn finish_get_collection( coll: &CollectionRequest, db: Box + '_>, - result: Result, ApiError>, -) -> Result + result: Result, DbError>, +) -> Result where T: Serialize + Default + 'static, { @@ -239,7 +243,7 @@ pub async fn post_collection( coll: CollectionPostRequest, db_pool: DbTransactionPool, request: HttpRequest, -) -> Result { +) -> Result { db_pool .transaction_http(request, |db| async move { coll.emit_api_metric("request.post_collection"); @@ -279,7 +283,7 @@ pub async fn post_collection( pub async fn post_collection_batch( coll: CollectionPostRequest, db: Box + '_>, -) -> Result { +) -> Result { coll.emit_api_metric("request.post_collection_batch"); trace!("Batch: Post collection batch"); // Bail early if we have nonsensical arguments @@ -318,7 +322,7 @@ pub async fn post_collection_batch( } } else { let err: DbError = DbErrorKind::BatchNotFound.into(); - return Err(ApiError::from(err).into()); + return Err(ApiError::from(err)); } } else { trace!("Batch: Creating new batch"); @@ -340,28 +344,19 @@ pub async fn post_collection_batch( let mut resp: Value = json!({}); macro_rules! handle_result { - // collect up the successful and failed bso_ids into a response. - ( $r: expr) => { + // collect up the successful and failed bso_ids into a response. + ( $r: expr) => { match $r { Ok(_) => success.extend(bso_ids.clone()), - Err(e) if e.is_conflict() => return Err(e.into()), - Err(apperr) => { - if let ApiErrorKind::Db(dberr) = apperr.kind() { - // If we're over quota, return immediately with a 403 to let the client know. - // Otherwise the client will simply keep retrying records. - if let DbErrorKind::Quota = dberr.kind() { - return Err(apperr.into()); - } - }; - failed.extend( - bso_ids - .clone() - .into_iter() - .map(|id| (id, "db error".to_owned())), - ) - } + Err(e) if e.is_conflict() || e.is_quota() => return Err(e.into()), + _ => failed.extend( + bso_ids + .clone() + .into_iter() + .map(|id| (id, "db error".to_owned())), + ), }; - } + }; } // If we're not committing the current set of records yet. @@ -413,7 +408,7 @@ pub async fn post_collection_batch( .await? } else { let err: DbError = DbErrorKind::BatchNotFound.into(); - return Err(ApiError::from(err).into()); + return Err(ApiError::from(err)); }; // Then, write the BSOs contained in the commit request into the BSO table. 
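The rewritten `handle_result!` arm above (`Err(e) if e.is_conflict() || e.is_quota() => return Err(e.into())`) relies on the predicate helpers this patch adds to `ApiError`, which in turn delegate to `DbError`, replacing the old nested `kind()` matching. A rough sketch of that layering with simplified stand-in error types (the real `DbError` and `ApiError` live in `syncstorage-db-common` and `syncstorage`, and also carry a status and backtrace):

```rust
// Hypothetical, trimmed-down error types for illustration only.
#[derive(Debug)]
enum DbErrorKind {
    Conflict,
    Quota,
    BatchNotFound,
}

#[derive(Debug)]
struct DbError {
    kind: DbErrorKind,
}

impl DbError {
    // Predicate helpers let callers classify errors without matching on
    // the kind enum, so the enum can stay private to the db crate.
    fn is_conflict(&self) -> bool {
        matches!(self.kind, DbErrorKind::Conflict)
    }
    fn is_quota(&self) -> bool {
        matches!(self.kind, DbErrorKind::Quota)
    }
}

#[derive(Debug)]
enum ApiErrorKind {
    Db(DbError),
}

#[derive(Debug)]
struct ApiError {
    kind: ApiErrorKind,
}

impl ApiError {
    // The web-layer error simply forwards the question to the db error.
    fn is_conflict(&self) -> bool {
        matches!(&self.kind, ApiErrorKind::Db(dbe) if dbe.is_conflict())
    }
    fn is_quota(&self) -> bool {
        matches!(&self.kind, ApiErrorKind::Db(dbe) if dbe.is_quota())
    }
}

fn main() {
    let err = ApiError {
        kind: ApiErrorKind::Db(DbError {
            kind: DbErrorKind::Quota,
        }),
    };
    // Mirrors the batch handler: conflict and quota errors abort the
    // request, anything else becomes a per-BSO "db error" failure entry.
    assert!(err.is_quota() && !err.is_conflict());
}
```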
@@ -462,7 +457,7 @@ pub async fn delete_bso( bso_req: BsoRequest, db_pool: DbTransactionPool, request: HttpRequest, -) -> Result { +) -> Result { db_pool .transaction_http(request, |db| async move { bso_req.emit_api_metric("request.delete_bso"); @@ -482,7 +477,7 @@ pub async fn get_bso( bso_req: BsoRequest, db_pool: DbTransactionPool, request: HttpRequest, -) -> Result { +) -> Result { db_pool .transaction_http(request, |db| async move { bso_req.emit_api_metric("request.get_bso"); @@ -506,7 +501,7 @@ pub async fn put_bso( bso_req: BsoPutRequest, db_pool: DbTransactionPool, request: HttpRequest, -) -> Result { +) -> Result { db_pool .transaction_http(request, |db| async move { bso_req.emit_api_metric("request.put_bso"); @@ -544,7 +539,7 @@ pub fn get_configuration(state: Data) -> HttpResponse { /** Returns a status message indicating the state of the current server * */ -pub async fn heartbeat(hb: HeartbeatRequest, req: HttpRequest) -> Result { +pub async fn heartbeat(hb: HeartbeatRequest, req: HttpRequest) -> Result { let mut checklist = HashMap::new(); checklist.insert( "version".to_owned(), @@ -625,7 +620,7 @@ pub async fn heartbeat(hb: HeartbeatRequest, req: HttpRequest) -> Result Result { +pub async fn lbheartbeat(req: HttpRequest) -> Result { let mut resp: HashMap = HashMap::new(); let state = match req.app_data::>() { @@ -639,9 +634,9 @@ pub async fn lbheartbeat(req: HttpRequest) -> Result { let deadarc = state.deadman.clone(); let mut deadman = *deadarc.read().await; let db_state = if cfg!(test) { - use crate::db::PoolState; use actix_web::http::header::HeaderValue; use std::str::FromStr; + use syncstorage_db_common::PoolState; let test_pool = PoolState { connections: u32::from_str( diff --git a/src/web/middleware/mod.rs b/syncstorage/src/web/middleware/mod.rs similarity index 98% rename from src/web/middleware/mod.rs rename to syncstorage/src/web/middleware/mod.rs index 57f729e8eb..4f4b1274d1 100644 --- a/src/web/middleware/mod.rs +++ b/syncstorage/src/web/middleware/mod.rs @@ -12,8 +12,8 @@ use actix_web::{ dev::{Service, ServiceRequest, ServiceResponse}, Error, HttpRequest, }; +use syncstorage_db_common::util::SyncTimestamp; -use crate::db::util::SyncTimestamp; use crate::error::{ApiError, ApiErrorKind}; use crate::server::{metrics::Metrics, ServerState}; use crate::settings::Secrets; diff --git a/src/web/middleware/rejectua.rs b/syncstorage/src/web/middleware/rejectua.rs similarity index 100% rename from src/web/middleware/rejectua.rs rename to syncstorage/src/web/middleware/rejectua.rs diff --git a/src/web/middleware/sentry.rs b/syncstorage/src/web/middleware/sentry.rs similarity index 98% rename from src/web/middleware/sentry.rs rename to syncstorage/src/web/middleware/sentry.rs index 0dfba167fd..4045bd3163 100644 --- a/src/web/middleware/sentry.rs +++ b/syncstorage/src/web/middleware/sentry.rs @@ -136,7 +136,7 @@ where Some(e) => { if let Some(apie) = e.as_error::() { if let Some(metrics) = metrics { - if let Some(label) = apie.kind().metric_label() { + if let Some(label) = apie.metric_label() { metrics.incr(&label); } } diff --git a/src/web/middleware/weave.rs b/syncstorage/src/web/middleware/weave.rs similarity index 98% rename from src/web/middleware/weave.rs rename to syncstorage/src/web/middleware/weave.rs index aab6a9a3c7..db2fb9328d 100644 --- a/src/web/middleware/weave.rs +++ b/syncstorage/src/web/middleware/weave.rs @@ -1,5 +1,5 @@ use std::fmt::Display; -use std::task::Context; +use std::task::{Context, Poll}; use actix_web::{ dev::{Service, ServiceRequest, 
ServiceResponse, Transform}, @@ -8,9 +8,8 @@ use actix_web::{ }; use futures::future::{self, LocalBoxFuture, TryFutureExt}; -use std::task::Poll; +use syncstorage_db_common::util::SyncTimestamp; -use crate::db::util::SyncTimestamp; use crate::error::{ApiError, ApiErrorKind}; use crate::web::{DOCKER_FLOW_ENDPOINTS, X_LAST_MODIFIED, X_WEAVE_TIMESTAMP}; diff --git a/src/web/mod.rs b/syncstorage/src/web/mod.rs similarity index 89% rename from src/web/mod.rs rename to syncstorage/src/web/mod.rs index e9a266b9c9..9f2ea9bb44 100644 --- a/src/web/mod.rs +++ b/syncstorage/src/web/mod.rs @@ -23,3 +23,10 @@ pub const DOCKER_FLOW_ENDPOINTS: [&str; 4] = [ "/__version__", "/__error__", ]; + +#[macro_export] +macro_rules! label { + ($string:expr) => { + Some($string.to_string()) + }; +} diff --git a/src/web/tags.rs b/syncstorage/src/web/tags.rs similarity index 100% rename from src/web/tags.rs rename to syncstorage/src/web/tags.rs diff --git a/version.json b/syncstorage/version.json similarity index 100% rename from version.json rename to syncstorage/version.json diff --git a/tokenserver-common/Cargo.toml b/tokenserver-common/Cargo.toml new file mode 100644 index 0000000000..18ced55cfa --- /dev/null +++ b/tokenserver-common/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "tokenserver-common" +version = "0.10.2" +edition = "2021" + +[dependencies] +actix-web = "3" +serde = "1.0" +serde_json = { version = "1.0", features = ["arbitrary_precision"] } +thiserror = "1.0.26" diff --git a/src/tokenserver/error.rs b/tokenserver-common/src/error.rs similarity index 91% rename from src/tokenserver/error.rs rename to tokenserver-common/src/error.rs index 9794646602..24500676f7 100644 --- a/src/tokenserver/error.rs +++ b/tokenserver-common/src/error.rs @@ -1,10 +1,6 @@ use std::fmt; -use actix_web::{ - error::{BlockingError, ResponseError}, - http::StatusCode, - HttpResponse, -}; +use actix_web::{http::StatusCode, HttpResponse, ResponseError}; use serde::{ ser::{SerializeMap, Serializer}, Serialize, @@ -127,18 +123,6 @@ impl TokenserverError { } } -impl From> for TokenserverError { - fn from(inner: BlockingError) -> Self { - match inner { - BlockingError::Error(e) => e, - BlockingError::Canceled => TokenserverError { - context: "Threadpool operation canceled".to_owned(), - ..TokenserverError::internal_error() - }, - } - } -} - #[derive(Clone, Copy, Debug, PartialEq)] pub enum ErrorLocation { Header, diff --git a/tokenserver-common/src/lib.rs b/tokenserver-common/src/lib.rs new file mode 100644 index 0000000000..a91e735174 --- /dev/null +++ b/tokenserver-common/src/lib.rs @@ -0,0 +1 @@ +pub mod error;