-
Notifications
You must be signed in to change notification settings - Fork 406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unique delta object store url #1212
Unique delta object store url #1212
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for picking this up @gruuya!
Left some comments to clarify the desired behaviour.
rust/src/storage/mod.rs
Outdated
@@ -132,7 +128,7 @@ impl DeltaObjectStore { | |||
"delta-rs://{}", | |||
// NOTE We need to also replace colons, but its fine, since it just needs | |||
// to be a unique-ish identifier for the object store in datafusion | |||
self.prefix | |||
Path::from(self.location.path()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had to go back a bit to remember why all of this was to convoluted :). Also remembered, that I wanted to revisit this and never did - sorry it caused you trouble.
So essentially the issue was that ObjectStoreUrl
is always shortened to only take the scheme and host segments of the url in case of a file system url it just uses the scheme. However right now we need datafusion to always take the dedicated store for a specific table. As such we need to generate a unique-ish host string that will not conflict with other tables. I think taking just he path segment is dangerous, since a common pattern is to put large tables at the root of a bucket or container, where the path segment will be empty. So this could lead to the same problem for two tables at the root of different buckets / containers, or with the same prefix within separate containers.
Maybe a way to go is to just run the whole location through uuid5 and use that as a host?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh no worries, this isn't a problem quite yet (though it will be soon enough, so I figured I should try and propose a solution).
I think taking just he path segment is dangerous, since a common pattern is to put large tables at the root of a bucket or container, where the path segment will be empty. So this could lead to the same problem for two tables at the root of different buckets / containers, or with the same prefix within separate containers.
That's a good point. I'd vote for not hashing the location name, mainly for ease of debugging.
Instead, a unique url can be generated by combining the non-sensitive parts (scheme, host and path) from the original location and replacing the invalid characters. I've pushed this change now and extended the tests accordingly.
rust/src/delta_datafusion.rs
Outdated
@@ -797,14 +771,11 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec { | |||
fn try_encode( | |||
&self, | |||
node: Arc<dyn ExecutionPlan>, | |||
buf: &mut Vec<u8>, | |||
_buf: &mut Vec<u8>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, never looked into how datafusion does the plan serialization, but would this just be a no-op, since we are just tying to downcast the plan, but never write it to the buffer or do anything with it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that could be true. I'm not sure whether something other than DeltaScan
can be passed here, in which case this wouldn't be a no-op due to the error returned.
Arguably there's no need to even have DeltaScan
now, since it's just an empty wrapper around ParquetExec
, but I left it thinking something will be needed here sooner or later (e.g. column mapping mentioned in the TODO)
let df_url = ListingTableUrl::parse(self.url.as_str())?; | ||
let storage = context | ||
.runtime_env() | ||
.object_store_registry | ||
.get_by_url(df_url); | ||
let mut table = DeltaTableBuilder::from_uri(&self.url); | ||
if let Ok(storage) = storage { | ||
// When running in ballista, the store will be deserialized and re-created | ||
// When testing with a MemoryStore, it will already be present and we should re-use it | ||
table = table.with_storage_backend( | ||
storage, | ||
Url::parse(&self.url).map_err(|err| DataFusionError::Internal(err.to_string()))?, | ||
); | ||
} | ||
let table = table.build()?; | ||
register_store(&table, context.runtime_env()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would removing this also work in ballista, where we never registered our store in the runtime env on the executor, where this plan gets deserialized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should, assuming someone just instantiates a corresponding DeltaObjectStore
on the target node and registers it under the DeltaObjectStore::object_store_url
key, which is what the underlying Parquet scan will look for during execution.
I assume this was used in the similar manner before, though there's no example or test currently from what I see (I could add one?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I never worked seriously with ballista, so hard for me to guess. Maybe @avantgardnerio, who worked on the ballista integration can help us out to answer this :)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sure it was necessary at the time. I don't know if the code has been changed since then.
I was waiting for the stars to align for the correct transitive dependencies to not conflict in order to make a Ballista PR, but sadly by the time they did I was refocused on other things.
Apologies about the lack of a test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem, thanks for chiming in!
I've now extended my previous test to also exercise DeltaScan
serde, along with the accompanying object store registration logic, which I thought would be sufficiently ergonomic for balllista or other such cases. Namely, DeltaScan
struct keeps the table uri, and the idea is that when someone deserializes a plan they should construct the relevant DeltaObjectStore
and register it with the current Datafusion state. For this purpose I've also made DeltaObjectStore::object_store_url
public globally.
Please take a look if that makes sense.
This is to ensure that the `object_store_url` returns a unique identifier for different such object stores. In addition, remove the redundant registration of the object store in the `DeltaScan`.
…ng the unique url
Instead of just using the path, include other non-sensitive location segments such as scheme and host.
318851d
to
2b326cd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the great work @gruuya, LGTM!
Just as an FYI, there are some changes to the object store registry etc coming up on the datafusion side, that may affect us here - but hopefully for the better :)
# Description Make the object store url be unique for stores created via `DeltaObjectStore::new`, by generating it from the location instead of the prefix (which was previously hard-coded to `/`), in the same manner as for `try_new`. Also, in the (unlikely) case that I'm not mistaken about `DeltaScan::execute` logic being redundant (see delta-io#1188 for more details), I've removed it and added a couple of tests. # Related Issue(s) Closes delta-io#1188 # Documentation
Description
Make the object store url be unique for stores created via
DeltaObjectStore::new
, by generating it from the location instead of the prefix (which was previously hard-coded to/
), in the same manner as fortry_new
.Also, in the (unlikely) case that I'm not mistaken about
DeltaScan::execute
logic being redundant (see #1188 for more details), I've removed it and added a couple of tests.Related Issue(s)
Closes #1188
Documentation