Skip to content

Commit

Permalink
fix(integrations/object_store): Fix list with offset may return offset
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Oct 24, 2024
1 parent 468fca7 commit a4372df
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 47 deletions.
15 changes: 5 additions & 10 deletions integrations/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,11 @@
//!
//! #[tokio::main]
//! async fn main() {
//! let builder = S3::from_map(
//! vec![
//! ("access_key".to_string(), "my_access_key".to_string()),
//! ("secret_key".to_string(), "my_secret_key".to_string()),
//! ("endpoint".to_string(), "my_endpoint".to_string()),
//! ("region".to_string(), "my_region".to_string()),
//! ]
//! .into_iter()
//! .collect(),
//! ).unwrap();
//! let builder = S3::default()
//! .access_key_id("my_access_key")
//! .secret_access_key("my_secret_key")
//! .endpoint("my_endpoint")
//! .region("my_region");
//!
//! // Create a new operator
//! let operator = Operator::new(builder).unwrap().finish();
Expand Down
63 changes: 26 additions & 37 deletions integrations/object_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,11 @@ use tokio::sync::{Mutex, Notify};
///
/// #[tokio::main]
/// async fn main() {
/// let builder = S3::from_map(
/// vec![
/// ("access_key".to_string(), "my_access_key".to_string()),
/// ("secret_key".to_string(), "my_secret_key".to_string()),
/// ("endpoint".to_string(), "my_endpoint".to_string()),
/// ("region".to_string(), "my_region".to_string()),
/// ]
/// .into_iter()
/// .collect(),
/// ).unwrap();
/// let builder = S3::default()
/// .access_key_id("my_access_key")
/// .secret_access_key("my_secret_key")
/// .endpoint("my_endpoint")
/// .region("my_region");
///
/// // Create a new operator
/// let operator = Operator::new(builder).unwrap().finish();
Expand Down Expand Up @@ -339,33 +334,27 @@ impl ObjectStore for OpendalStore {
let offset = offset.clone();

let fut = async move {
let fut = if self.inner.info().full_capability().list_with_start_after {
self.inner
.lister_with(&path)
.start_after(offset.as_ref())
.metakey(Self::metakey())
.recursive(true)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, &path))?
.then(try_format_object_meta)
.into_send()
.boxed()
} else {
self.inner
.lister_with(&path)
.metakey(Self::metakey())
.recursive(true)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, &path))?
.try_filter(move |entry| futures::future::ready(entry.path() > offset.as_ref()))
.then(try_format_object_meta)
.into_send()
.boxed()
};
let mut list_fut = self
.inner
.lister_with(&path)
.metakey(Self::metakey())
.recursive(true);
if self.inner.info().full_capability().list_with_start_after {
list_fut = list_fut.start_after(offset.as_ref());
}

let fut = list_fut
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, &path))?
// This is required even when services support list_with_start_after.
// We use this to make sure we don't return the offset it self.
.try_filter(move |entry| futures::future::ready(entry.path() > offset.as_ref()))
.then(try_format_object_meta)
.into_send()
.boxed();

Ok::<_, object_store::Error>(fut)
};

Expand Down

0 comments on commit a4372df

Please sign in to comment.