Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Convert integral values to String before converting to Value #1056

Merged
merged 5 commits into from
Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 67 additions & 70 deletions src/db/spanner/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use protobuf::{
use uuid::Uuid;

use super::models::{Result, SpannerDb, DEFAULT_BSO_TTL, PRETOUCH_TS};
use super::support::{null_value, struct_type_field, ToSpannerValue};
use super::support::{as_type, null_value, struct_type_field, ToSpannerValue};
use crate::{
db::{params, results, util::to_rfc3339, DbError, DbErrorKind, BATCH_LIFETIME},
web::{extractors::HawkIdentifier, tags::Tags},
Expand All @@ -35,20 +35,20 @@ pub async fn create_async(
id: batch_id,
};

db.sql(
"INSERT INTO batches (fxa_uid, fxa_kid, collection_id, batch_id, expiry)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @batch_id, @expiry)",
)?
.params(params! {
let (sqlparams, mut sqlparam_types) = params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"collection_id" => collection_id,
"batch_id" => new_batch.id.clone(),
"expiry" => to_rfc3339(timestamp + BATCH_LIFETIME)?,
})
.param_types(param_types! {
"expiry" => TypeCode::TIMESTAMP,
})
};
sqlparam_types.insert("expiry".to_owned(), as_type(TypeCode::TIMESTAMP));
db.sql(
"INSERT INTO batches (fxa_uid, fxa_kid, collection_id, batch_id, expiry)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @batch_id, @expiry)",
)?
.params(sqlparams)
.param_types(sqlparam_types)
.execute_dml_async(&db.conn)
.await?;

Expand Down Expand Up @@ -116,6 +116,12 @@ pub async fn get_async(
params: params::GetBatch,
) -> Result<Option<results::GetBatch>> {
let collection_id = db.get_collection_id_async(&params.collection).await?;
let (sqlparams, sqlparam_types) = params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collection_id" => collection_id,
"batch_id" => params.id.clone(),
};
let batch = db
.sql(
"SELECT 1
Expand All @@ -126,12 +132,8 @@ pub async fn get_async(
AND batch_id = @batch_id
AND expiry > CURRENT_TIMESTAMP()",
)?
.params(params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"batch_id" => params.id.clone(),
})
.params(sqlparams)
.param_types(sqlparam_types)
.execute_async(&db.conn)?
.one_or_none()
.await?
Expand All @@ -141,6 +143,12 @@ pub async fn get_async(

pub async fn delete_async(db: &SpannerDb, params: params::DeleteBatch) -> Result<()> {
let collection_id = db.get_collection_id_async(&params.collection).await?;
let (sqlparams, sqlparam_types) = params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collection_id" => collection_id,
"batch_id" => params.id,
};
// Also deletes child batch_bsos rows (INTERLEAVE IN PARENT batches ON
// DELETE CASCADE)
db.sql(
Expand All @@ -150,12 +158,8 @@ pub async fn delete_async(db: &SpannerDb, params: params::DeleteBatch) -> Result
AND collection_id = @collection_id
AND batch_id = @batch_id",
)?
.params(params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"batch_id" => params.id,
})
.params(sqlparams)
.param_types(sqlparam_types)
.execute_dml_async(&db.conn)
.await?;
Ok(())
Expand All @@ -181,39 +185,38 @@ pub async fn commit_async(
// supplied in this batch
let mut timer2 = db.metrics.clone();
timer2.start_timer("storage.spanner.apply_batch_update", None);
let (sqlparams, mut sqlparam_types) = params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collection_id" => collection_id,
"batch_id" => params.batch.id.clone(),
"timestamp" => as_rfc3339.clone(),
};
sqlparam_types.insert("timestamp".to_owned(), as_type(TypeCode::TIMESTAMP));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOT THAT WE NEED TO DO THIS NOW

But we really could take advantage of they SyncTimestamp type better for stuff like this.
(heh, Good first bug material.)

db.sql(include_str!("batch_commit_update.sql"))?
.params(params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"batch_id" => params.batch.id.clone(),
"timestamp" => as_rfc3339.clone(),
})
.param_types(param_types! {
"timestamp" => TypeCode::TIMESTAMP,
})
.params(sqlparams)
.param_types(sqlparam_types)
.execute_dml_async(&db.conn)
.await?;
}

{
// Then INSERT INTO SELECT remaining rows from this batch into the bsos
// table (that didn't already exist there)
let (sqlparams, mut sqlparam_types) = params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collection_id" => collection_id,
"batch_id" => params.batch.id.clone(),
"timestamp" => as_rfc3339,
"default_bso_ttl" => DEFAULT_BSO_TTL,
};
sqlparam_types.insert("timestamp".to_owned(), as_type(TypeCode::TIMESTAMP));
let mut timer3 = db.metrics.clone();
timer3.start_timer("storage.spanner.apply_batch_insert", None);
db.sql(include_str!("batch_commit_insert.sql"))?
.params(params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"batch_id" => params.batch.id.clone(),
"timestamp" => as_rfc3339,
"default_bso_ttl" => DEFAULT_BSO_TTL.to_string(),
})
.param_types(param_types! {
"timestamp" => TypeCode::TIMESTAMP,
"default_bso_ttl" => TypeCode::INT64,
})
.params(sqlparams)
.param_types(sqlparam_types)
.execute_dml_async(&db.conn)
.await?;
}
Expand Down Expand Up @@ -288,17 +291,17 @@ pub async fn do_append_async(
.unwrap_or_else(|| "UNKNOWN".to_string()),
);

