Skip to content

Commit

Permalink
Merge pull request #243 from mozilla-services/bug/242
Browse files Browse the repository at this point in the history
bug: Fix spanner batches to use new fxa_uid as userid
  • Loading branch information
jrconlin authored Oct 4, 2019
2 parents 11b4b07 + 3ccd90d commit accd999
Showing 1 changed file with 103 additions and 61 deletions.
164 changes: 103 additions & 61 deletions src/db/spanner/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,28 @@ fn batch_string_to_bsos(bsos: &str) -> Result<Vec<params::PostCollectionBso>> {
}

pub fn create(db: &SpannerDb, params: params::CreateBatch) -> Result<results::CreateBatch> {
let user_id = params.user_id.legacy_id as i32;
let collection_id = db.get_collection_id(&params.collection)?;
let collection_id = db.get_collection_id(&params.collection)?.to_string();
let timestamp = db.timestamp()?.as_i64();
if params.bsos.is_empty() {
db.sql("INSERT INTO batches (userid, collection, id, bsos, expiry, timestamp) VALUES (@userid, @collectionid, @bsoid, @bsos, @expiry, @timestamp)")?
.params(params! {
"userid" => user_id.to_string(),
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(timestamp)?,
"timestamp" => to_rfc3339(timestamp)?,
"bsos" => "".to_string(),
"expiry" => to_rfc3339(timestamp + BATCH_LIFETIME)?,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
"timestamp" => SpannerType::Timestamp,
})
.execute(&db.conn)?;
db.sql(
"INSERT INTO batches (userid, fxa_kid, collection, id, bsos, expiry, timestamp)
VALUES (@fxa_uid, @fxa_kid, @collectionid, @bsoid, @bsos, @expiry, @timestamp)",
)?
.params(params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collectionid" => collection_id.clone(),
"bsoid" => to_rfc3339(timestamp)?,
"timestamp" => to_rfc3339(timestamp)?,
"bsos" => "".to_string(),
"expiry" => to_rfc3339(timestamp + BATCH_LIFETIME)?,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
"timestamp" => SpannerType::Timestamp,
})
.execute(&db.conn)?;
}
for (i, bso) in (&params.bsos).iter().enumerate() {
let bsos = json!({
Expand All @@ -65,32 +68,44 @@ pub fn create(db: &SpannerDb, params: params::CreateBatch) -> Result<results::Cr
})
.to_string();

db.sql("INSERT INTO batches (userid, collection, id, bsos, expiry, timestamp) VALUES (@userid, @collectionid, @bsoid, @bsos, @expiry, @timestamp)")?
.params(params! {
"userid" => user_id.to_string(),
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(timestamp + i as i64)?,
"timestamp" => to_rfc3339(timestamp)?,
"bsos" => bsos,
"expiry" => to_rfc3339(timestamp + BATCH_LIFETIME)?,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
"timestamp" => SpannerType::Timestamp,
})
.execute(&db.conn)?;
db.sql(
"INSERT INTO batches (userid, fxa_kid, collection, id, bsos, expiry, timestamp)
VALUES (@fxa_uid, @fxa_kid, @collectionid, @bsoid, @bsos, @expiry, @timestamp)",
)?
.params(params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collectionid" => collection_id.clone(),
"bsoid" => to_rfc3339(timestamp + i as i64)?,
"timestamp" => to_rfc3339(timestamp)?,
"bsos" => bsos,
"expiry" => to_rfc3339(timestamp + BATCH_LIFETIME)?,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
"timestamp" => SpannerType::Timestamp,
})
.execute(&db.conn)?;
}

Ok(timestamp)
}

