Skip to content

Commit

Permalink
apacheGH-37917: [Parquet] Add OpenAsync for FileSource (apache#37918)
Browse files Browse the repository at this point in the history
### Rationale for this change

Improves performance of file reads with an expensive Open operation.

### What changes are included in this PR?

### Are these changes tested?

### Are there any user-facing changes?
No

* Closes: apache#37917

Authored-by: Eero Lihavainen <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
eeroel authored Oct 4, 2023
1 parent 0abb672 commit 02de3c1
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 22 deletions.
14 changes: 14 additions & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ Result<std::shared_ptr<io::RandomAccessFile>> FileSource::Open() const {
return custom_open_();
}

Future<std::shared_ptr<io::RandomAccessFile>> FileSource::OpenAsync() const {
if (filesystem_) {
return filesystem_->OpenInputFileAsync(file_info_);
}

if (buffer_) {
return Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(
std::make_shared<io::BufferReader>(buffer_));
}

// TODO(GH-37962): custom_open_ should not block
return Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(custom_open_());
}

int64_t FileSource::Size() const {
if (filesystem_) {
return file_info_.size();
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {

/// \brief Get a RandomAccessFile which views this file source
Result<std::shared_ptr<io::RandomAccessFile>> Open() const;
Future<std::shared_ptr<io::RandomAccessFile>> OpenAsync() const;

/// \brief Get the size (in bytes) of the file or buffer
/// If the file is compressed this should be the compressed (on-disk) size.
Expand Down
50 changes: 28 additions & 22 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,29 +479,35 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
auto reader_fut = parquet::ParquetFileReader::OpenAsync(
std::move(input), std::move(properties), metadata);
auto path = source.path();

auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
return reader_fut.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> reader,
reader_fut.MoveResult());
std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
auto arrow_properties =
MakeArrowReaderProperties(*this, *metadata, *options, *parquet_scan_options);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
&arrow_reader));
return std::move(arrow_reader);
},
[path](
const Status& status) -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
return WrapSourceError(status, path);

return source.OpenAsync().Then(
[=](const std::shared_ptr<io::RandomAccessFile>& input) mutable {
return parquet::ParquetFileReader::OpenAsync(input, std::move(properties),
metadata)
.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>& reader) mutable
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
auto arrow_properties = MakeArrowReaderProperties(
*self, *reader->metadata(), *options, *parquet_scan_options);

std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(
options->pool,
// TODO(ARROW-12259): workaround since we have Future<(move-only
// type)> It *wouldn't* be safe to const_cast reader except that
// here we know there are no other waiters on the reader.
std::move(const_cast<std::unique_ptr<parquet::ParquetFileReader>&>(
reader)),
std::move(arrow_properties), &arrow_reader));

return std::move(arrow_reader);
},
[path = source.path()](const Status& status)
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
return WrapSourceError(status, path);
});
});
}

Expand Down

0 comments on commit 02de3c1

Please sign in to comment.