Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adopt ObjectStore #761

Merged
merged 61 commits into from
Aug 30, 2022
Merged

Adopt ObjectStore #761

merged 61 commits into from
Aug 30, 2022

Conversation

roeap
Copy link
Collaborator

@roeap roeap commented Aug 22, 2022

Description

First of all I have to apologize, this is a very large PR and not all of the changes have been discussed. However trying to work around the intermediate state in migration turned out to be quite bothersome, so my hope was to "just get it done" and migrate to fully adopting ObjectStore.

As for our object stores ... The DeltaObjectStore wraps the other stores to hand some path translations etc. The local implementations for azure and gcs are completely gone. the local store uses our optimized implementation for replace_if_not_exists and the S3 store leverages the lock client much like before.

Also it seemed to me there were many different code path how we created tables or backends depending on if they were needed initialized or un-initialized. Since there are some nuances now, how the delta store needs to be configured based on the passed uri, i tried passing all of that through the builder. I also introduced a StorageUrl to help with url parsing conssitent with object_store.

ToDo:

  • update erros either here or in object store. In some places we have a ObjectStoreError::NotImplemented placeholder right now.
  • Set up integration tests for backends other than S3 - we can likely use the setup for object_store to a large extend.
  • Update documentation
  • remove patch for object_store in root Cargo.toml

Follow Ups

  • S3: migrate batch delete to object_store
  • Azure: implement batch delete in object_store
  • Contribute proxy settings in object_store
  • Isolate rusoto dependencies

Even if we decide to not merge this, I think there should be some valuable work in here to support the transition ...

Update

More and more tests are passing and are taking shape. Since I assume this is hard to review, let me try and summarize what has been done.

adopting object_store meant to remove the gcs and blob backends entirely while retaining the rename_if_not_exists of the local and s3 store. As such, these stores just proxy object_store, but for this functionality. Aside from cleanup, I reworked the error handling, removing StorageError entirely and having dedicated errors in local an s3 but only used within the mod.

All of this fragmented creating tables a bit too much, which is why I collected all creation / uri relevant code in a new builder module, and moved/updated the DeltaTableBuilder there as well. Subsequently all code path using various means of initializing tables, are migrated to using the builder. - the recommended open_* function remain of course, and were already using the builder.

Of course testing all of this must be a priority. Building on @Blajda work around a TestContext and what I learned from object_store around integration testing, I created a new module test_utils inside the crate, where we define an IntegrationContext, that can be instantiated be storage integration. Basically it handles creating and destrying resources like buckets / tempdir unsing cli tools mostly, and has some helpers to load test data into the service. This also means we are always working against a copy of the data, which avoids some of the post-test cleanup code we have floating around.

  • For testing DeltaObjectStore + backends, i ported the test suite from object_store, since we now expose the same api.
  • All S3 tests are (will be) migrated to using the IntegrationContext and applied to all integrations (haven't done GCS yet, if the cli is too foreign, I may leave that for a followup)
  • removed some now redundant tests

The rest is (should) mostly be adopting code to changed types / APIs and some more general houskeeping.

Related Issue(s)

closes #610
closes #690 - the DeltaTableBuilder allows constucting a DeltaObjectStore, which in turn exposes the "raw" object store
closes #609 - supported in object_store
closes #608 - supported in object_store
closes #568 - responsibility moved upstream
closes #546 - no longer a prerequisite
related #514 - at least partially closed since object store has a default retry policy. need to confirm for requests in this crate
closes #497 - added integration tests via azurite
closes #489 - this has been addressed before, and if not, it would be now
closes #721 - moving to officially supported formats

not #397 - while fake-gcs, can run, we need further options exposed in obnject_store to make it work.

Documentation

Copy link
Collaborator Author

@roeap roeap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally we have all tests passing :).

The change is still quite massive, but I tried to use the opportunity to do some general housekeeping around the tests, and run tests for at leat one more integration - azure. All integration tests are now in files prefixed with integration_. Some of this I originally wanted to have in the crate, but there were conflicts with environment setups etc ...

