Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a reverse scan API for raw client #441

Merged
merged 3 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 76 additions & 4 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,40 @@ impl<PdC: PdClient> Client<PdC> {
/// ```
pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
debug!("invoking raw scan request");
self.scan_inner(range.into(), limit, false).await
self.scan_inner(range.into(), limit, false, false).await
}

/// Create a new 'scan' request that scans the range in "reverse" direction.
///
/// Once resolved this request will result in a `Vec` of key-value pairs that lie in the
/// specified range, returned in reversed (descending) lexicographical key order.
///
/// If the number of eligible key-value pairs is greater than `limit`,
/// only the first `limit` pairs met in that reverse order are returned,
/// i.e. the pairs with the largest keys in the range.
///
/// Reverse scan queries continuous kv pairs in range [startKey, endKey),
/// walking from endKey (upper bound) down to startKey (lower bound), up to `limit` pairs.
/// If you want to include the endKey or exclude the startKey, push a '\0' to the key.
/// It doesn't support Scanning from "", because locating the last Region is not yet implemented.
///
/// # Errors
///
/// Returns an error if `limit` exceeds `MAX_RAW_KV_SCAN_LIMIT`.
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = client.scan_reverse(inclusive_range.into_owned(), 2);
/// let result: Vec<KvPair> = req.await.unwrap();
/// # });
/// ```
// NOTE(review): the integration tests call `scan_reverse(.., limit)` with an unbounded
// range, so the "scanning from \"\"" limitation above may be stale; confirm and update.
pub async fn scan_reverse(
&self,
range: impl Into<BoundRange>,
limit: u32,
) -> Result<Vec<KvPair>> {
debug!("invoking raw reverse scan request");
// Positional flags for `scan_inner`: key_only = false (return values too), reverse = true.
self.scan_inner(range.into(), limit, false, true).await
}