pub fn validate(db: &SpannerDb, params: params::ValidateBatch) -> Result<bool> {
let user_id = params.user_id.legacy_id as i32;
let collection_id = db.get_collection_id(&params.collection)?;
let exists = db.sql("SELECT expiry FROM batches WHERE userid = @userid AND collection = @collectionid AND timestamp = @timestamp AND expiry > @expiry")?
let exists = db
.sql(
"SELECT expiry FROM batches
WHERE userid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection = @collectionid
AND timestamp = @timestamp
AND expiry > @expiry",
)?
.params(params! {
"userid" => user_id.to_string(),
"fxa_uid" => params.user_id.fxa_uid,
"fxa_kid" => params.user_id.fxa_kid,
"collectionid" => collection_id.to_string(),
"timestamp" => to_rfc3339(params.id)?,
"expiry" => to_rfc3339(db.timestamp()?.as_i64())?,
Expand All @@ -105,11 +120,21 @@ pub fn validate(db: &SpannerDb, params: params::ValidateBatch) -> Result<bool> {
}

pub fn select_max_id(db: &SpannerDb, params: params::ValidateBatch) -> Result<i64> {
let user_id = params.user_id.legacy_id as i32;
let collection_id = db.get_collection_id(&params.collection)?;
let exists = db.sql("SELECT UNIX_MILLIS(id) FROM batches WHERE userid = @userid AND collection = @collectionid AND timestamp = @timestamp AND expiry > @expiry ORDER BY id DESC")?
let exists = db
.sql(
"SELECT UNIX_MILLIS(id)
FROM batches
WHERE userid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection = @collectionid
AND timestamp = @timestamp
AND expiry > @expiry
ORDER BY id DESC",
)?
.params(params! {
"userid" => user_id.to_string(),
"fxa_uid" => params.user_id.fxa_uid,
"fxa_kid" => params.user_id.fxa_kid,
"collectionid" => collection_id.to_string(),
"timestamp" => to_rfc3339(params.id)?,
"expiry" => to_rfc3339(db.timestamp()?.as_i64())?,
Expand All @@ -131,7 +156,6 @@ pub fn select_max_id(db: &SpannerDb, params: params::ValidateBatch) -> Result<i6
}

pub fn append(db: &SpannerDb, params: params::AppendToBatch) -> Result<()> {
let user_id = params.user_id.legacy_id as i32;
let collection_id = db.get_collection_id(&params.collection)?;
let timestamp = params.id;
if let Ok(max_id) = select_max_id(
Expand All @@ -151,21 +175,25 @@ pub fn append(db: &SpannerDb, params: params::AppendToBatch) -> Result<()> {
"ttl": bso.ttl,
})
.to_string();
db.sql("INSERT INTO batches (userid, collection, id, bsos, expiry, timestamp) VALUES (@userid, @collectionid, @bsoid, @bsos, @expiry, @timestamp)")?
.params(params! {
"userid" => user_id.to_string(),
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(params.id + i)?,
"timestamp" => to_rfc3339(params.id)?,
"expiry" => to_rfc3339(timestamp + BATCH_LIFETIME)?,
"bsos" => bsos,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"timestamp" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
})
.execute(&db.conn)?;
db.sql(
"INSERT INTO batches (userid, fxa_kid, collection, id, bsos, expiry, timestamp)
VALUES (@fxa_uid, @fxa_kid, @collectionid, @bsoid, @bsos, @expiry, @timestamp)",
)?
.params(params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(params.id + i)?,
"timestamp" => to_rfc3339(params.id)?,
"expiry" => to_rfc3339(timestamp + BATCH_LIFETIME)?,
"bsos" => bsos,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"timestamp" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
})
.execute(&db.conn)?;
i += 1;
}
Ok(())
Expand All @@ -175,13 +203,22 @@ pub fn append(db: &SpannerDb, params: params::AppendToBatch) -> Result<()> {
}

pub fn get(db: &SpannerDb, params: params::GetBatch) -> Result<Option<results::GetBatch>> {
let user_id = params.user_id.legacy_id as i32;
let collection_id = db.get_collection_id(&params.collection)?;
let timestamp = db.timestamp()?.as_i64();

let result = db.sql("SELECT id, bsos, expiry FROM batches WHERE userid = @userid AND collection = @collectionid AND timestamp = @bsoid AND expiry > @expiry")?
let result = db
.sql(
"SELECT id, bsos, expiry
FROM batches
WHERE userid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection = @collectionid
AND timestamp = @bsoid
AND expiry > @expiry",
)?
.params(params! {
"userid" => user_id.to_string(),
"fxa_uid" => params.user_id.fxa_uid,
"fxa_kid" => params.user_id.fxa_kid,
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(params.id)?,
"expiry" => to_rfc3339(timestamp)?,
Expand All @@ -190,7 +227,8 @@ pub fn get(db: &SpannerDb, params: params::GetBatch) -> Result<Option<results::G
"bsoid" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
})
.execute(&db.conn)?.all_or_none();
.execute(&db.conn)?
.all_or_none();
if let Some(result) = result {
Ok(Some(params::Batch {
id: params.id,
Expand All @@ -206,14 +244,18 @@ pub fn get(db: &SpannerDb, params: params::GetBatch) -> Result<Option<results::G
}

pub fn delete(db: &SpannerDb, params: params::DeleteBatch) -> Result<()> {
let user_id = params.user_id.legacy_id as i32;
let collection_id = db.get_collection_id(&params.collection)?;

db.sql(
"DELETE FROM batches WHERE userid = @userid AND collection = @collectionid AND timestamp = @bsoid",
"DELETE FROM batches
WHERE userid = @fxa_uid
AND fxa_kid = @fxa_kid
AND collection = @collectionid
AND timestamp = @bsoid",
)?
.params(params! {
"userid" => user_id.to_string(),
"fxa_uid" => params.user_id.fxa_uid,
"fxa_kid" => params.user_id.fxa_kid,
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(params.id)?,
})
Expand Down

0 comments on commit accd999

Please sign in to comment.