If there is anything I can to to make reviews easier, let me know..

@@ -138,10 +139,10 @@ def write_deltalake(

if isinstance(table_or_uri, str):
table = try_get_deltatable(table_or_uri)
table_uri = table_or_uri
table_uri = str(Path(table_or_uri).absolute())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enforcing an absolute path here was necessary to make the python 3.7 tests work, the reason that somehow encodings got messed up a bit with the leading .. This is a bit concerning, since it seems something is off with path handling, and as far as I understand a difference might be that in pyarrow 9 we do not need additional calls to the fs to get the file size.

My thinking though was that since tests are passing, and I assume we will be making a pass through the python file system implementation rather soon, that we can live with that for now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I will take a look at this later as I work more on filesystems.

@@ -18,7 +18,7 @@ def s3cred() -> None:

@pytest.fixture()
def s3_localstack(monkeypatch):
monkeypatch.setenv("AWS_REGION", "us-east-2")
monkeypatch.setenv("AWS_REGION", "us-east-1")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using us-east-1 to avoid passing a region constrint in CLI for setting up tests. However this is redundant now, since we switch to s3 for s3api cli invocations.

/// Storage option keys to use when creating [crate::storage::s3::S3StorageOptions].
/// The same key should be used whether passing a key in the hashmap or setting it as an environment variable.
/// Provided keys may include configuration for the S3 backend and also the optional DynamoDb lock used for atomic rename.
pub mod s3_storage_options {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if we still process all these options, we likely need another pass through the builder anyhow, when we add more explicit support for some delta options.

builder = builder.with_service_account_path(account);
}

// TODO (roeap) We need either the option to insecure requests, or allow http connections
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need some changes in upstream object_store to work with fake-gcs

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought they already use fake-gcs upstream? https://github.com/apache/arrow-rs/blob/62eeaa5ebd59ac611b8d17f2fc26373fc30af53f/.github/workflows/object_store.yml#L67-L70

Or is it that we need to expose the option on our side?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The upstream tests rely on passing in a custom client via the builder, but that option is not exposed publicly. I think all we need to du is expose that option, then we canb properly set up the backend here for integration tests.

Comment on lines +294 to +313
match ret {
Err(e) if e.0 == libc::EEXIST => Err(LocalFileSystemError::AlreadyExists {
path: to.into(),
source: Box::new(e),
}),
Err(e) if e.0 == libc::ENOENT => Err(LocalFileSystemError::NotFound {
path: to.into(),
source: Box::new(e),
}),
Err(e) if e.0 == libc::EINVAL => Err(LocalFileSystemError::InvalidArgument {
path: to.into(),
source: e,
}),
Err(e) => Err(LocalFileSystemError::Generic {
store: STORE_NAME,
source: Box::new(e),
}),
Ok(_) => Ok(()),
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the error handling should be a bit flatter and more explicit now, A closer look if the mapping is correct should be good though :)

Err(UriError::InvalidScheme(String::from(parts[0])))
}
/// Deletes object by `paths`.
pub async fn delete_batch(&self, paths: &[Path]) -> ObjectStoreResult<()> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have a naive batch delete function here, but we lost the optimized implementation for s3. Rather then retaining that here, I was thinking it's better to contribute this upstream.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, we should contribute this upstream.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/// Error raised by storage lock client
#[derive(thiserror::Error, Debug)]
enum S3LockError {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The basic idea of using private error types internally, but exposing one global error type to consumers resonates with me. This is more or less adopted from object store. Eventually we may want to apply this throughout delta-rs, with the aim to only expose DetaTableError publicly.

@@ -0,0 +1,438 @@
#![cfg(feature = "integration_test")]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this contains more or less the entire test suite from object_store.

Comment on lines 159 to 172
async fn rename(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
if let Some(ref path) = self.pause_before_copy_path {
if path == to {
pause(&self.pause_until_true);
}
}
self.copy(from, to).await?;
if let Some(ref path) = self.pause_before_delete_path {
if path == from {
pause(&self.pause_until_true);
}
}
self.delete(from).await
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should capture the logic from the previous dispatcher implementation? @houqp

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it from a quick glance, cc @mosyp since he wrote all the dispatcher tests :)

Comment on lines 123 to 126
// let lock_client = dynamodb_lock::DynamoDbLockClient::new(
// rusoto_dynamodb::DynamoDbClient::new(s3_common::region()),
// dynamodb_lock::DynamoDbOptions::default(),
// );
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be deleted?

@roeap
Copy link
Collaborator Author

roeap commented Aug 30, 2022

@houqp - was a little bit worried about the merge after the parquet2 PR, but as it turns, when you clean up things, things get cleaner :D. So it was very straight-forward to integrate the parquet2 changes!

Copy link
Collaborator

@wjones127 wjones127 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great work @roeap! I requested a few minor changes.

return Err(UriError::MissingObjectKey);
}
};
impl DeltaObjectStore {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So IIUC, the main reason we are wrapping rather than just using the ObjectStore trait as-is is for the base_path / relative URL computation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly! Since all the paths from the log are relative, we always have to do that conversion, so I thought it would be easiest to just have a base path inside the object store. There are some convenience functions defined, but without the base path, these would certainly not warrant wrapping the store.

@@ -0,0 +1,687 @@
//! AWS S3 storage backend. It only supports a single writer and is not multi-writer safe.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it multi-writer safe if the lock is enabled (and all writers use the shared locking mechanism)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely it is - in fact a significant part of the development effort was spend on migrating the tests for just that fact :D

rust/src/storage/s3.rs Outdated Show resolved Hide resolved
rust/src/storage/s3.rs Outdated Show resolved Hide resolved
///
/// Options are described in [s3_storage_options].
pub fn try_new(
storage: Arc<DynObjectStore>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So usage looks like this, right?

use object_store::aws::AmazonS3Builder;
use deltalake::storage::s3::{S3StorageBackend, S3StorageOptions};

let inner = AmazonS3Builder::new().with_bucket_name("my-bucket").build()?;
let store = S3StorageBackend::try_new(Arc::new(inner), S3StorageOptions::default())?;

If so, maybe provide the above as an example.

/// get a shared reference to the delta object store
pub fn object_store(&self) -> Arc<DeltaObjectStore> {
self.storage.clone()
}

/// The
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// The
/// The URI of the root of the table.

@@ -138,10 +139,10 @@ def write_deltalake(

if isinstance(table_or_uri, str):
table = try_get_deltatable(table_or_uri)
table_uri = table_or_uri
table_uri = str(Path(table_or_uri).absolute())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I will take a look at this later as I work more on filesystems.

builder = builder.with_service_account_path(account);
}

// TODO (roeap) We need either the option to insecure requests, or allow http connections
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought they already use fake-gcs upstream? https://github.com/apache/arrow-rs/blob/62eeaa5ebd59ac611b8d17f2fc26373fc30af53f/.github/workflows/object_store.yml#L67-L70

Or is it that we need to expose the option on our side?

Err(UriError::InvalidScheme(String::from(parts[0])))
}
/// Deletes object by `paths`.
pub async fn delete_batch(&self, paths: &[Path]) -> ObjectStoreResult<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roeap roeap requested review from houqp and wjones127 August 30, 2022 20:17
wjones127
wjones127 previously approved these changes Aug 30, 2022
@roeap
Copy link
Collaborator Author

roeap commented Aug 30, 2022

@wjones127 - sorry I have to ping you again, made an error transferring the doc test. It's fixed now :).

@roeap roeap merged commit 17999d2 into delta-io:main Aug 30, 2022
@roeap roeap deleted the object-store branch August 30, 2022 21:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants