Skip to content

Commit

Permalink
new version
Browse files: browse the repository at this point in the history
  • Loading branch information
eeroel committed Oct 3, 2023
1 parent b36912e commit 6ee9a0e
Showing 1 changed file with 26 additions and 32 deletions.
58 changes: 26 additions & 32 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number | Diff line number | Diff line change
Expand Up @@ -479,40 +479,34 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
auto input_fut = source.OpenAsync();
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>

auto path = source.path();
auto reader_fut =
input_fut.Then([=](const std::shared_ptr<arrow::io::RandomAccessFile>&) mutable
-> Result<std::unique_ptr<parquet::ParquetFileReader>> {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
input_fut.MoveResult());
auto rfut = parquet::ParquetFileReader::OpenAsync(
std::move(input), std::move(properties), metadata);
ARROW_ASSIGN_OR_RAISE(auto reader, rfut.MoveResult());
return reader;
});

auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
return reader_fut.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> reader,
reader_fut.MoveResult());
std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
auto arrow_properties =
MakeArrowReaderProperties(*this, *metadata, *options, *parquet_scan_options);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
&arrow_reader));
return std::move(arrow_reader);
},
[path](
const Status& status) -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
return WrapSourceError(status, path);
});

return source.OpenAsync().Then([=](const std::shared_ptr<io::RandomAccessFile>& input) mutable {
return parquet::ParquetFileReader::OpenAsync(input, std::move(properties), metadata)
.Then(
[=](const std::unique_ptr<parquet::ParquetFileReader>& reader) mutable
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
auto arrow_properties = MakeArrowReaderProperties(
*self, *reader->metadata(), *options, *parquet_scan_options);

std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
RETURN_NOT_OK(parquet::arrow::FileReader::Make(
options->pool,
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
// It *wouldn't* be safe to const_cast reader except that here we know
// there are no other waiters on the reader.
std::move(
const_cast<std::unique_ptr<parquet::ParquetFileReader>&>(reader)),
std::move(arrow_properties), &arrow_reader));

return std::move(arrow_reader);
},
[path = source.path()](const Status& status)
-> Result<std::shared_ptr<parquet::arrow::FileReader>> {
return WrapSourceError(status, path);
});
});
}

struct SlicingGenerator {
Expand Down

0 comments on commit 6ee9a0e

Please sign in to comment.