Skip to content

Commit

Permalink
ARROW-7547: [C++][Dataset][Python] Add ParquetFileFormat options
Browse files Browse the repository at this point in the history
Add parquet reader and arrow reader options to file format

Closes apache#6235 from bkietz/7547-Python-Dataset-Additional and squashes the following commits:

850125e <Benjamin Kietzman> msvc fix: don't use std::function
8b63349 <Benjamin Kietzman> msvc fix: an explicit function object for table constructor as well
1b2aeeb <Benjamin Kietzman> don't check schema metadata in file_parquet_test
cb0cefd <Benjamin Kietzman> don't require equivalent metadata in InspectDictEncoded
f46b0cc <Benjamin Kietzman> revert to enable_shared_from_this
cf30168 <Benjamin Kietzman> address review comments
61f98db <Benjamin Kietzman> only #include file_parquet.h when parquet is built
9d9a342 <Benjamin Kietzman> msvc fix, attempt 2
990833c <Benjamin Kietzman> amend doccomments for use_buffered_stream/buffer_size
df2a2f6 <Benjamin Kietzman> Revert "automatically configure use_buffered_stream for threaded scans"
27ea0d3 <Benjamin Kietzman> automatically configure use_buffered_stream for threaded scans
1412591 <Benjamin Kietzman> msvc fix
30e2c3f <Benjamin Kietzman> add doccomments for parquet format options in R
7d90954 <Benjamin Kietzman> check dictionary field in R test
2e5480a <Benjamin Kietzman> add support for batch_size to InMemorySource
1cd90e8 <Benjamin Kietzman> autopep8
d90fd19 <Benjamin Kietzman> move batch_size to ScanOptions
2b4a37e <Benjamin Kietzman> port reader_options changes to R
55bdce4 <Benjamin Kietzman> namespace reader_options, use strings(names) for dict_columns
7b6d619 <Benjamin Kietzman> add bindings to parquet file format properties in R and python
5540eb9 <Benjamin Kietzman> lint/CI fixes
7546851 <Benjamin Kietzman> update tests to test dictionary read of string columns
22caaa1 <Benjamin Kietzman> refactoring
74d41d3 <Benjamin Kietzman> ARROW-7547:  Add ParquetFileFormat options

Authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
  • Loading branch information
bkietz committed Feb 23, 2020
1 parent f03c844 commit efbc047
Show file tree
Hide file tree
Showing 36 changed files with 655 additions and 296 deletions.
3 changes: 2 additions & 1 deletion cpp/examples/arrow/dataset-parquet-scan-example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ struct Configuration {
std::shared_ptr<ds::Expression> filter = ("total_amount"_ > 1000.0f).Copy();
} conf;

std::shared_ptr<fs::FileSystem> GetFileSystemFromUri(const std::string& uri, std::string* path) {
std::shared_ptr<fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
std::string* path) {
return fs::FileSystemFromUri(uri, path).ValueOrDie();
}

Expand Down
60 changes: 59 additions & 1 deletion cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/filter.h"
#include "arrow/dataset/scanner.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/iterator.h"
#include "arrow/util/make_unique.h"

Expand All @@ -32,6 +33,10 @@ namespace dataset {
Fragment::Fragment(std::shared_ptr<ScanOptions> scan_options)
: scan_options_(std::move(scan_options)), partition_expression_(scalar(true)) {}

const std::shared_ptr<Schema>& Fragment::schema() const {
return scan_options_->schema();
}

InMemoryFragment::InMemoryFragment(
std::vector<std::shared_ptr<RecordBatch>> record_batches,
std::shared_ptr<ScanOptions> scan_options)
Expand Down Expand Up @@ -99,9 +104,62 @@ FragmentIterator Source::GetFragments(std::shared_ptr<ScanOptions> scan_options)
return GetFragmentsImpl(std::move(simplified_scan_options));
}

struct VectorRecordBatchGenerator : InMemorySource::RecordBatchGenerator {
explicit VectorRecordBatchGenerator(std::vector<std::shared_ptr<RecordBatch>> batches)
: batches_(std::move(batches)) {}

RecordBatchIterator Get() const final { return MakeVectorIterator(batches_); }

std::vector<std::shared_ptr<RecordBatch>> batches_;
};

InMemorySource::InMemorySource(std::shared_ptr<Schema> schema,
std::vector<std::shared_ptr<RecordBatch>> batches)
: Source(std::move(schema)),
get_batches_(new VectorRecordBatchGenerator(std::move(batches))) {}

struct TableRecordBatchGenerator : InMemorySource::RecordBatchGenerator {
explicit TableRecordBatchGenerator(std::shared_ptr<Table> table)
: table_(std::move(table)) {}

RecordBatchIterator Get() const final {
auto reader = std::make_shared<TableBatchReader>(*table_);
auto table = table_;
return MakeFunctionIterator([reader, table] { return reader->Next(); });
}

std::shared_ptr<Table> table_;
};

