Skip to content

Commit

Permalink
Detect write conflicts in SI
Browse files Browse the repository at this point in the history
To avoid the lost update anomaly, the write sets of the concurrent TXNs
should not intersect.
  • Loading branch information
w41ter committed Aug 10, 2024
1 parent cb59694 commit a37f0b1
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 15 deletions.
8 changes: 6 additions & 2 deletions src/api/sekas/server/v1/error.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ message Error {
}

// One error entry: a human-readable message together with a structured
// detail describing the concrete failure.
message ErrorDetail {
  string message = 1;

  ErrorDetailUnion detail = 2;
}

message ErrorDetailUnion {
oneof value {
int32 status_code = 1;
Expand All @@ -53,6 +53,7 @@ message ErrorDetailUnion {
GroupNotFound group_not_found = 5;
NotRoot not_root = 6;
CasFailed cas_failed = 7;
TxnConflict txn_conflict = 8;
}
}

Expand Down Expand Up @@ -101,3 +102,6 @@ message CasFailed {
// The prev value of the cas touched key, if take_prev_value is set.
optional Value prev_value = 3;
}

// The txn conflicts with another txn: a key it tries to write already has a
// committed value whose version is newer than the txn's start version.
message TxnConflict {}
5 changes: 5 additions & 0 deletions src/api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ impl Error {
}))
}

/// Build an [`Error`] whose detail is an empty `TxnConflict` payload.
#[inline]
pub fn txn_conflict() -> Self {
    let detail = error_detail_union::Value::TxnConflict(TxnConflict {});
    Self::with_detail_value(detail)
}

