Ensure we return a sorted listing when using a local client #344

Merged
merged 13 commits into from
Oct 18, 2024
62 changes: 59 additions & 3 deletions kernel/src/engine/default/filesystem.rs
@@ -2,8 +2,9 @@ use std::sync::Arc;

use bytes::Bytes;
use futures::stream::StreamExt;
use itertools::Itertools;
use object_store::path::Path;
use object_store::DynObjectStore;
use object_store::{DynObjectStore, ObjectStore};
use url::Url;

use crate::engine::default::executor::TaskExecutor;
@@ -12,15 +13,22 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};
#[derive(Debug)]
pub struct ObjectStoreFileSystemClient<E: TaskExecutor> {
inner: Arc<DynObjectStore>,
has_ordered_listing: bool,
table_root: Path,
task_executor: Arc<E>,
readahead: usize,
}

impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
pub(crate) fn new(
store: Arc<DynObjectStore>,
has_ordered_listing: bool,
table_root: Path,
task_executor: Arc<E>,
) -> Self {
Self {
inner: store,
has_ordered_listing,
table_root,
task_executor,
readahead: 10,
@@ -72,7 +80,14 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
}
});

Ok(Box::new(receiver.into_iter()))
if !self.has_ordered_listing {
// This FS doesn't return things in the order we require
let mut fms: Vec<FileMeta> = receiver.into_iter().try_collect()?;
fms.sort_unstable();
Ok(Box::new(fms.into_iter().map(Ok)))
} else {
Ok(Box::new(receiver.into_iter()))
}
}

/// Read data specified by the start and end offset from the file.
@@ -144,6 +159,8 @@ mod tests {
use object_store::{local::LocalFileSystem, ObjectStore};

use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::DefaultEngine;
use crate::Engine;

use itertools::Itertools;

@@ -174,6 +191,7 @@ mod tests {
let prefix = Path::from(url.path());
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
prefix,
Arc::new(TokioBackgroundExecutor::new()),
);
@@ -195,4 +213,42 @@ mod tests {
assert_eq!(data[1], Bytes::from("data"));
assert_eq!(data[2], Bytes::from("el-da"));
}

#[tokio::test]
async fn test_default_engine_listing() {
let tmp = tempfile::tempdir().unwrap();
let tmp_store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
let data = Bytes::from("kernel-data");

let expected_names: Vec<String> = (0..10)
.map(|i| format!("_delta_log/{:0>20}.json", i))
.collect();

// write the files in reverse order
for name in expected_names.iter().rev() {
tmp_store
.put(&Path::from(name.as_str()), data.clone().into())
.await
.unwrap();
}

let url = Url::from_directory_path(tmp.path()).unwrap();
let store = Arc::new(LocalFileSystem::new());
let prefix = Path::from_url_path(url.path()).expect("Couldn't get path");
let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
let client = engine.get_file_system_client();

let files = client.list_from(&Url::parse("file://").unwrap()).unwrap();
let mut len = 0;
for (file, expected) in files.zip(expected_names.iter()) {
assert!(
file.as_ref().unwrap().location.path().ends_with(expected),
"{} does not end with {}",
file.unwrap().location.path(),
expected
);
len += 1;
}
assert_eq!(len, 10, "list_from should have returned 10 files");
}
}
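
A note on the listing change above: when the store does not guarantee order, the new branch buffers the whole listing, sorts it, and re-wraps each entry in `Ok`. Below is a minimal, stdlib-only sketch of that pattern. `Entry`, `Listing`, and `ordered_listing` are hypothetical names, errors are plain `String`s, and the input is a `Vec` rather than the channel receiver used in the PR.

```rust
// Hypothetical stand-in for the kernel's `FileMeta`; only the sort key matters here.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Entry {
    location: String,
}

// Hypothetical alias for a boxed, fallible listing iterator.
type Listing = Box<dyn Iterator<Item = Result<Entry, String>>>;

/// Returns the listing unchanged when the store already yields sorted results;
/// otherwise buffers it, sorts it, and re-wraps each item in `Ok`.
fn ordered_listing(
    items: Vec<Result<Entry, String>>,
    has_ordered_listing: bool,
) -> Result<Listing, String> {
    if has_ordered_listing {
        return Ok(Box::new(items.into_iter()));
    }
    // Collecting into `Result<Vec<_>, _>` stops at the first error, which is
    // similar in spirit to the itertools `try_collect` call in the diff.
    let mut entries: Vec<Entry> = items.into_iter().collect::<Result<_, _>>()?;
    entries.sort_unstable();
    Ok(Box::new(entries.into_iter().map(Ok)))
}
```

One consequence visible in the sketch: the unordered path holds the entire listing in memory before returning anything, and an error anywhere in the listing surfaces up front instead of partway through iteration.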
22 changes: 22 additions & 0 deletions kernel/src/engine/default/mod.rs
@@ -67,9 +67,31 @@ impl<E: TaskExecutor> DefaultEngine<E> {
/// - `table_root`: The root path of the table within storage.
/// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor].
pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
// HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because
// local filesystem doesn't return a sorted list by default. Although the `object_store`
// crate explicitly says it _does not_ return a sorted listing, in practice all the cloud
// implementations actually do:
// - AWS:
// [`ListObjectsV2`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html)
// states: "For general purpose buckets, ListObjectsV2 returns objects in lexicographical
// order based on their key names." (Directory buckets are out of scope for now)
// - Azure: Docs state
// [here](https://learn.microsoft.com/en-us/rest/api/storageservices/enumerating-blob-resources):
// "A listing operation returns an XML response that contains all or part of the requested
// list. The operation returns entities in alphabetical order."
Comment on lines +78 to +81
Collaborator

Nice find! I couldn't find that documented before.

But what about non-blob Azure storage? Aren't there ADLS Gen 1 and ADLS Gen 2 as well? Or are they supersets of the blob store API?

Collaborator Author

Good question on ADLS Gen 2. I don't think it matters, because ADLS Gen 2 is built on the blob store, so the REST APIs should be the same.

// - GCP: The [main](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) doc
// doesn't indicate order, but [this
// page](https://cloud.google.com/storage/docs/listing-objects) does say: "This page
// shows you how to list the [objects](https://cloud.google.com/storage/docs/objects) stored
// in your Cloud Storage buckets, which are ordered in the list lexicographically by name."
// So we just need to know if we're local and then if so, we sort the returned file list in
// `filesystem.rs`
let store_str = format!("{}", store);
let is_local = store_str.starts_with("LocalFileSystem");
Self {
file_system: Arc::new(ObjectStoreFileSystemClient::new(
store.clone(),
!is_local,
table_root,
task_executor.clone(),
)),
Expand Down
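
The local-store check in this hunk relies on the store's `Display` output, since the store is held as a trait object and its concrete type is not directly visible. The following stdlib-only sketch shows that idea in isolation. `LocalStore`, `CloudStore`, and `has_ordered_listing` are hypothetical stand-ins, and the exact `Display` strings are assumptions made for illustration.

```rust
use std::fmt;

// Hypothetical stand-ins for object store implementations; only their
// `Display` output matters for this sketch.
struct LocalStore {
    root: String,
}

struct CloudStore {
    bucket: String,
}

impl fmt::Display for LocalStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Assumed format: type name followed by the root in parentheses.
        write!(f, "LocalFileSystem({})", self.root)
    }
}

impl fmt::Display for CloudStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "CloudStore({})", self.bucket)
    }
}

/// Guesses whether a type-erased store lists in sorted order by checking
/// whether its `Display` text starts with "LocalFileSystem".
fn has_ordered_listing(store: &dyn fmt::Display) -> bool {
    let is_local = store.to_string().starts_with("LocalFileSystem");
    !is_local
}
```

The check is only as reliable as the string: in this sketch, any store whose `Display` text does not start with `LocalFileSystem` is treated as ordered, including a wrapper around a local store that prints its own name first.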
14 changes: 13 additions & 1 deletion kernel/src/lib.rs
@@ -50,8 +50,8 @@
rust_2021_compatibility
)]

use std::ops::Range;
use std::sync::Arc;
use std::{cmp::Ordering, ops::Range};

use bytes::Bytes;
use url::Url;
@@ -111,6 +111,18 @@ pub struct FileMeta {
pub size: usize,
}

impl Ord for FileMeta {
fn cmp(&self, other: &Self) -> Ordering {
self.location.cmp(&other.location)
}
}

impl PartialOrd for FileMeta {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

/// Trait for implementing an Expression evaluator.
///
/// It contains one Expression which can be evaluated on multiple ColumnarBatches.
Expand Down
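
The `Ord` and `PartialOrd` impls in this hunk order `FileMeta` by `location` alone, which is what lets `sort_unstable()` be called on the listing. Here is a small sketch of the same delegation, using a simplified stand-in struct: `location` is a `String` here, the other fields and the derived `PartialEq`/`Eq` are assumptions, and `sorted_locations` is a hypothetical helper.

```rust
use std::cmp::Ordering;

// Simplified stand-in for `FileMeta`; field types and derives are assumptions.
#[derive(Debug, Clone, PartialEq, Eq)]
struct FileMeta {
    location: String,
    last_modified: i64,
    size: usize,
}

impl Ord for FileMeta {
    // Order by location only; the other fields are ignored.
    fn cmp(&self, other: &Self) -> Ordering {
        self.location.cmp(&other.location)
    }
}

impl PartialOrd for FileMeta {
    // Delegate to `cmp` so the two orderings can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Sorts the files with the `Ord` impl above and returns their locations.
fn sorted_locations(mut files: Vec<FileMeta>) -> Vec<String> {
    files.sort_unstable();
    files.into_iter().map(|f| f.location).collect()
}
```

With derived equality as assumed in this sketch, two entries that share a location but differ in size compare as `Ordering::Equal` while still being unequal under `==`. That is harmless for sorting a listing where locations are unique, but it is worth keeping in mind if the type is ever used as a key in an ordered collection.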
26 changes: 22 additions & 4 deletions kernel/src/snapshot.rs
@@ -399,8 +399,15 @@ fn list_log_files_with_checkpoint(
)));
}

// NOTE this will sort in reverse order
commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version));
debug_assert!(
commit_files
.windows(2)
.all(|cfs| cfs[0].version <= cfs[1].version),
"fs_client.list_from() didn't return a sorted listing! {:?}",
commit_files
);
// We assume the listing is returned in order; we want reverse order
let commit_files = commit_files.into_iter().rev().collect();

Ok((commit_files, checkpoint_files))
}
@@ -443,8 +450,16 @@ fn list_log_files(
}

commit_files.retain(|f| f.version as i64 > max_checkpoint_version);
// NOTE this will sort in reverse order
commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version));

debug_assert!(
commit_files
.windows(2)
.all(|cfs| cfs[0].version <= cfs[1].version),
"fs_client.list_from() didn't return a sorted listing! {:?}",
commit_files
);
// We assume the listing is returned in order; we want reverse order
let commit_files = commit_files.into_iter().rev().collect();
Collaborator

Curious: the docs say .rev() works on double-ended iterators. Is that what's going on here? This still has to be O(N), right? Makes me wonder whether we'd lose that much by just sorting like we used to.

Collaborator Author

I think reversing is still gonna be faster than a full sort. Or, at least not slower.
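
To make the comparison in this thread concrete, here is a small stdlib-only sketch of both approaches. `CommitFile` is a hypothetical stand-in with only a `version` field, and the three function names are made up for illustration. `Vec::into_iter()` yields a double-ended iterator, so `.rev().collect()` is a single O(N) pass, while the previous descending sort is O(N log N) but makes no assumption about input order.

```rust
// Hypothetical stand-in for a parsed commit file; only `version` matters here.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CommitFile {
    version: u64,
}

/// True when versions never decrease from one element to the next.
fn is_sorted_ascending(files: &[CommitFile]) -> bool {
    files
        .windows(2)
        .all(|pair| pair[0].version <= pair[1].version)
}

/// Approach in the diff: trust the listing order, check it in debug builds,
/// then reverse in one pass.
fn newest_first_by_reversing(files: Vec<CommitFile>) -> Vec<CommitFile> {
    debug_assert!(
        is_sorted_ascending(&files),
        "listing was not sorted: {:?}",
        files
    );
    files.into_iter().rev().collect()
}

/// Previous approach: sort descending by version, regardless of input order.
fn newest_first_by_sorting(mut files: Vec<CommitFile>) -> Vec<CommitFile> {
    files.sort_unstable_by(|a, b| b.version.cmp(&a.version));
    files
}
```

Both functions return the same result for a sorted input. The difference shows up when the input is not sorted: `debug_assert!` is compiled out of release builds by default, so the reversing version would then return an incorrectly ordered list without complaint, whereas the sorting version would still be correct.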


Ok((commit_files, checkpoint_files))
}
@@ -523,6 +538,7 @@ mod tests {
let prefix = Path::from(url.path());
let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
prefix,
Arc::new(TokioBackgroundExecutor::new()),
);
@@ -582,6 +598,7 @@ mod tests {

let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
@@ -626,6 +643,7 @@ mod tests {

let client = ObjectStoreFileSystemClient::new(
store,
false, // don't have ordered listing
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);
Expand Down