Skip to content

Commit

Permalink
bug: fold commit message bsos into pending batch
Browse files Browse the repository at this point in the history
The client may sometimes include BSOs directly in the batch
commit message. The problem is that, due to the way
data is written to Spanner, mutations cannot
see data previously written
in the same transaction. This causes collisions.

To solve this, treat the bsos included in the commit
as another batch update, then commit all the data.
This does run the risk of bumping up against the
mutation limit, but it ensures the best chance of
data consistency.

Writing the commit bsos prior to batch commit will
result in the "newer" records being overwritten by
"older" ones in the batch.

Writing the commit BSOs after the batch commit runs
into the same mutation-visibility conflict problem.

Closes #882
  • Loading branch information
jrconlin committed Jan 21, 2021
1 parent 0101eac commit 8756cc9
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ hostname = "0.3.1"
hkdf = "0.10"
hmac = "0.10"
jsonwebtoken = "7.2.0"
log = { version = "0.4", features = ["max_level_info", "release_max_level_info"] }
log = { version = "0.4", features = ["max_level_trace", "release_max_level_info"] }
mime = "0.3"
num_cpus = "1"
# must match what's used by googleapis-raw
Expand Down
116 changes: 55 additions & 61 deletions src/web/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,75 +318,69 @@ pub async fn post_collection_batch(
.await?
};

let commit = breq.commit;
let user_id = coll.user_id.clone();
let collection = coll.collection.clone();

let mut success = vec![];
let mut failed = coll.bsos.invalid;
// Option #2, mark each bso.id included in the "commit" as "failing" so that they're
// resubmitted.
let bso_ids: Vec<_> = coll.bsos.valid.iter().map(|bso| bso.id.clone()).collect();

let result = if commit && !coll.bsos.valid.is_empty() {
// There's pending items to append to the batch but since we're
// committing, write them to bsos immediately. Otherwise under
// Spanner we would pay twice the mutations for those pending
// items (once writing them to to batch_bsos, then again
// writing them to bsos)
trace!("Batch: Committing {}", &new_batch.id);
db.post_bsos(params::PostBsos {
user_id: coll.user_id.clone(),
collection: coll.collection.clone(),
// XXX: why does BatchBsoBody exist (it's the same struct
// as PostCollectionBso)?
bsos: coll
.bsos
.valid
.into_iter()
.map(|batch_bso| params::PostCollectionBso {
id: batch_bso.id,
sortindex: batch_bso.sortindex,
payload: batch_bso.payload,
ttl: batch_bso.ttl,
})
.collect(),
for_batch: true,
failed: Default::default(),
})
.await
.map(|_| ())
} else {
// We're not yet to commit the accumulated batch, but there are some
// additional records we need to add.
trace!("Batch: Appending to {}", &new_batch.id);
db.append_to_batch(params::AppendToBatch {
user_id: coll.user_id.clone(),
collection: coll.collection.clone(),
batch: new_batch.clone(),
bsos: coll.bsos.valid.into_iter().map(From::from).collect(),
})
.await
};

// collect up the successful and failed bso_ids into a response.
match result {
Ok(_) => success.extend(bso_ids),
Err(e) if e.is_conflict() => return Err(e.into()),
Err(apperr) => {
if let ApiErrorKind::Db(dberr) = apperr.kind() {
// If we're over quota, return immediately with a 403 to let the client know.
// Otherwise the client will simply keep retrying records.
if let DbErrorKind::Quota = dberr.kind() {
return Err(apperr.into());
}
};
failed.extend(bso_ids.into_iter().map(|id| (id, "db error".to_owned())))
}
};
let mut resp: Value = json!({});

/* Ideally, we would be clever here and pre-commit the bsos included
with this request after the batches are committed.
There are a few problems with doing that.
1) since there are two different methods being used (DML &
Mutations), the previous written data included can't always
be read by the same transaction.
2) newer elements should probably overwrite older batch elements.
This means that we pay a mutations tax by double writing
(or we might be able to have an "exclude" list when we do the
final merge, but that SQL is gnarly, so this is probably
safer.)
Posting the bsos once the commit is done can also run into conflicts
because of the same mutation read limitations.
*/
if !coll.bsos.valid.is_empty() {
let result = {
dbg!("Batch: Appending to {}", &new_batch.id);
db.append_to_batch(params::AppendToBatch {
user_id: coll.user_id.clone(),
collection: coll.collection.clone(),
batch: new_batch.clone(),
bsos: coll.bsos.valid.into_iter().map(From::from).collect(),
})
.await
};

// collect up the successful and failed bso_ids into a response.
match result {
Ok(_) => success.extend(bso_ids.clone()),
Err(e) if e.is_conflict() => return Err(e.into()),
Err(apperr) => {
if let ApiErrorKind::Db(dberr) = apperr.kind() {
// If we're over quota, return immediately with a 403 to let the client know.
// Otherwise the client will simply keep retrying records.
if let DbErrorKind::Quota = dberr.kind() {
return Err(apperr.into());
}
};
dbg!(apperr);
failed.extend(
bso_ids
.clone()
.into_iter()
.map(|id| (id, "db error".to_owned())),
)
}
};
}

let mut resp = json!({
"success": success,
"failed": failed,
});
resp["success"] = json!(success);
resp["failed"] = json!(failed);

if !breq.commit {
resp["batch"] = json!(&new_batch.id);
Expand Down
29 changes: 29 additions & 0 deletions tools/integration_tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1582,6 +1582,35 @@ def test_batches(self):
resp3 = self.app.get(endpoint + '/e')
self.assertEquals(committed, resp3.json['modified'])


def test_batch_commit_collision(self):
    # A request may contain a BSO both inside an open batch and
    # inside the final "commit" message. This is a bit of a problem
    # for Spanner because of conflicting ways that the data is written
    # to the database and the discoverability of IDs in previously
    # submitted batches. Regression test for the batch-commit
    # collision fix: the commit-time copy must win.
    endpoint = self.root + '/storage/xxx_col2'
    orig = "Letting the days go by"
    repl = "Same as it ever was"

    # Open a batch containing BSO "b0" with the original payload.
    batch_num = self.retry_post_json(
        endpoint + "?batch=true",
        [{"id":"b0", "payload": orig}]
    ).json["batch"]

    # Commit the batch while re-submitting "b0" with a replacement
    # payload in the commit message itself (the collision case).
    resp = self.retry_post_json(
        endpoint + "?batch={}&commit=true".format(batch_num),
        [{"id":"b0", "payload": repl}]
    )

    # this should succeed, using the newer payload value.
    assert resp.json["failed"] == {}, "batch commit failed"
    assert resp.json["success"] == ["b0"], "batch commit id incorrect"
    resp = self.app.get(endpoint+"?full=1")
    assert resp.json[0].get(
        "payload") == repl, "wrong payload returned"


def test_we_dont_need_no_stinkin_batches(self):
endpoint = self.root + '/storage/xxx_col2'

Expand Down

0 comments on commit 8756cc9

Please sign in to comment.