#[inline]
pub fn status(code: i32, msg: impl Into<String>) -> Self {
Error { details: vec![ErrorDetail::status(code, msg)] }
Expand Down
9 changes: 9 additions & 0 deletions src/client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub enum AppError {
#[error("cas condition {1} not satisfied, operation index {0}")]
CasFailed(u64, u64, Option<Value>),

#[error("the txn is conflict with others")]
TxnConflict,

#[error("network: {0}")]
Network(tonic::Status),

Expand Down Expand Up @@ -64,6 +67,9 @@ pub enum Error {
#[error("cas condition {1} not satisfied, operation index {0}")]
CasFailed(u64, u64, Option<Value>),

#[error("the txn is conflict with others")]
TxnConflict,

#[error("group epoch not match")]
EpochNotMatch(GroupDesc),

Expand Down Expand Up @@ -142,6 +148,7 @@ impl From<sekas_api::server::v1::Error> for Error {
Some(Value::NotMatch(v)) => Error::EpochNotMatch(v.descriptor.unwrap_or_default()),
Some(Value::StatusCode(v)) => Status::new(v.into(), msg).into(),
Some(Value::CasFailed(v)) => Error::CasFailed(v.index, v.cond_index, v.prev_value),
Some(Value::TxnConflict(_)) => Error::TxnConflict,
_ => Status::internal(format!("unknown error detail, msg: {msg}")).into(),
}
}
Expand All @@ -163,6 +170,7 @@ impl From<Error> for AppError {
Error::CasFailed(index, cond_index, prev_value) => {
AppError::CasFailed(index, cond_index, prev_value)
}
Error::TxnConflict => AppError::TxnConflict,
Error::Internal(v) => AppError::Internal(v),

Error::Transport(status) => AppError::Network(status),
Expand All @@ -189,6 +197,7 @@ impl From<AppError> for tonic::Status {
AppError::InvalidArgument(msg) => Status::invalid_argument(msg),
AppError::DeadlineExceeded(msg) => Status::deadline_exceeded(msg),
AppError::CasFailed(_, _, _) => todo!("not supported"),
AppError::TxnConflict => todo!("not supported"),
AppError::Network(status) => status, // as proxy
AppError::Internal(err) => Status::internal(err.to_string()),
}
Expand Down
5 changes: 4 additions & 1 deletion src/client/src/group_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,10 @@ impl GroupClient {
}
Error::EpochNotMatch(group_desc) => self.apply_epoch_not_match_status(group_desc, opt),
e => {
if !matches!(e, Error::CasFailed(_, _, _) | Error::InvalidArgument(_)) {
if !matches!(
e,
Error::CasFailed(_, _, _) | Error::InvalidArgument(_) | Error::TxnConflict
) {
warn!(
"group {} issue rpc to {}: epoch {} with unknown error {e:?}",
self.group_id,
Expand Down
1 change: 1 addition & 0 deletions src/client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl RetryState {
| Error::ResourceExhausted(_)
| Error::AlreadyExists(_)
| Error::CasFailed(_, _, _)
| Error::TxnConflict
| Error::Rpc(_)
| Error::Transport(_)
| Error::Internal(_) => false,
Expand Down
24 changes: 12 additions & 12 deletions src/client/src/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,17 +377,17 @@ impl Txn {

/// Read the content stored under `key` of table `table_id` within this
/// transaction.
///
/// Returns `Ok(None)` when no raw value is found, or when the raw value that
/// was found carries no content. Errors from the underlying raw read are
/// propagated unchanged.
///
/// NOTE: This request will be sent to node servers, and the put/delete
/// requests already buffered in this TXN will be ignored.
pub async fn get(&self, table_id: u64, key: Vec<u8>) -> AppResult<Option<Vec<u8>>> {
    let content = self.get_raw_value(table_id, key).await?.and_then(|raw| raw.content);
    Ok(content)
}

/// Get a raw key value from this transaction.
///
/// NOTE: This request will be sent to node servers, and the put/delete requests
/// already buffered in this TXN will be ignored.
/// NOTE: This request will be sent to node servers, and the put/delete
/// requests already buffered in this TXN will be ignored.
pub async fn get_raw_value(&self, table_id: u64, key: Vec<u8>) -> AppResult<Option<Value>> {
CLIENT_DATABASE_BYTES_TOTAL.rx.inc_by(key.len() as u64);
CLIENT_DATABASE_REQUEST_TOTAL.get.inc();
Expand Down Expand Up @@ -479,8 +479,8 @@ impl Txn {

/// To scan a shard.
///
/// NOTE: This request will be sent to node servers, and the put/delete requests
/// already buffered in this TXN will be ignored.
/// NOTE: This request will be sent to node servers, and the put/delete
/// requests already buffered in this TXN will be ignored.
pub async fn scan(&self, mut request: ShardScanRequest) -> AppResult<ShardScanResponse> {
let mut retry_state = RetryState::with_deadline_opt(self.deadline);
loop {
Expand Down Expand Up @@ -514,8 +514,8 @@ impl Txn {

/// Scan a range.
///
/// NOTE: This request will be sent to node servers, and the put/delete requests
/// already buffered in this TXN will be ignored.
/// NOTE: This request will be sent to node servers, and the put/delete
/// requests already buffered in this TXN will be ignored.
pub async fn range(&self, mut request: RangeRequest) -> AppResult<RangeStream> {
if request.version.is_none() {
request.version = Some(self.get_read_version().await?);
Expand All @@ -525,8 +525,8 @@ impl Txn {

/// Watch a key.
///
/// This is a shorthand for `watch_with_version` with the version set to `0`.
///
/// NOTE: This request will be sent to node servers, and the put/delete
/// requests already buffered in this TXN will be ignored.
pub async fn watch(&self, table_id: u64, key: &[u8]) -> AppResult<WatchKeyStream> {
    let version = 0;
    self.watch_with_version(table_id, key, version).await
}
Expand All @@ -535,8 +535,8 @@ impl Txn {
///
/// The values below this version are ignored.
///
/// NOTE: This request will be sent to node servers, and the put/delete requests
/// already buffered in this TXN will be ignored.
/// NOTE: This request will be sent to node servers, and the put/delete
/// requests already buffered in this TXN will be ignored.
pub async fn watch_with_version(
&self,
table_id: u64,
Expand Down
10 changes: 10 additions & 0 deletions src/server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ pub enum Error {

#[error("abort schedule task, {0}")]
AbortScheduleTask(&'static str),

#[error("the txn is conflict with others")]
TxnConflict,
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -175,6 +178,11 @@ impl From<Error> for tonic::Status {
"epoch not match",
v1::Error::not_match(desc).encode_to_vec().into(),
),
Error::TxnConflict => Status::with_details(
Code::Unknown,
"the txn is conflict",
v1::Error::txn_conflict().encode_to_vec().into(),
),

Error::Forward(_) => panic!("Forward only used inside node"),
Error::ServiceIsBusy(_) => panic!("ServiceIsBusy only used inside node"),
Expand Down Expand Up @@ -234,6 +242,7 @@ impl From<Error> for sekas_api::server::v1::Error {
Error::CasFailed(index, cond_index, prev_value) => {
v1::Error::cas_failed(index, cond_index, prev_value)
}
Error::TxnConflict => v1::Error::txn_conflict(),

Error::Forward(_) => panic!("Forward only used inside node"),
Error::ServiceIsBusy(_) => panic!("ServiceIsBusy only used inside node"),
Expand Down Expand Up @@ -274,6 +283,7 @@ impl From<sekas_client::Error> for Error {
sekas_client::Error::CasFailed(index, cond_index, prev_value) => {
Error::CasFailed(index, cond_index, prev_value)
}
sekas_client::Error::TxnConflict => Error::TxnConflict,
sekas_client::Error::Rpc(err) => Error::Rpc(err),
sekas_client::Error::Connect(err) => Error::Rpc(err),
sekas_client::Error::Transport(err) => Error::Rpc(err),
Expand Down
7 changes: 7 additions & 0 deletions src/server/src/replica/eval/cmd_txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ pub(crate) async fn write_intent<T: LatchGuard>(
)
.await?;

if let Some(value) = prev_value.as_ref() {
if value.version > req.start_version {
trace!("txn {} are conflict with committed value {}", req.start_version, value.version);
return Err(Error::TxnConflict);
}
}

let mut wb = WriteBatch::default();
let prev_value = match write {
WriteRequest::Delete(del) => {
Expand Down
139 changes: 139 additions & 0 deletions src/server/tests/test_isolation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2024-present The Sekas Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod helper;

use std::time::Duration;

use helper::client::ClusterClient;
use helper::context::TestContext;
use helper::init::setup_panic_hook;
use helper::runtime::spawn;
use log::info;
use sekas_client::{AppError, Database, TableDesc, Txn, WriteBuilder};
use sekas_rock::fn_name;

const DB: &str = "DB";
const TABLE_A: &str = "TABLE_A";
const TABLE_B: &str = "TABLE_B";

/// Test-binary initializer registered through `#[ctor::ctor]`.
///
/// Calls the test helper `setup_panic_hook` and then initializes the
/// `tracing_subscriber` fmt subscriber so that log output is visible in tests.
#[ctor::ctor]
fn init() {
// NOTE(review): the hook is set up before logging is initialized; keep this
// order unless `setup_panic_hook` is confirmed not to depend on it.
setup_panic_hook();
tracing_subscriber::fmt::init();
}

/// Bootstrap a three-server test cluster, create the database `DB` with the
/// tables `TABLE_A` and `TABLE_B`, and run `assert_table_ready` on both tables.
///
/// Returns the test context, the cluster client, the database handle and the
/// descriptors of the two tables, in that order. The context and client are
/// handed back so the caller decides when they are dropped.
async fn bootstrap_servers_and_tables(
    name: &str,
) -> (TestContext, ClusterClient, Database, TableDesc, TableDesc) {
    let mut ctx = TestContext::new(name);
    let cluster = ClusterClient::new(ctx.bootstrap_servers(3).await).await;
    let app_client = cluster.app_client().await;

    let database = app_client.create_database(String::from(DB)).await.unwrap();
    let first = database.create_table(String::from(TABLE_A)).await.unwrap();
    let second = database.create_table(String::from(TABLE_B)).await.unwrap();
    for table_id in [first.id, second.id] {
        cluster.assert_table_ready(table_id).await;
    }

    // ATTN: this relies on the assumption that writes to two different tables
    // are not optimized into a single txn batch write.

    (ctx, cluster, database, first, second)
}

/// Checks that concurrent transactions cannot produce the lost update anomaly.
///
/// Two tasks run read-modify-write transactions against the same key at the
/// same time. The stored value packs two counters: the low 16 bits are bumped
/// by task A and the next 16 bits by task B. Each task retries its transaction
/// when the commit fails with `AppError::TxnConflict`, and panics if the
/// counter it owns does not equal the number of bumps it has committed so far
/// (which would mean one of its updates was overwritten by the other task).
#[sekas_macro::test]
async fn test_lost_update_anomaly() {
    // The constraint:
    // r1[x]...w2[x]...w1[x]...c1

    // Bit fields of the shared value owned by each bumper.
    const LOW_FIELD: i64 = 0x0000FFFF;
    const HIGH_FIELD: i64 = 0xFFFF0000;

    let (ctx, c, db, table_a, _table_b) = bootstrap_servers_and_tables(fn_name!()).await;

    let table_a = table_a.id;
    // Each counter lives in a 16-bit field, so this must stay at or below 0xFFFF.
    let loop_times = 100;

    let db_clone = db.clone();
    let bumper_a = spawn(async move {
        for i in 0..loop_times {
            // Retry the same bump until its commit succeeds.
            loop {
                let mut txn = db_clone.begin_txn();
                let value = read_i64(&txn, table_a, table_a.to_string().into_bytes()).await;
                let a = value & LOW_FIELD;
                let b = value & HIGH_FIELD;
                if a != i {
                    panic!("a = {}, i = {}, b = {}, the lost update anomaly is exists", a, i, b);
                }
                // Keep the other bumper's field untouched and bump our own.
                let value = b | (a + 1);

                let put = WriteBuilder::new(table_a.to_string().into_bytes())
                    .ensure_put(value.to_be_bytes().to_vec());
                txn.put(table_a, put);
                match txn.commit().await {
                    Ok(_) => break,
                    Err(AppError::TxnConflict) => {
                        info!("bumper a txn is conflict, retry later ...");
                    }
                    Err(err) => panic!("commit txn: {err:?}"),
                }
            }
            sekas_runtime::time::sleep(Duration::from_millis(5)).await;
        }
    });

    let db_clone = db.clone();
    let bumper_b = spawn(async move {
        for i in 0..loop_times {
            // Retry the same bump until its commit succeeds.
            loop {
                let mut txn = db_clone.begin_txn();
                let value = read_i64(&txn, table_a, table_a.to_string().into_bytes()).await;
                let a = value & LOW_FIELD;
                let b = (value & HIGH_FIELD) >> 16;
                if b != i {
                    panic!("b = {}, i = {}, a = {}, the lost update anomaly is exists", b, i, a);
                }
                // Keep the other bumper's field untouched and bump our own.
                let value = a | ((b + 1) << 16);

                let put = WriteBuilder::new(table_a.to_string().into_bytes())
                    .ensure_put(value.to_be_bytes().to_vec());
                txn.put(table_a, put);
                match txn.commit().await {
                    Ok(_) => break,
                    Err(AppError::TxnConflict) => {
                        info!("bumper b txn is conflict, retry later ...");
                    }
                    Err(err) => panic!("commit txn: {err:?}"),
                }
            }
            // A different pause than bumper a, so the two tasks interleave.
            sekas_runtime::time::sleep(Duration::from_millis(3)).await;
        }
    });

    bumper_a.await.unwrap();
    bumper_b.await.unwrap();

    // Both counters must have reached `loop_times`. The expectation is derived
    // from `loop_times` so it stays correct when the iteration count changes.
    let txn = db.begin_txn();
    let value = read_i64(&txn, table_a, table_a.to_string().into_bytes()).await;
    assert_eq!(value, (loop_times << 16) | loop_times);

    drop(c);
    drop(ctx);
}

/// Read `key` from `table_id` through `txn` and decode the stored bytes as an
/// `i64`; a missing value reads as `0`.
///
/// Panics if the read fails or the stored bytes cannot be decoded.
async fn read_i64(txn: &Txn, table_id: u64, key: Vec<u8>) -> i64 {
    let stored = txn.get(table_id, key).await.unwrap();
    stored.map_or(0, |bytes| sekas_rock::num::decode_i64(&bytes).unwrap())
}

0 comments on commit a37f0b1

Please sign in to comment.