InMemorySource::InMemorySource(std::shared_ptr<Table> table)
: Source(table->schema()),
get_batches_(new TableRecordBatchGenerator(std::move(table))) {}

FragmentIterator InMemorySource::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
return MakeVectorIterator(fragments_);
auto schema = this->schema();

auto create_fragment =
[scan_options,
schema](std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
if (!batch->schema()->Equals(schema)) {
return Status::TypeError("yielded batch had schema ", *batch->schema(),
" which did not match InMemorySource's: ", *schema);
}

std::vector<std::shared_ptr<RecordBatch>> batches;

auto batch_size = scan_options->batch_size;
auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size);

for (int i = 0; i < n_batches; i++) {
batches.push_back(batch->Slice(batch_size * i, batch_size));
}

return std::make_shared<InMemoryFragment>(std::move(batches), scan_options);
};

return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get());
}

FragmentIterator TreeSource::GetFragmentsImpl(std::shared_ptr<ScanOptions> options) {
Expand Down
28 changes: 23 additions & 5 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <utility>
Expand Down Expand Up @@ -47,7 +48,9 @@ class ARROW_DS_EXPORT Fragment {
/// scanning this fragment. May be nullptr, which indicates that no filtering
/// or schema reconciliation will be performed and all partitions will be
/// scanned.
std::shared_ptr<ScanOptions> scan_options() const { return scan_options_; }
const std::shared_ptr<ScanOptions>& scan_options() const { return scan_options_; }

const std::shared_ptr<Schema>& schema() const;

virtual ~Fragment() = default;

Expand Down Expand Up @@ -125,18 +128,33 @@ class ARROW_DS_EXPORT Source {
std::shared_ptr<Expression> partition_expression_;
};

/// \brief A Source consisting of a flat sequence of Fragments
/// \brief A Source which yields fragments wrapping a stream of record batches.
///
/// The record batches must match the schema provided to the source at construction.
class ARROW_DS_EXPORT InMemorySource : public Source {
public:
explicit InMemorySource(std::shared_ptr<Schema> schema, FragmentVector fragments)
: Source(std::move(schema)), fragments_(std::move(fragments)) {}
class RecordBatchGenerator {
public:
virtual ~RecordBatchGenerator() = default;
virtual RecordBatchIterator Get() const = 0;
};

InMemorySource(std::shared_ptr<Schema> schema,
std::unique_ptr<RecordBatchGenerator> get_batches)
: Source(std::move(schema)), get_batches_(std::move(get_batches)) {}

// Convenience constructor taking a fixed list of batches
InMemorySource(std::shared_ptr<Schema> schema,
std::vector<std::shared_ptr<RecordBatch>> batches);

explicit InMemorySource(std::shared_ptr<Table> table);

FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::string type_name() const override { return "in-memory"; }

private:
FragmentVector fragments_;
std::unique_ptr<RecordBatchGenerator> get_batches_;
};

/// \brief A recursive Source with child Sources.
Expand Down
29 changes: 10 additions & 19 deletions cpp/src/arrow/dataset/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ namespace dataset {

class TestInMemoryFragment : public DatasetFixtureMixin {};

using RecordBatchVector = std::vector<std::shared_ptr<RecordBatch>>;

TEST_F(TestInMemoryFragment, Scan) {
constexpr int64_t kBatchSize = 1024;
constexpr int64_t kNumberBatches = 16;
Expand All @@ -51,21 +53,18 @@ TEST_F(TestInMemoryFragment, Scan) {
class TestInMemorySource : public DatasetFixtureMixin {};

TEST_F(TestInMemorySource, GetFragments) {
constexpr int64_t kNumberFragments = 4;
constexpr int64_t kBatchSize = 1024;
constexpr int64_t kNumberBatches = 16;

SetSchema({field("i32", int32()), field("f64", float64())});
auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
auto reader = ConstantArrayGenerator::Repeat(kNumberBatches * kNumberFragments, batch);
auto reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);

std::vector<std::shared_ptr<RecordBatch>> batches{static_cast<size_t>(kNumberBatches),
batch};
RecordBatchVector batches{static_cast<size_t>(kNumberBatches), batch};
auto fragment = std::make_shared<InMemoryFragment>(batches, options_);
// It is safe to copy fragment multiple time since Scan() does not consume
// the internal array.
auto source =
InMemorySource(schema_, {static_cast<size_t>(kNumberFragments), fragment});
auto source = InMemorySource(schema_, {static_cast<size_t>(kNumberBatches), batch});

AssertSourceEquals(reader.get(), &source);
}
Expand All @@ -74,25 +73,20 @@ class TestTreeSource : public DatasetFixtureMixin {};

TEST_F(TestTreeSource, GetFragments) {
constexpr int64_t kBatchSize = 1024;
constexpr int64_t kNumberBatches = 16;
constexpr int64_t kChildPerNode = 2;
constexpr int64_t kCompleteBinaryTreeDepth = 4;

SetSchema({field("i32", int32()), field("f64", float64())});
auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);

auto n_leaves = 1U << kCompleteBinaryTreeDepth;
auto reader = ConstantArrayGenerator::Repeat(kNumberBatches * n_leaves, batch);

std::vector<std::shared_ptr<RecordBatch>> batches{static_cast<size_t>(kNumberBatches),
batch};
auto fragment = std::make_shared<InMemoryFragment>(batches, options_);
auto reader = ConstantArrayGenerator::Repeat(n_leaves, batch);

// Creates a complete binary tree of depth kCompleteBinaryTreeDepth where the
// leaves are InMemorySource containing kChildPerNode fragments.

auto l1_leaf_source = std::make_shared<InMemorySource>(
schema_, FragmentVector{static_cast<size_t>(kChildPerNode), fragment});
schema_, RecordBatchVector{static_cast<size_t>(kChildPerNode), batch});

auto l2_leaf_tree_source = std::make_shared<TreeSource>(
schema_, SourceVector{static_cast<size_t>(kChildPerNode), l1_leaf_source});
Expand All @@ -109,7 +103,6 @@ TEST_F(TestTreeSource, GetFragments) {
class TestDataset : public DatasetFixtureMixin {};

TEST_F(TestDataset, TrivialScan) {
constexpr int64_t kNumberFragments = 4;
constexpr int64_t kNumberBatches = 16;
constexpr int64_t kBatchSize = 1024;

Expand All @@ -118,15 +111,13 @@ TEST_F(TestDataset, TrivialScan) {

std::vector<std::shared_ptr<RecordBatch>> batches{static_cast<size_t>(kNumberBatches),
batch};
auto fragment = std::make_shared<InMemoryFragment>(batches, options_);
FragmentVector fragments{static_cast<size_t>(kNumberFragments), fragment};

SourceVector sources = {
std::make_shared<InMemorySource>(schema_, fragments),
std::make_shared<InMemorySource>(schema_, fragments),
std::make_shared<InMemorySource>(schema_, batches),
std::make_shared<InMemorySource>(schema_, batches),
};

const int64_t total_batches = sources.size() * kNumberFragments * kNumberBatches;
const int64_t total_batches = sources.size() * kNumberBatches;
auto reader = ConstantArrayGenerator::Repeat(total_batches, batch);

ASSERT_OK_AND_ASSIGN(auto dataset, Dataset::Make(sources, schema_));
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class MockSourceFactory : public SourceFactory {

Result<std::shared_ptr<Source>> Finish(const std::shared_ptr<Schema>& schema) override {
return std::make_shared<InMemorySource>(schema,
std::vector<std::shared_ptr<Fragment>>{});
std::vector<std::shared_ptr<RecordBatch>>{});
}

protected:
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Result<std::shared_ptr<arrow::io::RandomAccessFile>> FileSource::Open() const {
}

Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanContext> context) {
return format_->ScanFile(source_, scan_options_, context);
return format_->ScanFile(source_, scan_options_, std::move(context));
}

FileSystemSource::FileSystemSource(std::shared_ptr<Schema> schema,
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class ARROW_DS_EXPORT FileFormat {

/// \brief Open a fragment
virtual Result<std::shared_ptr<Fragment>> MakeFragment(
const FileSource& location, std::shared_ptr<ScanOptions> options) = 0;
FileSource location, std::shared_ptr<ScanOptions> options) = 0;
};

/// \brief A Fragment that is stored in a file with a known format
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(
}

Result<std::shared_ptr<Fragment>> IpcFileFormat::MakeFragment(
const FileSource& source, std::shared_ptr<ScanOptions> options) {
return std::make_shared<IpcFragment>(source, options);
FileSource source, std::shared_ptr<ScanOptions> options) {
return std::make_shared<IpcFragment>(std::move(source), std::move(options));
}

} // namespace dataset
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/dataset/file_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <memory>
#include <string>
#include <utility>

#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
Expand All @@ -43,13 +44,13 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
std::shared_ptr<ScanContext> context) const override;

Result<std::shared_ptr<Fragment>> MakeFragment(
const FileSource& source, std::shared_ptr<ScanOptions> options) override;
FileSource source, std::shared_ptr<ScanOptions> options) override;
};

class ARROW_DS_EXPORT IpcFragment : public FileFragment {
public:
IpcFragment(const FileSource& source, std::shared_ptr<ScanOptions> options)
: FileFragment(source, std::make_shared<IpcFileFormat>(), options) {}
IpcFragment(FileSource source, std::shared_ptr<ScanOptions> options)
: FileFragment(std::move(source), std::make_shared<IpcFileFormat>(), options) {}

bool splittable() const override { return true; }
};
Expand Down
Loading

0 comments on commit efbc047

Please sign in to comment.