Revisit Design of ObjectStore::put_multipart #5458

Closed
tustvold opened this issue Mar 3, 2024 · 12 comments · Fixed by #5500
Labels
enhancement Any new improvement worthy of a entry in the changelog object-store Object Store Interface parquet Changes to the parquet crate

Comments

@tustvold
Contributor

tustvold commented Mar 3, 2024

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently streaming uploads are supported by ObjectStore::put_multipart. This returns an AsyncWrite, which provides a push-based interface for writing data.

However, this approach is not without issue:

Describe the solution you'd like

#4971 added a MultipartStore abstraction that more closely mirrors the APIs exposed by object stores, avoiding all of the above issues. If we could devise a way to implement this interface for LocalFileSystem we could then "promote" it into the ObjectStore trait and deprecate put_multipart. This would provide the maximum flexibility to users, whilst being in keeping with the objectives of this crate to closely hew to the APIs of the stores themselves.

The key observation that makes this possible is that we already recommend MultiPartStore be used with fixed-size chunks for compatibility with R2. We could therefore require this for LocalFileSystem, in turn allowing it to support out-of-order / parallel writes, as the file offsets can be determined from the part index.
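To make the offset argument concrete, here is a minimal sketch, assuming every part except the last has the same size. `part_offset` and `write_part` are hypothetical helpers for illustration and are not part of object_store.

```rust
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
use std::path::Path;

/// Byte offset of part `part_idx`, assuming every part except the last
/// is exactly `part_size` bytes long.
fn part_offset(part_idx: u64, part_size: u64) -> u64 {
    part_idx * part_size
}

/// Write a single part into `path` at the offset implied by its index.
/// Hypothetical helper: parts may arrive in any order.
fn write_part(path: &Path, part_idx: u64, part_size: u64, data: &[u8]) -> std::io::Result<()> {
    let mut file = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(false) // keep parts written by earlier calls
        .open(path)?;
    file.seek(SeekFrom::Start(part_offset(part_idx, part_size)))?;
    file.write_all(data)
}
```

Because each call only touches its own byte range, parts can be written out of order (or from separate threads using separate file handles) and the file still ends up contiguous once all parts have landed.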

#5431 and #4857 added BufWriter and BufReader; these would be retained to preserve compatibility with the tokio ecosystem and to provide a more idiomatic API on top of this.

Describe alternatives you've considered

I briefly considered a put_stream API, however, this doesn't resolve many of the above issues

We could also just implement MultipartStore for LocalFilesystem, whilst retaining the current put_multipart. This would allow downstreams to opt-in to the lower level API if they so wished.

We could also modify put_multipart to return something other than AsyncWrite, possibly something closer to PutPart

Additional context

Many of the stores also support composing objects from others; this might be something to consider in this design - #4966

FYI @wjones127 @Xuanwo @alamb @roeap

@tustvold tustvold added the enhancement Any new improvement worthy of a entry in the changelog label Mar 3, 2024
@tustvold
Contributor Author

tustvold commented Mar 3, 2024

One downside of moving to a multipart upload API is it would force unnecessary buffering in cases where the underlying store lacks a minimum part size, e.g. LocalFilesystem. More thought is needed 🤔

@alamb
Contributor

alamb commented Mar 3, 2024

One downside of moving to a multipart upload API is it would force unnecessary buffering in cases where the underlying store lacks a minimum part size, e.g. LocalFilesystem. More thought is needed 🤔

Maybe we could do something like GetResult and basically allow a special case write for local files if a writer wanted to implement a specialized local file path 🤔

The idea of implementing MultiPartStore for LocalFileSystem makes a lot of sense to me (it could be implemented very efficiently, as you point out). Tuning write buffer sizes (aka part sizes in a multi-part upload) is likely to be object store and system specific, so the buffering doesn't seem like a fundamental problem to me.

@tustvold
Contributor Author

tustvold commented Mar 5, 2024

So the major challenge with providing a multipart API for LocalFilesystem is that there is no obvious way to transport the part size in use. This presents a problem for determining the offset of the final part, which will likely be smaller than the file's chunk size.

There are a few options here, but none of them are particularly great:

  • Encode the part size in a separate file, adds a file read/write to every part write
  • Use a mechanism like xattr, but this would limit platform support
  • Encode the part size in the MultipartId, but this would require specifying the part size when creating the upload
  • Encode the part size in the file, but this would be fragile and hard to coordinate with parallel uploads
  • Keep multipart uploads as separate files, this would complicate listing and retrieval logic and break compatibility with non-ObjectStore based systems
  • Concatenate the parts once the upload has finished, this would be simple but slow
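As an illustration of the third option, the part size could be carried inside the upload id itself. The `<name>#<part size>` format and both function names below are assumptions made up for this sketch; nothing in the thread says object_store encodes ids this way.

```rust
/// Build a hypothetical upload id of the form "<name>#<part size in bytes>".
/// The part size must already be known when the upload is created, which is
/// exactly the drawback noted for this option.
fn encode_upload_id(name: &str, part_size: u64) -> String {
    format!("{name}#{part_size}")
}

/// Recover the part size from an id produced by `encode_upload_id`.
/// Returns `None` if the id does not follow the hypothetical format.
fn decode_part_size(id: &str) -> Option<u64> {
    let (_, size) = id.rsplit_once('#')?;
    size.parse().ok()
}

/// Offset of the (possibly short) final part, derived from the id alone.
fn final_part_offset(id: &str, final_part_idx: u64) -> Option<u64> {
    Some(decode_part_size(id)? * final_part_idx)
}
```

With the size recoverable from the id, the offset of a short final part no longer depends on that part's own length.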

Taking a step back, I think there are two users of multipart uploads:

  1. Users who just want to stream data to durable storage
  2. Users doing a chunked upload of an existing data set

Users in the second category are extremely unlikely to care about LocalFilesystem support, as they could just use the filesystem directly. As such I suspect they are adequately served by MultipartStore. I therefore think we can just focus our efforts on the first category of user, providing an efficient way to stream data, in order, to durable storage.

I'm therefore leaning towards replacing put_multipart with

trait ObjectStore {
    fn upload(&self, path: &Path) -> Result<Box<dyn Upload>>;
    ...
}

pub struct UploadOptions {
    /// A hint as to the size of the object to be uploaded, implementations may use this to select an appropriate IO size
    pub size_hint: Option<usize>,
    /// Implementations may perform chunked uploads in parallel, use this to restrict the concurrency
    pub max_concurrency: Option<usize>,
}

#[async_trait]
pub trait Upload {
    /// Enqueue data to be uploaded
    fn write(&mut self, data: &[u8]) -> Result<()>;

    /// Enqueue `data` to be uploaded
    fn put(&mut self, data: Bytes) -> Result<()> {
        self.write(&data)
    }

    /// Flush as much data as possible to durable storage
    ///
    /// Returns the offset up to which data has been made durable
    ///
    /// Some implementations may have IO size restrictions making this best effort
    async fn flush(&mut self) -> Result<usize>;

    /// Flush all written data, complete this upload, and return the [`PutResult`]
    async fn shutdown(&mut self) -> Result<PutResult>;

    /// Abort this upload
    async fn abort(&mut self) -> Result<()>;
}

There are a few things worth highlighting here:

  • The synchronous write will integrate well with synchronous writers, e.g. Provide access to inner Write for parquet writers #5471
  • Implementations can choose to use the cheaper Put instead of PutMultipart if data sizes are small
  • Part sizing is abstracted away from the user
  • Implementations are not constrained on IO granularity, e.g. LocalFilesystem can stream writes directly
  • The Upload interface should be significantly easier to implement than AsyncWrite
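To illustrate the "best effort" flush semantics described above, here is a simplified, synchronous stand-in for the proposed trait. `BufferedUpload`, its in-memory `durable` sink, and the consuming `shutdown` are assumptions for illustration only; the real proposal is async and returns `Result` values.

```rust
/// Simplified, synchronous stand-in for the proposed `Upload` trait.
/// Parts are "uploaded" into an in-memory list (an assumption for this sketch).
struct BufferedUpload {
    part_size: usize,
    buffer: Vec<u8>,
    durable: Vec<Vec<u8>>,
    durable_len: usize,
}

impl BufferedUpload {
    fn new(part_size: usize) -> Self {
        assert!(part_size > 0, "part size must be non-zero");
        Self { part_size, buffer: Vec::new(), durable: Vec::new(), durable_len: 0 }
    }

    /// Enqueue data to be uploaded; no IO happens here.
    fn write(&mut self, data: &[u8]) {
        self.buffer.extend_from_slice(data);
    }

    /// Best-effort flush: only whole parts can be made durable.
    /// Returns the offset up to which data has been made durable.
    fn flush(&mut self) -> usize {
        while self.buffer.len() >= self.part_size {
            let rest = self.buffer.split_off(self.part_size);
            let part = std::mem::replace(&mut self.buffer, rest);
            self.durable_len += part.len();
            self.durable.push(part);
        }
        self.durable_len
    }

    /// Flush everything, including a short final part, and return the parts.
    fn shutdown(mut self) -> Vec<Vec<u8>> {
        self.flush();
        if !self.buffer.is_empty() {
            let part = std::mem::take(&mut self.buffer);
            self.durable.push(part);
        }
        self.durable
    }
}
```

A store without a minimum part size, such as a local file, could instead return the full written length from every `flush`, which is the flexibility the last two bullets point at.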

@Xuanwo
Member

Xuanwo commented Mar 5, 2024

I'm therefore leaning towards replacing put_multipart with

This design looks cool, but I have two concerns:

The current design requires users to call flush when the buffer gets too large, because we can't perform IO during write or put. This complicates things, as users can no longer write continuously like before.

Also, I'm unclear about how max_concurrency functions. Does this mean that flush could operate asynchronously in the background?

@tustvold
Contributor Author

tustvold commented Mar 5, 2024

The current design requires users to call flush when the buffer gets too large, because we can't perform IO during write or put. This complicates things, as users can no longer write continuously like before.

In practice they have to do this anyway because of tokio-rs/tokio#4296 and #5366. In general the previous API was very difficult to actually use correctly, especially with the type of long-running synchronous operations that characterize arrow/parquet workloads.

Also, I'm unclear about how max_concurrency functions. Does this mean that flush could operate asynchronously in the background?

The idea is if Upload has accumulated enough data to do so, it could upload multiple chunks in parallel, much like WriteMultipart does currently.

Effectively, aside from moving away from the problematic AsyncWrite abstraction, this doesn't materially alter the way IO is performed, other than removing the async backpressure mechanism that makes AsyncWrite such a pain to integrate with predominantly sync workloads.
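A rough sketch of what a max_concurrency limit could mean when several buffered chunks are ready at once. It uses OS threads and fixed batches rather than async tasks and a sliding window, and `upload_parts` and its `upload_part` callback are hypothetical names standing in for real network calls.

```rust
use std::sync::Mutex;
use std::thread;

/// Upload `parts` with at most `max_concurrency` uploads in flight at once.
/// `upload_part` is a stand-in for a real request; it receives the part index
/// and data and returns some per-part result (here, a byte count).
fn upload_parts<F>(parts: &[Vec<u8>], max_concurrency: usize, upload_part: F) -> Vec<usize>
where
    F: Fn(usize, &[u8]) -> usize + Sync,
{
    let limit = max_concurrency.max(1);
    let results = Mutex::new(vec![0; parts.len()]);
    for (batch_idx, batch) in parts.chunks(limit).enumerate() {
        // Every thread spawned in this scope is joined before the next batch starts.
        thread::scope(|s| {
            for (i, part) in batch.iter().enumerate() {
                let idx = batch_idx * limit + i;
                let results = &results;
                let upload_part = &upload_part;
                s.spawn(move || {
                    let outcome = upload_part(idx, part);
                    results.lock().unwrap()[idx] = outcome;
                });
            }
        });
    }
    results.into_inner().unwrap()
}
```

Batching is coarser than a true concurrency limit, since a slow part holds back the next batch, but it keeps the sketch dependency-free.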

@wjones127
Member

The design seems reasonable overall.

In Lance, our write pattern at the moment looks like:

write col 1
...
write col N
flush
(maybe return control to caller)
write col 1
..
write col N
flush

Because I am calling flush often, I don't think I'd miss the backpressure from write. However, what I think I might miss is being able to initiate the requests during write calls. I wonder if it would make sense to have some sort of poll_flush() method? Obviously it has some of the stability concerns from #5366, but I think if given a warning it could be safe enough.

Also, is there a maximum buffer size enforced? Does reaching that make write() fail, or is it up to the user to limit how much they are buffering? (Which I think they could do easily by tracking bytes.)

I'm thinking that, if this were implemented, I would use this API by putting the writer on a background tokio task. Then I could run the IO calls in the background and implement backpressure over some channel. This brings up the question: is it safe to call write while the future returned by flush() has begun but is incomplete? Ideally I would like to be able to enqueue more data before I've completely drained the currently queued buffer.
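The background-writer pattern with channel-based backpressure might look roughly like this. The sketch uses a std thread and a bounded `sync_channel` as stand-ins for a tokio task and an async channel, and `write_in_background` is a hypothetical name.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Feed `chunks` to a background writer through a bounded queue.
/// Returns everything the writer received, in order.
fn write_in_background(chunks: Vec<Vec<u8>>, queue_depth: usize) -> Vec<u8> {
    let (tx, rx) = sync_channel::<Vec<u8>>(queue_depth);

    let writer = thread::spawn(move || {
        // Stand-in for the real upload: collect the bytes in memory.
        let mut sink = Vec::new();
        for chunk in rx {
            sink.extend_from_slice(&chunk);
        }
        sink
    });

    for chunk in chunks {
        // `send` blocks while `queue_depth` chunks are already queued,
        // which is what provides the backpressure.
        tx.send(chunk).expect("writer thread exited early");
    }
    drop(tx); // closing the channel lets the writer loop end

    writer.join().expect("writer thread panicked")
}
```

The producer never touches the writer directly, so the question of calling write during an in-flight flush is sidestepped: only the background thread owns the upload.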

@tustvold
Contributor Author

tustvold commented Mar 5, 2024

Hmm... Good point. For that sort of workload something like put_stream would probably be the best option, it does have a certain elegant simplicity/symmetry to it 🤔

Something like

struct PutStreamOptions {
    /// A hint as to the size of the object to be uploaded, implementations may use this to select an appropriate IO size
    pub size_hint: Option<usize>,
    /// Implementations may perform chunked uploads in parallel, use this to restrict the concurrency
    pub max_concurrency: Option<usize>,
}

#[async_trait]
trait ObjectStore {
    async fn put_stream(&self, stream: BoxStream<'static, Result<Bytes>>) -> Result<PutResult>;
}

@tustvold
Contributor Author

tustvold commented Mar 6, 2024

Had a good sync with @alamb and I think we've devised a way to support the original vision of exposing a MultiPart API for stores, including LocalFilesystem. Apologies for the noise

@alamb
Contributor

alamb commented Mar 7, 2024

Had a good sync with @alamb and I think we've devised a way to support the original vision of exposing a MultiPart API for stores, including LocalFilesystem. Apologies for the noise

To avoid leaving anyone in suspense: as I recall, the basic idea is, at first, to require that for file-backed object stores every part of a multipart upload is the same size (except for the last one). That way, when writing multiple "parts" to a file, we can calculate a priori the offset in the file where each part will go.

If a user tries to upload a part that is not the required size, the API will error with a clear message. We may eventually extend the implementation to handle different sized chunks (with copying as part of the finalize).

https://docs.rs/object_store/latest/object_store/multipart/trait.MultiPartStore.html#tymethod.put_part

I may be misremembering this
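A minimal sketch of the size check described in this comment, assuming the validation runs once all part lengths are known. `validate_part_sizes` and its error strings are hypothetical, not the crate's actual API or messages.

```rust
/// Check that every part except the last is exactly `part_size` bytes,
/// and that the last part is non-empty and no larger than `part_size`.
fn validate_part_sizes(part_lens: &[usize], part_size: usize) -> Result<(), String> {
    let (last, rest) = match part_lens.split_last() {
        Some(split) => split,
        None => return Err("upload has no parts".to_string()),
    };
    for (idx, len) in rest.iter().enumerate() {
        if *len != part_size {
            return Err(format!(
                "part {idx} is {len} bytes, expected exactly {part_size}"
            ));
        }
    }
    if *last == 0 || *last > part_size {
        return Err(format!(
            "final part is {last} bytes, expected between 1 and {part_size}"
        ));
    }
    Ok(())
}
```

Rejecting a mismatched part up front is what keeps the offset arithmetic valid; supporting mixed sizes would need the copy-on-finalize step mentioned above.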

@tustvold
Contributor Author

The design evolved a bit to accommodate reality, but it is largely in line with the spirit of the original proposal - #5500

PTAL and let me know what you think, I'm quite pleased with how it came out

tustvold added a commit that referenced this issue Mar 19, 2024
…tipartStore (#5458) (#5500)

* Replace AsyncWrite with Upload trait (#5458)

* Make BufWriter abortable

* Flesh out cloud implementations

* Review feedback

* Misc tweaks and fixes

* Format

* Replace multi-part with multipart

* More docs

* Clippy

* Rename to MultipartUpload

* Rename ChunkedUpload to WriteMultipart

* Doc tweaks

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* Docs

* Format

---------

Co-authored-by: Andrew Lamb <[email protected]>
@tustvold
Contributor Author

label_issue.py automatically added labels {'object-store'} from #5500

@tustvold tustvold added object-store Object Store Interface parquet Changes to the parquet crate labels Apr 17, 2024
@tustvold
Contributor Author

label_issue.py automatically added labels {'parquet'} from #5485
