Skip to content

Commit

Permalink
f ensure a pre-existing bso (#860)
Browse files Browse the repository at this point in the history
dynamically build up params for each do_append update

Co-authored-by: Philip Jenvey <[email protected]>
  • Loading branch information
jrconlin and pjenvey authored Oct 14, 2020
1 parent 3271130 commit e68c1c7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 54 deletions.
78 changes: 35 additions & 43 deletions src/db/spanner/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,10 @@ pub async fn do_append_async(
}

struct UpdateRecord {
sortindex: Value,
ttl: Value,
bso_id: String,
payload: String,
sortindex: Option<i32>,
payload: Option<String>,
ttl: Option<u32>,
};

//prefetch the existing batch_bsos for this user's batch.
Expand Down Expand Up @@ -318,31 +318,31 @@ pub async fn do_append_async(
let mut insert: Vec<Value> = Vec::new();
let mut update: Vec<UpdateRecord> = Vec::new();
for bso in bsos {
let sortindex = bso
.sortindex
.map(|sortindex| as_value(sortindex.to_string()))
.unwrap_or_else(null_value);
let mut payload = bso.payload.map(as_value).unwrap_or_else(null_value);
if payload != null_value() {
running_size += payload.get_string_value().len();
if let Some(ref payload) = bso.payload {
running_size += payload.len();
}
let ttl = bso
.ttl
.map(|ttl| as_value(ttl.to_string()))
.unwrap_or_else(null_value);

let exist_idx = exist_idx(&collection_id.to_string(), &batch.id, &bso.id);

if existing.contains(&exist_idx) {
// need to update this record
// reject this record since you can only have one update per batch
update.push(UpdateRecord {
sortindex,
ttl,
bso_id: bso.id,
payload: payload.take_string_value(),
sortindex: bso.sortindex,
payload: bso.payload,
ttl: bso.ttl,
});
} else {
let sortindex = bso
.sortindex
.map(|sortindex| as_value(sortindex.to_string()))
.unwrap_or_else(null_value);
let payload = bso.payload.map(as_value).unwrap_or_else(null_value);
let ttl = bso
.ttl
.map(|ttl| as_value(ttl.to_string()))
.unwrap_or_else(null_value);

// convert to a protobuf structure for direct insertion to
// avoid some mutation limits.
let mut row = ListValue::new();
Expand Down Expand Up @@ -435,24 +435,25 @@ pub async fn do_append_async(
.incr("storage.spanner.adding_updates_to_batch_bsos");
for val in update {
let mut fields = Vec::new();
let mut sort_index = val.sortindex.get_string_value().to_owned();
if !sort_index.is_empty() {
fields.push("sortindex");
} else {
// This is not written, but the sql builder requires a numeric string
sort_index = "0".to_owned();
};
let mut ttl = val.ttl.get_string_value().to_owned();
if !ttl.is_empty() {
fields.push("ttl");
} else {
// This is not written, but the sql builder requires a numeric string
ttl = "0".to_owned();
let mut params = params! {
"fxa_uid" => user_id.fxa_uid.clone(),
"fxa_kid" => user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"batch_id" => batch.id.clone(),
"batch_bso_id" => val.bso_id,
};
let payload = val.payload;
if !payload.is_empty() {
if let Some(sortindex) = val.sortindex {
fields.push("sortindex");
params.insert("sortindex".to_owned(), as_value(sortindex.to_string()));
}
if let Some(payload) = val.payload {
fields.push("payload");
params.insert("payload".to_owned(), as_value(payload));
};
if let Some(ttl) = val.ttl {
fields.push("ttl");
params.insert("ttl".to_owned(), as_value(ttl.to_string()));
}
if fields.is_empty() {
continue;
};
Expand All @@ -467,16 +468,7 @@ pub async fn do_append_async(
AND batch_id=@batch_id AND batch_bso_id=@batch_bso_id",
updatable = updatable
))?
.params(params!(
"sortindex" => sort_index,
"payload" => payload,
"ttl" => ttl,
"fxa_uid" => user_id.fxa_uid.clone(),
"fxa_kid" => user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"batch_id" => batch.id.clone(),
"batch_bso_id" => val.bso_id,
))
.params(params)
.param_types(param_types.clone())
.execute_dml_async(&db.conn)
.await?;
Expand Down
16 changes: 5 additions & 11 deletions src/db/tests/batch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use log::debug;

use super::support::{db_pool, gbso, hid, postbso, test_db, Result};
use super::support::{db_pool, gbso, hid, pbso, postbso, test_db, Result};
use crate::{
db::{error::DbErrorKind, params, results, util::SyncTimestamp, BATCH_LIFETIME},
error::ApiErrorKind,
Expand Down Expand Up @@ -256,10 +256,11 @@ async fn test_append_async_w_null() -> Result<()> {
let uid = 1;
let coll = "clients";
let payload = "payload 0";
let first_bsos = vec![postbso("b0", Some(payload), Some(10), Some(ttl + 10_000))];
let new_batch = db.create_batch(cb(uid, coll, first_bsos)).await?;
let tomorrow = ttl + 20_000;
let first_bso = pbso(uid, coll, "b0", Some(payload), Some(10), Some(ttl + 10_000));
db.put_bso(first_bso).await?;

let tomorrow = ttl + 20_000;
let new_batch = db.create_batch(cb(uid, coll, vec![])).await?;
// update the single bso twice, leaving payload the same.
db.append_to_batch(ab(
uid,
Expand Down Expand Up @@ -291,12 +292,5 @@ async fn test_append_async_w_null() -> Result<()> {
assert!(bso.payload == payload);
assert!(bso.sortindex == Some(15));

// clean up your toys.
db.delete_batch(params::DeleteBatch {
user_id: hid(uid),
collection: coll.to_owned(),
id: new_batch.id.clone(),
});

Ok(())
}

0 comments on commit e68c1c7

Please sign in to comment.