From 384c42f99ad910b13fbf3f650bfe0a6062d450ea Mon Sep 17 00:00:00 2001 From: Rahul Rane Date: Thu, 8 Feb 2024 13:59:50 -0800 Subject: [PATCH 1/3] Adding a reverse scan API for raw client Signed-off-by: Rahul Rane --- src/raw/client.rs | 47 +++++++++++++++++++++++++++++++--- src/raw/lowering.rs | 2 ++ src/raw/requests.rs | 2 ++ tests/integration_tests.rs | 52 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 99 insertions(+), 4 deletions(-) diff --git a/src/raw/client.rs b/src/raw/client.rs index 109008ab..4d025a10 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -500,7 +500,40 @@ impl Client { /// ``` pub async fn scan(&self, range: impl Into, limit: u32) -> Result> { debug!("invoking raw scan request"); - self.scan_inner(range.into(), limit, false).await + self.scan_inner(range.into(), limit, false, false).await + } + + // Create a new 'scan' request but scans in "reverse" direction. + /// + /// Once resolved this request will result in a `Vec` of key-value pairs that lies in the specified range. + /// + /// If the number of eligible key-value pairs are greater than `limit`, + /// only the first `limit` pairs are returned, ordered by the key. + /// + /// + /// Reverse Scan queries continuous kv pairs in range (endKey, startKey], + /// from startKey(upperBound) to endKey(lowerBound), up to limit pairs. + /// The returned keys are in reversed lexicographical order. + /// If you want to include the endKey or exclude the startKey, push a '\0' to the key. + /// It doesn't support Scanning from "", because locating the last Region is not yet implemented. + /// # Examples + /// ```rust,no_run + /// # use tikv_client::{KvPair, Config, RawClient, IntoOwnedRange}; + /// # use futures::prelude::*; + /// # futures::executor::block_on(async { + /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap(); + /// let inclusive_range = "TiDB"..="TiKV"; + /// let req = client.scan_reverse(inclusive_range.into_owned(), 2); + /// let result: Vec = req.await.unwrap(); + /// # }); + /// ``` + pub async fn scan_reverse( + &self, + range: impl Into, + limit: u32, + ) -> Result> { + debug!("invoking raw reverse scan request"); + self.scan_inner(range.into(), limit, false, true).await } /// Create a new 'scan' request that only returns the keys. @@ -525,7 +558,7 @@ impl Client { pub async fn scan_keys(&self, range: impl Into, limit: u32) -> Result> { debug!("invoking raw scan_keys request"); Ok(self - .scan_inner(range, limit, true) + .scan_inner(range, limit, true, false) .await? .into_iter() .map(KvPair::into_key) @@ -682,6 +715,7 @@ impl Client { range: impl Into, limit: u32, key_only: bool, + reverse: bool, ) -> Result> { if limit > MAX_RAW_KV_SCAN_LIMIT { return Err(Error::MaxScanLimitExceeded { @@ -703,8 +737,13 @@ impl Client { let mut cur_limit = limit; while cur_limit > 0 { - let request = - new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone()); + let request = new_raw_scan_request( + cur_range.clone(), + cur_limit, + key_only, + reverse, + self.cf.clone(), + ); let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) .single_region_with_store(region_store.clone()) .await? diff --git a/src/raw/lowering.rs b/src/raw/lowering.rs index 4fd35477..3065401f 100644 --- a/src/raw/lowering.rs +++ b/src/raw/lowering.rs @@ -84,6 +84,7 @@ pub fn new_raw_scan_request( range: BoundRange, limit: u32, key_only: bool, + reverse: bool, cf: Option, ) -> kvrpcpb::RawScanRequest { let (start_key, end_key) = range.into_keys(); @@ -92,6 +93,7 @@ pub fn new_raw_scan_request( end_key.unwrap_or_default().into(), limit, key_only, + reverse, cf, ) } diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 65165927..af59b770 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -278,6 +278,7 @@ pub fn new_raw_scan_request( end_key: Vec, limit: u32, key_only: bool, + reverse: bool, cf: Option, ) -> kvrpcpb::RawScanRequest { let mut req = kvrpcpb::RawScanRequest::default(); @@ -285,6 +286,7 @@ pub fn new_raw_scan_request( req.end_key = end_key; req.limit = limit; req.key_only = key_only; + req.reverse = reverse; req.maybe_set_cf(cf); req diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 7244ed6f..cc3d09b9 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -561,6 +561,58 @@ async fn raw_req() -> Result<()> { assert_eq!(res[5].1, "v4".as_bytes()); assert_eq!(res[6].1, "v5".as_bytes()); + // reverse scan + // By default end key is exclusive, so k5 is not included and start key in included + + let res = client + .scan_reverse("k5".to_owned().."k2".to_owned(), 5) + .await?; + assert_eq!(res.len(), 3); + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + assert_eq!(res[2].1, "v2".as_bytes()); + + // by default end key in exclusive and start key is inclusive but now exclude start key + let res = client + .scan_reverse("k5".to_owned()..="k2".to_owned(), 5) + .await?; + assert_eq!(res.len(), 2); + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + + // reverse scan + // by default end key is exclusive and start key is inclusive but now include end key + let res = client + .scan_reverse("k5\0".to_owned().."k2".to_owned(), 5) + .await?; + assert_eq!(res.len(), 4); + assert_eq!(res[0].1, "v5".as_bytes()); + assert_eq!(res[1].1, "v4".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + assert_eq!(res[3].1, "v2".as_bytes()); + + // by default end key is exclusive and start key is inclusive but now include end key and exclude start key + let res = client + .scan_reverse("k5\0".to_owned()..="k2".to_owned(), 5) + .await?; + assert_eq!(res.len(), 3); + assert_eq!(res[0].1, "v5".as_bytes()); + assert_eq!(res[1].1, "v4".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + + // limit results to first 2 + let res = client + .scan_reverse("k5".to_owned().."k2".to_owned(), 2) + .await?; + assert_eq!(res.len(), 2); + assert_eq!(res[0].1, "v4".as_bytes()); + assert_eq!(res[1].1, "v3".as_bytes()); + + //let range = "k5"..; // Upperbound (k5, +inf). This is NOT SUPPORTED by TiKV. + let range = BoundRange::range_from(Key::from("k5".to_owned())); + let res = client.scan_reverse(range, 20).await?; + assert_eq!(res.len(), 0); + Ok(()) } From 6c3a93626f9e90241df26819a13efcf98be4daa1 Mon Sep 17 00:00:00 2001 From: Rahul Rane Date: Thu, 15 Feb 2024 23:19:03 -0800 Subject: [PATCH 2/3] Addressed comments Signed-off-by: Rahul Rane --- src/raw/client.rs | 8 ++++---- src/raw/requests.rs | 9 +++++++-- tests/integration_tests.rs | 22 +++++++++++++--------- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/raw/client.rs b/src/raw/client.rs index 4d025a10..c119169f 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -503,7 +503,7 @@ impl Client { self.scan_inner(range.into(), limit, false, false).await } - // Create a new 'scan' request but scans in "reverse" direction. + /// Create a new 'scan' request but scans in "reverse" direction. /// /// Once resolved this request will result in a `Vec` of key-value pairs that lies in the specified range. /// @@ -511,8 +511,8 @@ impl Client { /// only the first `limit` pairs are returned, ordered by the key. /// /// - /// Reverse Scan queries continuous kv pairs in range (endKey, startKey], - /// from startKey(upperBound) to endKey(lowerBound), up to limit pairs. + /// Reverse Scan queries continuous kv pairs in range [startKey, endKey), + /// from startKey(lowerBound) to endKey(upperBound) in reverse order, up to limit pairs. /// The returned keys are in reversed lexicographical order. /// If you want to include the endKey or exclude the startKey, push a '\0' to the key. /// It doesn't support Scanning from "", because locating the last Region is not yet implemented. @@ -522,7 +522,7 @@ impl Client { /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap(); - /// let inclusive_range = "TiDB"..="TiKV"; + /// let inclusive_range = "TiKV"..="TiDB"; /// let req = client.scan_reverse(inclusive_range.into_owned(), 2); /// let result: Vec = req.await.unwrap(); /// # }); diff --git a/src/raw/requests.rs b/src/raw/requests.rs index af59b770..e31fc2fe 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -282,8 +282,13 @@ pub fn new_raw_scan_request( cf: Option, ) -> kvrpcpb::RawScanRequest { let mut req = kvrpcpb::RawScanRequest::default(); - req.start_key = start_key; - req.end_key = end_key; + if !reverse { + req.start_key = start_key; + req.end_key = end_key; + } else { + req.start_key = end_key; + req.end_key = start_key; + } req.limit = limit; req.key_only = key_only; req.reverse = reverse; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index cc3d09b9..a7ec0fc2 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -562,10 +562,10 @@ async fn raw_req() -> Result<()> { assert_eq!(res[6].1, "v5".as_bytes()); // reverse scan - // By default end key is exclusive, so k5 is not included and start key in included + // By default end key is exclusive, so k5 is not included and start key in included let res = client - .scan_reverse("k5".to_owned().."k2".to_owned(), 5) + .scan_reverse("k2".to_owned().."k5".to_owned(), 5) .await?; assert_eq!(res.len(), 3); assert_eq!(res[0].1, "v4".as_bytes()); @@ -574,7 +574,7 @@ async fn raw_req() -> Result<()> { // by default end key in exclusive and start key is inclusive but now exclude start key let res = client - .scan_reverse("k5".to_owned()..="k2".to_owned(), 5) + .scan_reverse("k2\0".to_owned().."k5".to_owned(), 5) .await?; assert_eq!(res.len(), 2); assert_eq!(res[0].1, "v4".as_bytes()); @@ -583,7 +583,7 @@ async fn raw_req() -> Result<()> { // reverse scan // by default end key is exclusive and start key is inclusive but now include end key let res = client - .scan_reverse("k5\0".to_owned().."k2".to_owned(), 5) + .scan_reverse("k2".to_owned()..="k5".to_owned(), 5) .await?; assert_eq!(res.len(), 4); assert_eq!(res[0].1, "v5".as_bytes()); @@ -593,7 +593,7 @@ async fn raw_req() -> Result<()> { // by default end key is exclusive and start key is inclusive but now include end key and exclude start key let res = client - .scan_reverse("k5\0".to_owned()..="k2".to_owned(), 5) + .scan_reverse("k2\0".to_owned()..="k5".to_owned(), 5) .await?; assert_eq!(res.len(), 3); assert_eq!(res[0].1, "v5".as_bytes()); @@ -602,16 +602,20 @@ async fn raw_req() -> Result<()> { // limit results to first 2 let res = client - .scan_reverse("k5".to_owned().."k2".to_owned(), 2) + .scan_reverse("k2".to_owned().."k5".to_owned(), 2) .await?; assert_eq!(res.len(), 2); assert_eq!(res[0].1, "v4".as_bytes()); assert_eq!(res[1].1, "v3".as_bytes()); - //let range = "k5"..; // Upperbound (k5, +inf). This is NOT SUPPORTED by TiKV. - let range = BoundRange::range_from(Key::from("k5".to_owned())); + // if endKey is not provided then it scan everything including end key + let range = BoundRange::range_from(Key::from("k2".to_owned())); let res = client.scan_reverse(range, 20).await?; - assert_eq!(res.len(), 0); + assert_eq!(res.len(), 4); + assert_eq!(res[0].1, "v5".as_bytes()); + assert_eq!(res[1].1, "v4".as_bytes()); + assert_eq!(res[2].1, "v3".as_bytes()); + assert_eq!(res[3].1, "v2".as_bytes()); Ok(()) } From 92367607819eb7cc532874f5e8c0892ed6201bbc Mon Sep 17 00:00:00 2001 From: Rahul Rane Date: Tue, 20 Feb 2024 13:14:01 -0800 Subject: [PATCH 3/3] Addressed comments Signed-off-by: Rahul Rane --- src/raw/client.rs | 33 +++++++++++++ src/raw/requests.rs | 2 +- tests/integration_tests.rs | 99 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 133 insertions(+), 1 deletion(-) diff --git a/src/raw/client.rs b/src/raw/client.rs index c119169f..71d40b2a 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -565,6 +565,39 @@ impl Client { .collect()) } + /// Create a new 'scan' request that only returns the keys in reverse order. + /// + /// Once resolved this request will result in a `Vec` of keys that lies in the specified range. + /// + /// If the number of eligible keys are greater than `limit`, + /// only the first `limit` pairs are returned, ordered by the key. + /// + /// + /// # Examples + /// ```rust,no_run + /// # use tikv_client::{Key, Config, RawClient, IntoOwnedRange}; + /// # use futures::prelude::*; + /// # futures::executor::block_on(async { + /// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap(); + /// let inclusive_range = "TiKV"..="TiDB"; + /// let req = client.scan_keys(inclusive_range.into_owned(), 2); + /// let result: Vec = req.await.unwrap(); + /// # }); + /// ``` + pub async fn scan_keys_reverse( + &self, + range: impl Into, + limit: u32, + ) -> Result> { + debug!("invoking raw scan_keys request"); + Ok(self + .scan_inner(range, limit, true, true) + .await? + .into_iter() + .map(KvPair::into_key) + .collect()) + } + /// Create a new 'batch scan' request. /// /// Once resolved this request will result in a set of scanners over the given keys. diff --git a/src/raw/requests.rs b/src/raw/requests.rs index e31fc2fe..8c49da9e 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -301,7 +301,7 @@ impl KvRequest for kvrpcpb::RawScanRequest { type Response = kvrpcpb::RawScanResponse; } -range_request!(kvrpcpb::RawScanRequest); // TODO: support reverse raw scan. +range_request!(kvrpcpb::RawScanRequest); shardable_range!(kvrpcpb::RawScanRequest); impl Merge for Collect { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index a7ec0fc2..514c4aa8 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -760,6 +760,105 @@ async fn raw_write_million() -> Result<()> { r = client.scan(.., limit).await?; assert_eq!(r.len(), limit as usize); + // test scan_reverse + // test scan, key range from [0,0,0,0] to [255.0.0.0] + let mut limit = 2000; + let mut r = client.scan_reverse(.., limit).await?; + assert_eq!(r.len(), 256); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8); + } + r = client.scan_reverse(vec![100, 0, 0, 0].., limit).await?; + assert_eq!(r.len(), 156); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + 100); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit) + .await?; + assert_eq!(r.len(), 195); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + 5); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit) + .await?; + assert_eq!(r.len(), 196); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + 5); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit) + .await?; + assert_eq!(r.len(), 251); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + 5); + } + r = client + .scan_reverse(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit) + .await?; + assert_eq!(r.len(), 0); + r = client.scan_reverse(..vec![0, 0, 0, 0], limit).await?; + assert_eq!(r.len(), 0); + + limit = 3; + let mut r = client.scan_reverse(.., limit).await?; + let mut expected_start: u8 = 255 - limit as u8 + 1; // including endKey + assert_eq!(r.len(), limit as usize); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + expected_start); + } + r = client.scan_reverse(vec![100, 0, 0, 0].., limit).await?; + expected_start = 255 - limit as u8 + 1; // including endKey + assert_eq!(r.len(), limit as usize); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + expected_start); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit) + .await?; + expected_start = 200 - limit as u8; + assert_eq!(r.len(), limit as usize); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + expected_start); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit) + .await?; + expected_start = 200 - limit as u8 + 1; // including endKey + assert_eq!(r.len(), limit as usize); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + expected_start); + } + r = client + .scan_reverse(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit) + .await?; + expected_start = 255 - limit as u8 + 1; // including endKey + assert_eq!(r.len(), limit as usize); + for (i, val) in r.iter().rev().enumerate() { + let k: Vec = val.0.clone().into(); + assert_eq!(k[0], i as u8 + expected_start); + } + r = client + .scan_reverse(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit) + .await?; + assert_eq!(r.len(), 0); + r = client.scan_reverse(..vec![0, 0, 0, 0], limit).await?; + assert_eq!(r.len(), 0); + + limit = 0; + r = client.scan_reverse(.., limit).await?; + assert_eq!(r.len(), limit as usize); + // test batch_scan for batch_num in 1..4 { let _ = client