Skip to content

Commit

Permalink
tiny fix and test
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Sep 15, 2023
1 parent ac7e9a4 commit 0d0123b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 12 deletions.
22 changes: 15 additions & 7 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,16 @@ bool IsNan(const Scalar& value) {
}

std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata) {
const SchemaField& schema_field, const parquet::RowGroupMetaData& metadata,
const std::shared_ptr<arrow::DataType>& dest_type) {
// For the remainder of this function, failures to extract/parse statistics
// are ignored by returning std::nullopt. The goal is twofold: first, avoid
// an optimization which breaks the computation; second, allow the
// following columns to maybe succeed in extracting column statistics.
//
// Besides, `dest_type` may differ from the type of `schema_field`:
// `schema_field` uses the logical type deduced from the Parquet schema, while
// `dest_type` is the destination Arrow type.

// For now, only leaf (primitive) types are supported.
if (!schema_field.is_leaf()) {
Expand All @@ -131,7 +136,8 @@ std::optional<compute::Expression> ColumnChunkStatisticsAsExpression(
return std::nullopt;
}

return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics);
return ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics,
dest_type);
}

void AddColumnIndices(const SchemaField& schema_field,
Expand Down Expand Up @@ -311,7 +317,8 @@ Result<bool> IsSupportedParquetFile(const ParquetFileFormat& format,
} // namespace

std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpression(
const Field& field, const parquet::Statistics& statistics) {
const Field& field, const parquet::Statistics& statistics,
const std::shared_ptr<DataType>& dest_type) {
auto field_expr = compute::field_ref(field.name());

// Optimize for corner case where all values are nulls
Expand All @@ -324,8 +331,8 @@ std::optional<compute::Expression> ParquetFileFragment::EvaluateStatisticsAsExpr
return std::nullopt;
}

auto maybe_min = min->CastTo(field.type());
auto maybe_max = max->CastTo(field.type());
auto maybe_min = min->CastTo(dest_type);
auto maybe_max = max->CastTo(dest_type);

if (maybe_min.ok() && maybe_max.ok()) {
min = maybe_min.MoveValueUnsafe();
Expand Down Expand Up @@ -799,12 +806,13 @@ Result<std::vector<compute::Expression>> ParquetFileFragment::TestRowGroups(
statistics_expressions_complete_[match[0]] = true;

const SchemaField& schema_field = manifest_->schema_fields[match[0]];
auto dest_field = physical_schema_->field(match[0]);
int i = 0;
for (int row_group : *row_groups_) {
auto row_group_metadata = metadata_->RowGroup(row_group);

if (auto minmax =
ColumnChunkStatisticsAsExpression(schema_field, *row_group_metadata)) {
if (auto minmax = ColumnChunkStatisticsAsExpression(
schema_field, *row_group_metadata, dest_field->type())) {
FoldingAnd(&statistics_expressions_[i], std::move(*minmax));
ARROW_ASSIGN_OR_RAISE(statistics_expressions_[i],
statistics_expressions_[i].Bind(*physical_schema_));
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
Result<std::shared_ptr<Fragment>> Subset(std::vector<int> row_group_ids);

static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
const Field& field, const parquet::Statistics& statistics);
const Field& field, const parquet::Statistics& statistics,
const std::shared_ptr<DataType>& dest_type);

private:
ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
Expand Down
33 changes: 29 additions & 4 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ class ParquetFormatHelper {
public:
using FormatType = ParquetFileFormat;

static Result<std::shared_ptr<Buffer>> Write(RecordBatchReader* reader) {
static Result<std::shared_ptr<Buffer>> Write(
RecordBatchReader* reader,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties()) {
auto pool = ::arrow::default_memory_pool();
std::shared_ptr<Buffer> out;
auto sink = CreateOutputStream(pool);
RETURN_NOT_OK(WriteRecordBatchReader(reader, pool, sink));
RETURN_NOT_OK(WriteRecordBatchReader(reader, pool, sink, default_writer_properties(),
arrow_properties));
return sink->Finish();
}
static std::shared_ptr<ParquetFileFormat> MakeFormat() {
Expand Down Expand Up @@ -725,10 +729,31 @@ TEST(TestParquetStatistics, NullMax) {
auto reader =
parquet::ParquetFileReader::OpenFile(dir_string + "/nan_in_stats.parquet");
auto statistics = reader->RowGroup(0)->metadata()->ColumnChunk(0)->statistics();
auto stat_expression =
ParquetFileFragment::EvaluateStatisticsAsExpression(*field, *statistics);
auto stat_expression = ParquetFileFragment::EvaluateStatisticsAsExpression(
*field, *statistics, field->type());
EXPECT_EQ(stat_expression->ToString(), "(x >= 1)");
}

// Checks that EvaluateStatisticsAsExpression casts the min/max statistics to
// the destination type passed as its third argument rather than to the type of
// the `Field` passed as its first argument.
//
// A single-row table with a duration(NANO) column "t" is written to an
// in-memory Parquet buffer. The statistics are then evaluated with a field
// typed int64 (standing in for the type deduced from the Parquet schema) and
// the original duration type as the destination type.
TEST(TestParquetStatistics, SchemaCast) {
  auto arrow_src_field = ::arrow::field("t", duration(TimeUnit::NANO));
  auto table = TableFromJSON(schema({arrow_src_field}), {
                                                            R"([{"t": 1}])",
                                                        });
  TableBatchReader table_reader(*table);

  // NOTE(review): store_schema() presumably embeds the Arrow schema in the
  // written file so the duration type can be recovered on read -- confirm
  // against the ArrowWriterProperties documentation.
  ArrowWriterProperties::Builder builder;
  builder.store_schema();
  ASSERT_OK_AND_ASSIGN(auto buffer,
                       ParquetFormatHelper::Write(&table_reader, builder.build()));

  auto buffer_reader = std::make_shared<io::BufferReader>(buffer);
  auto reader = parquet::ParquetFileReader::Open(
      buffer_reader, parquet::default_reader_properties(), nullptr);
  auto statistics = reader->RowGroup(0)->metadata()->ColumnChunk(0)->statistics();
  // Fail cleanly instead of dereferencing a null pointer below.
  ASSERT_NE(statistics, nullptr);

  auto manifest_reduce_field = ::arrow::field("t", int64());
  auto stat_expression = ParquetFileFragment::EvaluateStatisticsAsExpression(
      *manifest_reduce_field, *statistics, arrow_src_field->type());
  // The result is a std::optional; dereferencing an empty one is undefined
  // behaviour, so assert it holds a value before inspecting it.
  ASSERT_TRUE(stat_expression.has_value());
  EXPECT_EQ(stat_expression->ToString(), "(t == 1)");
}

} // namespace dataset
} // namespace arrow

0 comments on commit 0d0123b

Please sign in to comment.