let bso_ids = bsos.iter().map(|pbso| pbso.id.clone());
let mut params = params! {
let bso_ids = bsos
.iter()
.map(|pbso| pbso.id.clone())
.collect::<Vec<String>>();
let (sqlparams, sqlparam_types) = params! {
"fxa_uid" => user_id.fxa_uid.clone(),
"fxa_kid" => user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"collection_id" => collection_id,
"batch_id" => batch.id.clone(),
"ids" => bso_ids,
};
params.insert(
"ids".to_owned(),
bso_ids.collect::<Vec<String>>().to_spanner_value(),
);
let mut existing_stream = db
.sql(
"SELECT batch_bso_id
Expand All @@ -309,7 +312,8 @@ pub async fn do_append_async(
AND batch_id=@batch_id
AND batch_bso_id in UNNEST(@ids);",
)?
.params(params)
.params(sqlparams)
.param_types(sqlparam_types)
.execute_async(&db.conn)?;
while let Some(row) = existing_stream.next_async().await {
let row = row?;
Expand Down Expand Up @@ -410,16 +414,6 @@ pub async fn do_append_async(
}
}

let param_types = param_types! { // ### TODO: this should be normalized to one instance.
"fxa_uid" => TypeCode::STRING,
"fxa_kid"=> TypeCode::STRING,
"collection_id"=> TypeCode::INT64,
"batch_id"=> TypeCode::STRING,
"batch_bso_id"=> TypeCode::STRING,
"sortindex"=> TypeCode::INT64,
"payload"=> TypeCode::STRING,
"ttl"=> TypeCode::INT64,
};
let fields = vec![
("fxa_uid", TypeCode::STRING),
("fxa_kid", TypeCode::STRING),
Expand Down Expand Up @@ -477,24 +471,27 @@ pub async fn do_append_async(
if !update.is_empty() {
for val in update {
let mut fields = Vec::new();
let mut params = params! {
let (mut params, mut param_types) = params! {
"fxa_uid" => user_id.fxa_uid.clone(),
"fxa_kid" => user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"collection_id" => collection_id,
"batch_id" => batch.id.clone(),
"batch_bso_id" => val.bso_id,
};
if let Some(sortindex) = val.sortindex {
fields.push("sortindex");
params.insert("sortindex".to_owned(), sortindex.to_spanner_value());
param_types.insert("sortindex".to_owned(), sortindex.spanner_type());
}
if let Some(payload) = val.payload {
fields.push("payload");
params.insert("payload".to_owned(), payload.to_spanner_value());
param_types.insert("payload".to_owned(), payload.spanner_type());
};
if let Some(ttl) = val.ttl {
fields.push("ttl");
params.insert("ttl".to_owned(), ttl.to_spanner_value());
param_types.insert("ttl".to_owned(), ttl.spanner_type());
}
if fields.is_empty() {
continue;
Expand Down Expand Up @@ -533,10 +530,10 @@ async fn pretouch_collection_async(
user_id: &HawkIdentifier,
collection_id: i32,
) -> Result<()> {
let mut sqlparams = params! {
let (mut sqlparams, mut sqlparam_types) = params! {
"fxa_uid" => user_id.fxa_uid.clone(),
"fxa_kid" => user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"collection_id" => collection_id,
};
let result = db
.sql(
Expand All @@ -547,6 +544,7 @@ async fn pretouch_collection_async(
AND collection_id = @collection_id",
)?
.params(sqlparams.clone())
.param_types(sqlparam_types.clone())
.execute_async(&db.conn)?
.one_or_none()
.await?;
Expand All @@ -555,6 +553,7 @@ async fn pretouch_collection_async(
"modified".to_owned(),
PRETOUCH_TS.to_owned().to_spanner_value(),
);
sqlparam_types.insert("modified".to_owned(), as_type(TypeCode::TIMESTAMP));
let sql = if db.quota.enabled {
"INSERT INTO user_collections (fxa_uid, fxa_kid, collection_id, modified, count, total_bytes)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @modified, 0, 0)"
Expand All @@ -564,9 +563,7 @@ async fn pretouch_collection_async(
};
db.sql(sql)?
.params(sqlparams)
.param_types(param_types! {
"modified" => TypeCode::TIMESTAMP,
})
.param_types(sqlparam_types)
.execute_dml_async(&db.conn)
.await?;
}
Expand Down
Loading