/// Create a new 'scan' request that only returns the keys.
Expand All @@ -525,7 +558,40 @@ impl<PdC: PdClient> Client<PdC> {
pub async fn scan_keys(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<Key>> {
debug!("invoking raw scan_keys request");
Ok(self
.scan_inner(range, limit, true)
.scan_inner(range, limit, true, false)
.await?
.into_iter()
.map(KvPair::into_key)
.collect())
}

/// Create a new 'scan' request that only returns the keys in reverse order.
///
/// Once resolved this request will result in a `Vec` of keys that lies in the specified range.
///
/// If the number of eligible keys is greater than `limit`,
/// only the first `limit` keys are returned, in reversed (descending) key order.
///
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let inclusive_range = "TiKV"..="TiDB";
/// let req = client.scan_keys_reverse(inclusive_range.into_owned(), 2);
/// let result: Vec<Key> = req.await.unwrap();
/// # });
/// ```
pub async fn scan_keys_reverse(
&self,
range: impl Into<BoundRange>,
limit: u32,
) -> Result<Vec<Key>> {
debug!("invoking raw scan_keys request");
Ok(self
.scan_inner(range, limit, true, true)
.await?
.into_iter()
.map(KvPair::into_key)
Expand Down Expand Up @@ -682,6 +748,7 @@ impl<PdC: PdClient> Client<PdC> {
range: impl Into<BoundRange>,
limit: u32,
key_only: bool,
reverse: bool,
) -> Result<Vec<KvPair>> {
if limit > MAX_RAW_KV_SCAN_LIMIT {
return Err(Error::MaxScanLimitExceeded {
Expand All @@ -703,8 +770,13 @@ impl<PdC: PdClient> Client<PdC> {
let mut cur_limit = limit;

while cur_limit > 0 {
let request =
new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
let request = new_raw_scan_request(
cur_range.clone(),
cur_limit,
key_only,
reverse,
self.cf.clone(),
);
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.single_region_with_store(region_store.clone())
.await?
Expand Down
2 changes: 2 additions & 0 deletions src/raw/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub fn new_raw_scan_request(
range: BoundRange,
limit: u32,
key_only: bool,
reverse: bool,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawScanRequest {
let (start_key, end_key) = range.into_keys();
Expand All @@ -92,6 +93,7 @@ pub fn new_raw_scan_request(
end_key.unwrap_or_default().into(),
limit,
key_only,
reverse,
cf,
)
}
Expand Down
13 changes: 10 additions & 3 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,20 @@ pub fn new_raw_scan_request(
end_key: Vec<u8>,
limit: u32,
key_only: bool,
reverse: bool,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawScanRequest {
let mut req = kvrpcpb::RawScanRequest::default();
req.start_key = start_key;
req.end_key = end_key;
if !reverse {
req.start_key = start_key;
req.end_key = end_key;
} else {
req.start_key = end_key;
req.end_key = start_key;
}
req.limit = limit;
req.key_only = key_only;
req.reverse = reverse;
req.maybe_set_cf(cf);

req
Expand All @@ -294,7 +301,7 @@ impl KvRequest for kvrpcpb::RawScanRequest {
type Response = kvrpcpb::RawScanResponse;
}

range_request!(kvrpcpb::RawScanRequest); // TODO: support reverse raw scan.
range_request!(kvrpcpb::RawScanRequest);
shardable_range!(kvrpcpb::RawScanRequest);

impl Merge<kvrpcpb::RawScanResponse> for Collect {
Expand Down
155 changes: 155 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,62 @@ async fn raw_req() -> Result<()> {
assert_eq!(res[5].1, "v4".as_bytes());
assert_eq!(res[6].1, "v5".as_bytes());

// reverse scan

// By default end key is exclusive, so k5 is not included, and start key is included
let res = client
.scan_reverse("k2".to_owned().."k5".to_owned(), 5)
.await?;
assert_eq!(res.len(), 3);
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());
assert_eq!(res[2].1, "v2".as_bytes());

// by default end key is exclusive and start key is inclusive, but now exclude start key
let res = client
.scan_reverse("k2\0".to_owned().."k5".to_owned(), 5)
.await?;
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());

// reverse scan
// by default end key is exclusive and start key is inclusive but now include end key
let res = client
.scan_reverse("k2".to_owned()..="k5".to_owned(), 5)
.await?;
assert_eq!(res.len(), 4);
assert_eq!(res[0].1, "v5".as_bytes());
assert_eq!(res[1].1, "v4".as_bytes());
assert_eq!(res[2].1, "v3".as_bytes());
assert_eq!(res[3].1, "v2".as_bytes());

// by default end key is exclusive and start key is inclusive but now include end key and exclude start key
let res = client
.scan_reverse("k2\0".to_owned()..="k5".to_owned(), 5)
.await?;
assert_eq!(res.len(), 3);
assert_eq!(res[0].1, "v5".as_bytes());
assert_eq!(res[1].1, "v4".as_bytes());
assert_eq!(res[2].1, "v3".as_bytes());

// limit results to first 2
let res = client
.scan_reverse("k2".to_owned().."k5".to_owned(), 2)
.await?;
assert_eq!(res.len(), 2);
assert_eq!(res[0].1, "v4".as_bytes());
assert_eq!(res[1].1, "v3".as_bytes());

// if endKey is not provided then it scans everything, including the last key
let range = BoundRange::range_from(Key::from("k2".to_owned()));
let res = client.scan_reverse(range, 20).await?;
assert_eq!(res.len(), 4);
assert_eq!(res[0].1, "v5".as_bytes());
assert_eq!(res[1].1, "v4".as_bytes());
assert_eq!(res[2].1, "v3".as_bytes());
assert_eq!(res[3].1, "v2".as_bytes());

Ok(())
}

Expand Down Expand Up @@ -704,6 +760,105 @@ async fn raw_write_million() -> Result<()> {
r = client.scan(.., limit).await?;
assert_eq!(r.len(), limit as usize);

// test scan_reverse
// test scan, key range from [0,0,0,0] to [255,0,0,0]
let mut limit = 2000;
let mut r = client.scan_reverse(.., limit).await?;
assert_eq!(r.len(), 256);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8);
}
r = client.scan_reverse(vec![100, 0, 0, 0].., limit).await?;
assert_eq!(r.len(), 156);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 100);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), 195);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), 196);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 251);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan_reverse(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 0);
r = client.scan_reverse(..vec![0, 0, 0, 0], limit).await?;
assert_eq!(r.len(), 0);

limit = 3;
let mut r = client.scan_reverse(.., limit).await?;
let mut expected_start: u8 = 255 - limit as u8 + 1; // including endKey
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + expected_start);
}
r = client.scan_reverse(vec![100, 0, 0, 0].., limit).await?;
expected_start = 255 - limit as u8 + 1; // including endKey
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + expected_start);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
.await?;
expected_start = 200 - limit as u8;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + expected_start);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
.await?;
expected_start = 200 - limit as u8 + 1; // including endKey
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + expected_start);
}
r = client
.scan_reverse(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
expected_start = 255 - limit as u8 + 1; // including endKey
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().rev().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + expected_start);
}
r = client
.scan_reverse(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 0);
r = client.scan_reverse(..vec![0, 0, 0, 0], limit).await?;
assert_eq!(r.len(), 0);

limit = 0;
r = client.scan_reverse(.., limit).await?;
assert_eq!(r.len(), limit as usize);

// test batch_scan
for batch_num in 1..4 {
let _ = client
Expand Down
Loading