Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37917: [Parquet] Add OpenAsync for FileSource #37918

Merged
merged 7 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ Result<std::shared_ptr<io::RandomAccessFile>> FileSource::Open() const {
return custom_open_();
}

Future<std::shared_ptr<io::RandomAccessFile>> FileSource::OpenAsync() const {
if (filesystem_) {
return filesystem_->OpenInputFileAsync(file_info_);
}

if (buffer_) {
return Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(
std::make_shared<io::BufferReader>(buffer_));
}

// TODO(GH-37962): custom_open_ should not block
return Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(custom_open_());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, since custom_open_ might still blocking here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, wasn't sure how to deal with it. FileSource would need its own IOContext instance to run it async?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can leave a comment here, and keep it here? I think at lease, open via fileSystem could be help, and custom_open_ won't be worse

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 left a todo

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this TODO is valuable but should be linked to a new issue where we extend custom_open_ to return Future<std::share_ptr<io::RandomAccessFile>>. That would be necessary to enable a custom but non blocking open

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a new issue and updated the comment #37962

}

int64_t FileSource::Size() const {
if (filesystem_) {
return file_info_.size();
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {

/// \brief Get a RandomAccessFile which views this file source
Result<std::shared_ptr<io::RandomAccessFile>> Open() const;
Future<std::shared_ptr<io::RandomAccessFile>> OpenAsync() const;

/// \brief Get the size (in bytes) of the file or buffer
/// If the file is compressed this should be the compressed (on-disk) size.
Expand Down
50 changes: 28 additions & 22 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,29 +479,35 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
auto reader_fut = parquet::ParquetFileReader::OpenAsync(
std::move(input), std::move(properties), metadata);
auto path = source.path();

auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
return reader_fut.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> reader,
reader_fut.MoveResult());
std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
auto arrow_properties =
MakeArrowReaderProperties(*this, *metadata, *options, *parquet_scan_options);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
&arrow_reader));
return std::move(arrow_reader);
},
[path](
const Status& status) -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
return WrapSourceError(status, path);

return source.OpenAsync().Then(
[=](const std::shared_ptr<io::RandomAccessFile>& input) mutable {
return parquet::ParquetFileReader::OpenAsync(input, std::move(properties),
metadata)
.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>& reader) mutable
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
auto arrow_properties = MakeArrowReaderProperties(
*self, *reader->metadata(), *options, *parquet_scan_options);

std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(
options->pool,
// TODO(ARROW-12259): workaround since we have Future<(move-only
// type)> It *wouldn't* be safe to const_cast reader except that
// here we know there are no other waiters on the reader.
std::move(const_cast<std::unique_ptr<parquet::ParquetFileReader>&>(
reader)),
std::move(arrow_properties), &arrow_reader));

return std::move(arrow_reader);
},
[path = source.path()](const Status& status)
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
return WrapSourceError(status, path);
});
});
}

Expand Down
Loading