Skip to content

Commit

Permalink
storage: support reverse scan (tikv#3033)
Browse files Browse the repository at this point in the history
  • Loading branch information
huachaohuang authored May 11, 2018
1 parent 44bfad7 commit 0ac7a12
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/server/service/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ impl<T: RaftStoreRouter + 'static> tikvpb_grpc::Tikv for Service<T> {

let mut options = Options::default();
options.key_only = req.get_key_only();
options.reverse_scan = req.get_reverse();

let future = self.storage
.async_scan(
Expand Down
49 changes: 32 additions & 17 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ pub struct Options {
pub lock_ttl: u64,
pub skip_constraint_check: bool,
pub key_only: bool,
pub reverse_scan: bool,
}

impl Options {
Expand All @@ -397,8 +398,14 @@ impl Options {
lock_ttl,
skip_constraint_check,
key_only,
reverse_scan: false,
}
}

pub fn reverse_scan(mut self) -> Options {
self.reverse_scan = true;
self
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -657,23 +664,31 @@ impl Storage {
ctx.get_isolation_level(),
!ctx.get_not_fill_cache(),
);
snap_store
.scanner(ScanMode::Forward, options.key_only, None, None)
.and_then(|mut scanner| {
let res = scanner.scan(start_key, limit);
let statistics = scanner.get_statistics();
thread_ctx.collect_scan_count(CMD, statistics);
thread_ctx.collect_read_flow(ctx.get_region_id(), statistics);
res
})
.map_err(Error::from)
.map(|results| {
thread_ctx.collect_key_reads(CMD, results.len() as u64);
results
.into_iter()
.map(|x| x.map_err(Error::from))
.collect()
})

let scan_mode = if options.reverse_scan {
ScanMode::Backward
} else {
ScanMode::Forward
};

let mut scanner = snap_store.scanner(scan_mode, options.key_only, None, None)?;
let res = if options.reverse_scan {
scanner.reverse_scan(start_key, limit)
} else {
scanner.scan(start_key, limit)
};

let statistics = scanner.get_statistics();
thread_ctx.collect_scan_count(CMD, statistics);
thread_ctx.collect_read_flow(ctx.get_region_id(), statistics);

res.map_err(Error::from).map(|results| {
thread_ctx.collect_key_reads(CMD, results.len() as u64);
results
.into_iter()
.map(|x| x.map_err(Error::from))
.collect()
})
})
.then(move |r| {
_timer.observe_duration();
Expand Down
19 changes: 19 additions & 0 deletions tests/storage/assert_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,25 @@ impl AssertionStorage {
assert_eq!(result, expect);
}

pub fn reverse_scan_ok(
&self,
start_key: &[u8],
limit: usize,
ts: u64,
expect: Vec<Option<(&[u8], &[u8])>>,
) {
let key_address = make_key(start_key);
let result = self.store
.reverse_scan(self.ctx.clone(), key_address, limit, false, ts)
.unwrap();
let result: Vec<Option<KvPair>> = result.into_iter().map(Result::ok).collect();
let expect: Vec<Option<KvPair>> = expect
.into_iter()
.map(|x| x.map(|(k, v)| (k.to_vec(), v.to_vec())))
.collect();
assert_eq!(result, expect);
}

pub fn scan_key_only_ok(
&self,
start_key: &[u8],
Expand Down
19 changes: 19 additions & 0 deletions tests/storage/sync_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,25 @@ impl SyncStorage {
.wait()
}

pub fn reverse_scan(
&self,
ctx: Context,
key: Key,
limit: usize,
key_only: bool,
start_ts: u64,
) -> Result<Vec<Result<KvPair>>> {
self.store
.async_scan(
ctx,
key,
limit,
start_ts,
Options::new(0, false, key_only).reverse_scan(),
)
.wait()
}

pub fn prewrite(
&self,
ctx: Context,
Expand Down
137 changes: 137 additions & 0 deletions tests/storage_cases/test_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,143 @@ fn test_txn_store_scan() {
check_v40();
}

#[test]
fn test_txn_store_reverse_scan() {
let store = AssertionStorage::default();

// ver10: A(10) - B(_) - C(10) - D(_) - E(10)
store.put_ok(b"A", b"A10", 5, 10);
store.put_ok(b"C", b"C10", 5, 10);
store.put_ok(b"E", b"E10", 5, 10);

let check_v10 = || {
store.reverse_scan_ok(b"Z", 0, 10, vec![]);
store.reverse_scan_ok(b"Z", 1, 10, vec![Some((b"E", b"E10"))]);
store.reverse_scan_ok(
b"Z",
2,
10,
vec![Some((b"E", b"E10")), Some((b"C", b"C10"))],
);
store.reverse_scan_ok(
b"Z",
3,
10,
vec![
Some((b"E", b"E10")),
Some((b"C", b"C10")),
Some((b"A", b"A10")),
],
);
store.reverse_scan_ok(
b"Z",
4,
10,
vec![
Some((b"E", b"E10")),
Some((b"C", b"C10")),
Some((b"A", b"A10")),
],
);
store.reverse_scan_ok(
b"E\x00",
3,
10,
vec![
Some((b"E", b"E10")),
Some((b"C", b"C10")),
Some((b"A", b"A10")),
],
);
store.reverse_scan_ok(
b"E",
3,
10,
vec![Some((b"C", b"C10")), Some((b"A", b"A10"))],
);
store.reverse_scan_ok(b"", 1, 10, vec![]);
};
check_v10();

// ver20: A(10) - B(20) - C(10) - D(20) - E(10)
store.put_ok(b"B", b"B20", 15, 20);
store.put_ok(b"D", b"D20", 15, 20);

let check_v20 = || {
store.reverse_scan_ok(
b"Z",
5,
20,
vec![
Some((b"E", b"E10")),
Some((b"D", b"D20")),
Some((b"C", b"C10")),
Some((b"B", b"B20")),
Some((b"A", b"A10")),
],
);
store.reverse_scan_ok(
b"C",
5,
20,
vec![Some((b"B", b"B20")), Some((b"A", b"A10"))],
);
store.reverse_scan_ok(b"D\x00", 1, 20, vec![Some((b"D", b"D20"))]);
};
check_v10();
check_v20();

// ver30: A(_) - B(20) - C(10) - D(_) - E(10)
store.delete_ok(b"A", 25, 30);
store.delete_ok(b"D", 25, 30);

let check_v30 = || {
store.reverse_scan_ok(
b"Z",
5,
30,
vec![
Some((b"E", b"E10")),
Some((b"C", b"C10")),
Some((b"B", b"B20")),
],
);
store.reverse_scan_ok(b"E", 1, 30, vec![Some((b"C", b"C10"))]);
store.reverse_scan_ok(b"B\x00", 5, 30, vec![Some((b"B", b"B20"))]);
};
check_v10();
check_v20();
check_v30();

// ver40: A(_) - B(_) - C(40) - D(40) - E(10)
store.delete_ok(b"B", 35, 40);
store.put_ok(b"C", b"C40", 35, 40);
store.put_ok(b"D", b"D40", 35, 40);

let check_v40 = || {
store.reverse_scan_ok(
b"Z",
5,
40,
vec![
Some((b"E", b"E10")),
Some((b"D", b"D40")),
Some((b"C", b"C40")),
],
);
store.reverse_scan_ok(
b"E",
5,
100,
vec![Some((b"D", b"D40")), Some((b"C", b"C40"))],
);
};
check_v10();
check_v20();
check_v30();
check_v40();
}

#[test]
fn test_txn_store_scan_key_only() {
let store = AssertionStorage::default();
Expand Down

0 comments on commit 0ac7a12

Please sign in to comment.