Skip to content

Commit

Permalink
Avoid whole partition cache (apache#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
FelixYBW authored Nov 3, 2021
1 parent a2c70e8 commit 8d7e250
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 21 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ using parquet::arrow::SchemaField;
using parquet::arrow::SchemaManifest;
using parquet::arrow::StatisticsAsScalars;


/// \brief A ScanTask backed by a parquet file and a RowGroup within a parquet file.
class ParquetScanTask : public ScanTask {
public:
Expand Down
28 changes: 12 additions & 16 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>
#include <mutex>
#include <sstream>
#include <iostream>

#include "arrow/array/array_primitive.h"
#include "arrow/compute/api_scalar.h"
Expand Down Expand Up @@ -267,7 +268,7 @@ class ARROW_DS_EXPORT SyncScanner : public Scanner {
: Scanner(std::move(scan_options)), fragment_(std::move(fragment)) {}

Result<TaggedRecordBatchIterator> ScanBatches() override;
Result<TaggedRecordBatchIterator> ScanBatchesWithWeakFilterProject() override;
Result<RecordBatchIterator> ScanBatchesWithWeakFilterProject() override;
Result<ScanTaskIterator> Scan() override;
Status Scan(std::function<Status(TaggedRecordBatch)> visitor) override;
Result<std::shared_ptr<Table>> ToTable() override;
Expand Down Expand Up @@ -298,28 +299,23 @@ Result<TaggedRecordBatchIterator> SyncScanner::ScanBatches() {
});
}

Result<TaggedRecordBatchIterator> SyncScanner::ScanBatchesWithWeakFilterProject() {
Result<RecordBatchIterator> SyncScanner::ScanBatchesWithWeakFilterProject() {
ARROW_ASSIGN_OR_RAISE(auto fragment_it, GetFragments())
auto fn = [this](const std::shared_ptr<Fragment>& fragment) -> Result<ScanTaskIterator> {
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(scan_options_));
return std::move(scan_task_it);
};

// Iterator<Iterator<ScanTask>>
auto maybe_scantask_it = MakeMaybeMapIterator(fn, std::move(fragment_it));
auto scan_task_it = MakeFlattenIterator(std::move(maybe_scantask_it));

auto scan_fn = [](const std::shared_ptr<ScanTask>& aTask)->Result<RecordBatchIterator> {
ARROW_ASSIGN_OR_RAISE(auto recordbatch_it, aTask->Execute());
return std::move(recordbatch_it);
};

auto task_group = scan_options_->TaskGroup();
auto state = std::make_shared<ScanBatchesState>(std::move(scan_task_it), task_group);
for (int i = 0; i < scan_options_->fragment_readahead; i++) {
state->PushScanTask();
}
return MakeFunctionIterator([task_group, state]() -> Result<TaggedRecordBatch> {
ARROW_ASSIGN_OR_RAISE(auto batch, state->Pop());
if (!IsIterationEnd(batch)) return batch;
RETURN_NOT_OK(task_group->Finish());
return IterationEnd<TaggedRecordBatch>();
});
auto maybe_recordbatch_it = MakeMaybeMapIterator(scan_fn, std::move(scan_task_it));
return MakeFlattenIterator(std::move(maybe_recordbatch_it));
}

Result<FragmentIterator> SyncScanner::GetFragments() {
Expand Down Expand Up @@ -385,7 +381,7 @@ class ARROW_DS_EXPORT AsyncScanner : public Scanner,

Status Scan(std::function<Status(TaggedRecordBatch)> visitor) override;
Result<TaggedRecordBatchIterator> ScanBatches() override;
Result<TaggedRecordBatchIterator> ScanBatchesWithWeakFilterProject() override;
Result<RecordBatchIterator> ScanBatchesWithWeakFilterProject() override;
Result<EnumeratedRecordBatchIterator> ScanBatchesUnordered() override;
Result<std::shared_ptr<Table>> ToTable() override;

Expand Down Expand Up @@ -508,7 +504,7 @@ Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatches() {
return MakeGeneratorIterator(std::move(batches_gen));
}

Result<TaggedRecordBatchIterator> AsyncScanner::ScanBatchesWithWeakFilterProject() {
Result<RecordBatchIterator> AsyncScanner::ScanBatchesWithWeakFilterProject() {
return Status::NotImplemented("Scanning with weak filter project not implemented in async scanner");
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ class ARROW_DS_EXPORT Scanner {
///
/// Filter and Project expressions in ScanOption will be not executed individually and
/// will be only recognized by the underlying file format.
virtual Result<TaggedRecordBatchIterator> ScanBatchesWithWeakFilterProject() = 0;
virtual Result<RecordBatchIterator> ScanBatchesWithWeakFilterProject() = 0;
/// \brief Scan the dataset into a stream of record batches. Unlike ScanBatches this
/// method may allow record batches to be returned out of order. This allows for more
/// efficient scanning: some fragments may be accessed more quickly than others (e.g.
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/jni/dataset/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class ReserveFromJava : public arrow::jniutil::ReservationListener {
class DisposableScannerAdaptor {
public:
DisposableScannerAdaptor(std::shared_ptr<arrow::dataset::Scanner> scanner,
arrow::dataset::TaggedRecordBatchIterator batch_itr)
arrow::RecordBatchIterator batch_itr)
: scanner_(std::move(scanner)), batch_itr_(std::move(batch_itr)) {}

static arrow::Result<std::shared_ptr<DisposableScannerAdaptor>> Create(
Expand All @@ -136,18 +136,18 @@ class DisposableScannerAdaptor {

arrow::Result<std::shared_ptr<arrow::RecordBatch>> Next() {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch, NextBatch());
return batch;
return std::move(batch);
}

const std::shared_ptr<arrow::dataset::Scanner>& GetScanner() const { return scanner_; }

private:
std::shared_ptr<arrow::dataset::Scanner> scanner_;
arrow::dataset::TaggedRecordBatchIterator batch_itr_;
arrow::RecordBatchIterator batch_itr_;

arrow::Result<std::shared_ptr<arrow::RecordBatch>> NextBatch() {
ARROW_ASSIGN_OR_RAISE(auto batch, batch_itr_.Next())
return batch.record_batch;
return batch;
}
};

Expand Down

0 comments on commit 8d7e250

Please sign in to comment.