Skip to content

Commit

Permalink
Merge branch 'master' of github.com:mozilla-services/syncstorage-rs into fix/170

Browse files Browse the repository at this point in the history
  • Loading branch information
jrconlin committed Oct 3, 2019
2 parents c011ec8 + a2dd503 commit 821f8f6
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 73 deletions.
184 changes: 125 additions & 59 deletions src/db/spanner/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,26 @@ use super::{
models::{Result, SpannerDb},
support::{as_value, SpannerType},
};
use crate::db::{
params, results,
util::{to_rfc3339, SyncTimestamp},
DbError, DbErrorKind, BATCH_LIFETIME,
};
use crate::db::{params, results, util::to_rfc3339, DbError, DbErrorKind, BATCH_LIFETIME};
use protobuf::well_known_types::ListValue;
use serde_json::json;

/// Serialize bsos into strings separated by newlines
fn bsos_to_batch_string(bsos: &[params::PostCollectionBso]) -> Result<String> {
let batch_strings: Result<Vec<String>> = bsos
.iter()
.map(|bso| {
serde_json::to_string(bso).map_err(|e| {
DbError::internal(&format!("Couldn't serialize batch::create bso: {}", e))
/// Serialize results into strings separated by newlines
fn results_to_batch_string(results: Vec<ListValue>) -> String {
if results.is_empty() {
"".to_string()
} else {
let batch_strings: Vec<String> = results
.iter()
.map(|result| {
result.get_values().to_vec()[1]
.get_string_value()
.to_string()
})
})
.collect();
batch_strings.map(|bs| {
format!(
"{}{}",
bs.join("\n"),
if bsos.is_empty() { "" } else { "\n" }
)
})
.filter(|result| !result.is_empty())
.collect();
batch_strings.join("\n")
}
}

/// Deserialize a batch string into bsos
Expand All @@ -42,65 +39,135 @@ pub fn create(db: &SpannerDb, params: params::CreateBatch) -> Result<results::Cr
let user_id = params.user_id.legacy_id as i32;
let collection_id = db.get_collection_id(&params.collection)?;
let timestamp = db.timestamp()?.as_i64();
let bsos = bsos_to_batch_string(&params.bsos)?;

db.sql("INSERT INTO batches (userid, collection, id, bsos, expiry) VALUES (@userid, @collectionid, @bsoid, @bsos, @expiry)")?
.params(params! {
"userid" => user_id.to_string(),
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(timestamp)?,
"bsos" => bsos,
"expiry" => to_rfc3339(timestamp + BATCH_LIFETIME)?,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
if params.bsos.is_empty() {
db.sql("INSERT INTO batches (userid, collection, id, bsos, expiry, timestamp) VALUES (@userid, @collectionid, @bsoid, @bsos, @expiry, @timestamp)")?
.params(params! {
"userid" => user_id.to_string(),
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(timestamp)?,
"timestamp" => to_rfc3339(timestamp)?,
"bsos" => "".to_string(),
"expiry" => to_rfc3339(timestamp + BATCH_LIFETIME)?,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
"timestamp" => SpannerType::Timestamp,
})
.execute(&db.conn)?;
}
for (i, bso) in (&params.bsos).iter().enumerate() {
let bsos = json!({
"id": bso.id,
"sortindex": bso.sortindex,
"payload": bso.payload,
"ttl": bso.ttl,
})
.execute(&db.conn)?;
.to_string();

db.sql("INSERT INTO batches (userid, collection, id, bsos, expiry, timestamp) VALUES (@userid, @collectionid, @bsoid, @bsos, @expiry, @timestamp)")?
.params(params! {
"userid" => user_id.to_string(),
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(timestamp + i as i64)?,
"timestamp" => to_rfc3339(timestamp)?,
"bsos" => bsos,
"expiry" => to_rfc3339(timestamp + BATCH_LIFETIME)?,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
"timestamp" => SpannerType::Timestamp,
})
.execute(&db.conn)?;
}

Ok(timestamp)
}

pub fn validate(db: &SpannerDb, params: params::ValidateBatch) -> Result<bool> {
let user_id = params.user_id.legacy_id as i32;
let collection_id = db.get_collection_id(&params.collection)?;
let timestamp = db.timestamp()?.as_i64();

let exists = db.sql("SELECT 1 FROM batches WHERE userid = @userid AND collection = @collectionid AND id = @bsoid AND expiry > @expiry")?
let exists = db.sql("SELECT expiry FROM batches WHERE userid = @userid AND collection = @collectionid AND timestamp = @timestamp AND expiry > @expiry")?
.params(params! {
"userid" => user_id.to_string(),
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(params.id)?,
"expiry" => to_rfc3339(timestamp)?,
"timestamp" => to_rfc3339(params.id)?,
"expiry" => to_rfc3339(db.timestamp()?.as_i64())?,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"timestamp" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
})
.execute(&db.conn)?
.one_or_none()?;
.all_or_none();
Ok(exists.is_some())
}

pub fn append(db: &SpannerDb, params: params::AppendToBatch) -> Result<()> {
pub fn select_max_id(db: &SpannerDb, params: params::ValidateBatch) -> Result<i64> {
let user_id = params.user_id.legacy_id as i32;
let collection_id = db.get_collection_id(&params.collection)?;
let bsos = bsos_to_batch_string(&params.bsos)?;
let timestamp = db.timestamp()?.as_i64();

let result = db.sql("UPDATE batches SET bsos = CONCAT(bsos, @bsos) WHERE userid = @userid AND collection = @collectionid AND id = @bsoid AND expiry > @expiry")?
let exists = db.sql("SELECT UNIX_MILLIS(id) FROM batches WHERE userid = @userid AND collection = @collectionid AND timestamp = @timestamp AND expiry > @expiry ORDER BY id DESC")?
.params(params! {
"userid" => user_id.to_string(),
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(params.id)?,
"expiry" => to_rfc3339(timestamp)?,
"bsos" => bsos,
"timestamp" => to_rfc3339(params.id)?,
"expiry" => to_rfc3339(db.timestamp()?.as_i64())?,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"timestamp" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
})
.execute(&db.conn)?;
if result.affected_rows()? == 1 {
.execute(&db.conn)?
.all_or_none();
if let Some(exists) = exists {
return Ok(exists[0].get_values().to_vec()[0]
.get_string_value()
.to_string()
.parse::<i64>()
.unwrap());
}
Err(DbError::internal("No rows matched the given query."))
}

pub fn append(db: &SpannerDb, params: params::AppendToBatch) -> Result<()> {
let user_id = params.user_id.legacy_id as i32;
let collection_id = db.get_collection_id(&params.collection)?;
let timestamp = params.id;
if let Ok(max_id) = select_max_id(
db,
params::ValidateBatch {
id: timestamp,
user_id: params.user_id.clone(),
collection: params.collection.clone(),
},
) {
let mut i = max_id + 1;
for bso in &params.bsos {
let bsos = json!({
"id": bso.id,
"sortindex": bso.sortindex,
"payload": bso.payload,
"ttl": bso.ttl,
})
.to_string();
db.sql("INSERT INTO batches (userid, collection, id, bsos, expiry, timestamp) VALUES (@userid, @collectionid, @bsoid, @bsos, @expiry, @timestamp)")?
.params(params! {
"userid" => user_id.to_string(),
"collectionid" => collection_id.to_string(),
"bsoid" => to_rfc3339(params.id + i)?,
"timestamp" => to_rfc3339(params.id)?,
"expiry" => to_rfc3339(timestamp + BATCH_LIFETIME)?,
"bsos" => bsos,
})
.param_types(param_types! {
"bsoid" => SpannerType::Timestamp,
"timestamp" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
})
.execute(&db.conn)?;
i += 1;
}
Ok(())
} else {
Err(DbErrorKind::BatchNotFound.into())
Expand All @@ -112,7 +179,7 @@ pub fn get(db: &SpannerDb, params: params::GetBatch) -> Result<Option<results::G
let collection_id = db.get_collection_id(&params.collection)?;
let timestamp = db.timestamp()?.as_i64();

let result = db.sql("SELECT id, bsos, expiry FROM batches WHERE userid = @userid AND collection = @collectionid AND id = @bsoid AND expiry > @expiry")?
let result = db.sql("SELECT id, bsos, expiry FROM batches WHERE userid = @userid AND collection = @collectionid AND timestamp = @bsoid AND expiry > @expiry")?
.params(params! {
"userid" => user_id.to_string(),
"collectionid" => collection_id.to_string(),
Expand All @@ -123,16 +190,15 @@ pub fn get(db: &SpannerDb, params: params::GetBatch) -> Result<Option<results::G
"bsoid" => SpannerType::Timestamp,
"expiry" => SpannerType::Timestamp,
})
.execute(&db.conn)?
.one_or_none()?;
.execute(&db.conn)?.all_or_none();
if let Some(result) = result {
Ok(Some(params::Batch {
id: params.id,
bsos: result[1].get_string_value().to_owned(),
bsos: results_to_batch_string(result),
// XXX: we don't really use expiry (but it's probably needed for
// mysql/diesel compat). converting it back to i64 is maybe
// suspicious
expiry: SyncTimestamp::from_rfc3339(result[2].get_string_value())?.as_i64(),
expiry: 0,
}))
} else {
Ok(None)
Expand All @@ -144,7 +210,7 @@ pub fn delete(db: &SpannerDb, params: params::DeleteBatch) -> Result<()> {
let collection_id = db.get_collection_id(&params.collection)?;

db.sql(
"DELETE FROM batches WHERE userid = @userid AND collection = @collectionid AND id = @bsoid",
"DELETE FROM batches WHERE userid = @userid AND collection = @collectionid AND timestamp = @bsoid",
)?
.params(params! {
"userid" => user_id.to_string(),
Expand Down
23 changes: 9 additions & 14 deletions src/db/spanner/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,26 +215,21 @@ impl SyncResultSet {
}
}

#[cfg(feature = "google_grpc")]
pub fn affected_rows(self: &SyncResultSet) -> Result<i64> {
let stats = self
.stats()
.ok_or_else(|| DbError::internal("Expected result_set stats"))?;
Ok(stats.get_row_count_exact())
/// Collect every row of the result set into an owned `Vec`.
///
/// Returns `None` when the result set contains no rows at all; otherwise
/// a clone of all rows. NOTE(review): despite taking `&mut self`, nothing
/// is mutated here — presumably `&mut` for parity with the row-consuming
/// accessors on this type; confirm against callers.
pub fn all_or_none(&mut self) -> Option<Vec<ListValue>> {
    let rows = &self.result.rows;
    if rows.is_empty() {
        return None;
    }
    Some(rows.clone().into_vec())
}

#[cfg(not(feature = "google_grpc"))]
#[cfg(feature = "google_grpc")]
pub fn affected_rows(self: &SyncResultSet) -> Result<i64> {
let stats = self
.stats()
.ok_or_else(|| DbError::internal("Expected result_set stats"))?;
let row_count_exact = stats
.row_count_exact
.as_ref()
.ok_or_else(|| DbError::internal("Expected result_set stats row_count_exact"))?;
Ok(row_count_exact
.parse()
.map_err(|e| DbError::internal(&format!("Invalid row_count_exact i64 value {}", e)))?)
let row_count_exact = stats.get_row_count_exact();
Ok(row_count_exact)
}
}

Expand Down

0 comments on commit 821f8f6

Please sign in to comment.