Skip to content

Commit

Permalink
mvcc/reader: ignore Lock when lock's type is Lock (#3011)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche authored May 7, 2018
1 parent 49b4dd3 commit b605387
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 21 deletions.
42 changes: 21 additions & 21 deletions src/storage/mvcc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::lock::Lock;
use super::lock::{Lock, LockType};
use super::write::{Write, WriteType};
use super::{Error, Result};
use kvproto::kvrpcpb::IsolationLevel;
Expand Down Expand Up @@ -194,34 +194,34 @@ impl MvccReader {
Ok(Some((commit_ts, write)))
}

fn check_lock(&mut self, key: &Key, mut ts: u64) -> Result<Option<u64>> {
fn check_lock(&mut self, key: &Key, ts: u64) -> Result<u64> {
if let Some(lock) = self.load_lock(key)? {
if lock.ts <= ts {
if ts == u64::MAX && key.raw()? == lock.primary {
// when ts==u64::MAX(which means to get latest committed version for
// primary key),and current key is the primary key, returns the latest
// commit version's value
ts = lock.ts - 1;
} else {
// There is a pending lock. Client should wait or clean it.
return Err(Error::KeyIsLocked {
key: key.raw()?,
primary: lock.primary,
ts: lock.ts,
ttl: lock.ttl,
});
}
if lock.ts > ts || lock.lock_type == LockType::Lock {
// ignore lock when lock.ts > ts or lock's type is Lock
return Ok(ts);
}

if ts == u64::MAX && key.raw()? == lock.primary {
// when ts==u64::MAX(which means to get latest committed version for
// primary key),and current key is the primary key, returns the latest
// commit version's value
return Ok(lock.ts - 1);
}
// There is a pending lock. Client should wait or clean it.
return Err(Error::KeyIsLocked {
key: key.raw()?,
primary: lock.primary,
ts: lock.ts,
ttl: lock.ttl,
});
}
Ok(Some(ts))
Ok(ts)
}

pub fn get(&mut self, key: &Key, mut ts: u64) -> Result<Option<Value>> {
// Check for locks that signal concurrent writes.
match self.isolation_level {
IsolationLevel::SI => if let Some(new_ts) = self.check_lock(key, ts)? {
ts = new_ts;
},
IsolationLevel::SI => ts = self.check_lock(key, ts)?,
IsolationLevel::RC => {}
}
loop {
Expand Down
8 changes: 8 additions & 0 deletions tests/storage_cases/test_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ fn test_txn_store_get() {
store.get_ok(b"x", 11, b"x");
}

#[test]
fn test_txn_store_get_with_type_lock() {
let store = AssertionStorage::default();
store.put_ok(b"k1", b"v1", 1, 2);
store.prewrite_ok(vec![Mutation::Lock(make_key(b"k1"))], b"k1", 5);
store.get_ok(b"k1", 20, b"v1");
}

#[test]
fn test_txn_store_delete() {
let store = AssertionStorage::default();
Expand Down

0 comments on commit b605387

Please sign in to comment.