Skip to content

Commit

Permalink
apacheGH-43983: [C++][Parquet] Add support for arrow::ArrayStatistics…
Browse files Browse the repository at this point in the history
…: zero-copy types (apache#43984)

### Rationale for this change

Statistics is useful for fast processing.

Target types:

* `Int32`
* `Int64`
* `Float`
* `Double`
* `Timestamp[milli]`
* `Timestamp[micro]`
* `Timestamp[nano]`

### What changes are included in this PR?

Map `ColumnChunkMetaData` information to `arrow::ArrayStatistics`.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* GitHub Issue: apache#43983

Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
kou authored Sep 8, 2024
1 parent 5549fa9 commit a15956f
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 45 deletions.
66 changes: 52 additions & 14 deletions cpp/src/parquet/arrow/arrow_statistics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "gtest/gtest.h"

#include "arrow/array.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/array/builder_time.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"

Expand Down Expand Up @@ -183,9 +185,8 @@ TEST(StatisticsTest, TruncateOnlyHalfMinMax) {

namespace {
::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
std::shared_ptr<::arrow::DataType> data_type, const std::string& json) {
std::shared_ptr<::arrow::DataType> data_type, std::shared_ptr<::arrow::Array> array) {
auto schema = ::arrow::schema({::arrow::field("column", data_type)});
auto array = ::arrow::ArrayFromJSON(data_type, json);
auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), {array});
ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
const auto arrow_writer_properties =
Expand All @@ -211,21 +212,27 @@ ::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
template <typename ArrowType, typename MinMaxType>
void TestStatisticsReadArray(std::shared_ptr<::arrow::DataType> arrow_type) {
using ArrowArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
using ArrowArrayBuilder = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
using ArrowCType = typename ArrowType::c_type;
constexpr auto min = std::numeric_limits<ArrowCType>::min();
constexpr auto min = std::numeric_limits<ArrowCType>::lowest();
constexpr auto max = std::numeric_limits<ArrowCType>::max();

std::string json;
json += "[";
json += std::to_string(max);
json += ", null, ";
json += std::to_string(min);
json += ", ";
json += std::to_string(max);
json += "]";
ASSERT_OK_AND_ASSIGN(auto array, StatisticsReadArray(arrow_type, json));
auto typed_array = std::static_pointer_cast<ArrowArrayType>(array);
auto statistics = typed_array->statistics();
std::unique_ptr<ArrowArrayBuilder> builder;
if constexpr (::arrow::TypeTraits<ArrowType>::is_parameter_free) {
builder = std::make_unique<ArrowArrayBuilder>(::arrow::default_memory_pool());
} else {
builder =
std::make_unique<ArrowArrayBuilder>(arrow_type, ::arrow::default_memory_pool());
}
ASSERT_OK(builder->Append(max));
ASSERT_OK(builder->AppendNull());
ASSERT_OK(builder->Append(min));
ASSERT_OK(builder->Append(max));
ASSERT_OK_AND_ASSIGN(auto built_array, builder->Finish());
ASSERT_OK_AND_ASSIGN(auto read_array,
StatisticsReadArray(arrow_type, std::move(built_array)));
auto typed_read_array = std::static_pointer_cast<ArrowArrayType>(read_array);
auto statistics = typed_read_array->statistics();
ASSERT_NE(nullptr, statistics);
ASSERT_EQ(true, statistics->null_count.has_value());
ASSERT_EQ(1, statistics->null_count.value());
Expand Down Expand Up @@ -257,14 +264,30 @@ TEST(TestStatisticsRead, UInt16) {
TestStatisticsReadArray<::arrow::UInt16Type, uint64_t>(::arrow::uint16());
}

TEST(TestStatisticsRead, Int32) {
TestStatisticsReadArray<::arrow::Int32Type, int64_t>(::arrow::int32());
}

TEST(TestStatisticsRead, UInt32) {
TestStatisticsReadArray<::arrow::UInt32Type, uint64_t>(::arrow::uint32());
}

TEST(TestStatisticsRead, Int64) {
TestStatisticsReadArray<::arrow::Int64Type, int64_t>(::arrow::int64());
}

TEST(TestStatisticsRead, UInt64) {
TestStatisticsReadArray<::arrow::UInt64Type, uint64_t>(::arrow::uint64());
}

TEST(TestStatisticsRead, Float) {
TestStatisticsReadArray<::arrow::FloatType, double>(::arrow::float32());
}

TEST(TestStatisticsRead, Double) {
TestStatisticsReadArray<::arrow::DoubleType, double>(::arrow::float64());
}

TEST(TestStatisticsRead, Date32) {
TestStatisticsReadArray<::arrow::Date32Type, int64_t>(::arrow::date32());
}
Expand All @@ -279,6 +302,21 @@ TEST(TestStatisticsRead, Time64) {
::arrow::time64(::arrow::TimeUnit::MICRO));
}

TEST(TestStatisticsRead, TimestampMilli) {
TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
::arrow::timestamp(::arrow::TimeUnit::MILLI));
}

TEST(TestStatisticsRead, TimestampMicro) {
TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
::arrow::timestamp(::arrow::TimeUnit::MICRO));
}

TEST(TestStatisticsRead, TimestampNano) {
TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
::arrow::timestamp(::arrow::TimeUnit::NANO));
}

TEST(TestStatisticsRead, Duration) {
TestStatisticsReadArray<::arrow::DurationType, int64_t>(
::arrow::duration(::arrow::TimeUnit::NANO));
Expand Down
96 changes: 65 additions & 31 deletions cpp/src/parquet/arrow/reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,30 +319,20 @@ void ReconstructChunksWithoutNulls(::arrow::ArrayVector* chunks) {
}

template <typename ArrowType, typename ParquetType>
Status TransferInt(RecordReader* reader,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const ReaderContext* ctx, const std::shared_ptr<Field>& field,
Datum* out) {
void AttachStatistics(::arrow::ArrayData* data,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const ReaderContext* ctx) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
int64_t length = reader->values_written();
ARROW_ASSIGN_OR_RAISE(auto data,
::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool));

auto values = reinterpret_cast<const ParquetCType*>(reader->values());
auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
std::copy(values, values + length, out_ptr);
int64_t null_count = 0;
std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
if (field->nullable()) {
null_count = reader->null_count();
buffers[0] = reader->ReleaseIsValid();
auto statistics = metadata->statistics().get();
if (data->null_count == ::arrow::kUnknownNullCount && !statistics) {
return;
}
auto array_data =
::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count);

auto array_statistics = std::make_shared<::arrow::ArrayStatistics>();
array_statistics->null_count = null_count;
auto statistics = metadata->statistics().get();
if (data->null_count != ::arrow::kUnknownNullCount) {
array_statistics->null_count = data->null_count;
}
if (statistics) {
if (statistics->HasDistinctCount()) {
array_statistics->distinct_count = statistics->distinct_count();
Expand All @@ -352,31 +342,63 @@ Status TransferInt(RecordReader* reader,
static_cast<::parquet::TypedStatistics<ParquetType>*>(statistics);
const ArrowCType min = typed_statistics->min();
const ArrowCType max = typed_statistics->max();
if (std::is_signed<ArrowCType>::value) {
if (std::is_floating_point<ArrowCType>::value) {
array_statistics->min = static_cast<double>(min);
array_statistics->max = static_cast<double>(max);
} else if (std::is_signed<ArrowCType>::value) {
array_statistics->min = static_cast<int64_t>(min);
array_statistics->max = static_cast<int64_t>(max);
} else {
array_statistics->min = static_cast<uint64_t>(min);
array_statistics->max = static_cast<uint64_t>(max);
}
// We can assume that integer based min/max are always exact if
// they exist. Apache Parquet's "Statistics" has
// "is_min_value_exact" and "is_max_value_exact" but we can
// ignore them for integer based min/max.
// We can assume that integer and floating point number based
// min/max are always exact if they exist. Apache Parquet's
// "Statistics" has "is_min_value_exact" and
// "is_max_value_exact" but we can ignore them for integer and
// floating point number based min/max.
//
// See also the discussion at [email protected]:
// https://lists.apache.org/thread/zfnmg5p51b7oylft5w5k4670wgkd4zv4
array_statistics->is_min_exact = true;
array_statistics->is_max_exact = true;
}
}
array_data->statistics = std::move(array_statistics);

data->statistics = std::move(array_statistics);
}

template <typename ArrowType, typename ParquetType>
Status TransferInt(RecordReader* reader,
std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const ReaderContext* ctx, const std::shared_ptr<Field>& field,
Datum* out) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
int64_t length = reader->values_written();
ARROW_ASSIGN_OR_RAISE(auto data,
::arrow::AllocateBuffer(length * sizeof(ArrowCType), ctx->pool));

auto values = reinterpret_cast<const ParquetCType*>(reader->values());
auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
std::copy(values, values + length, out_ptr);
int64_t null_count = 0;
std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
if (field->nullable()) {
null_count = reader->null_count();
buffers[0] = reader->ReleaseIsValid();
}
auto array_data =
::arrow::ArrayData::Make(field->type(), length, std::move(buffers), null_count);
AttachStatistics<ArrowType, ParquetType>(array_data.get(), std::move(metadata), ctx);
*out = std::make_shared<ArrayType<ArrowType>>(std::move(array_data));
return Status::OK();
}

std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
const std::shared_ptr<Field>& field) {
template <typename ArrowType, typename ParquetType>
std::shared_ptr<Array> TransferZeroCopy(
RecordReader* reader, std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
const ReaderContext* ctx, const std::shared_ptr<Field>& field) {
std::shared_ptr<::arrow::ArrayData> data;
if (field->nullable()) {
std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
Expand All @@ -388,7 +410,8 @@ std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
data = std::make_shared<::arrow::ArrayData>(field->type(), reader->values_written(),
std::move(buffers), /*null_count=*/0);
}
return ::arrow::MakeArray(data);
AttachStatistics<ArrowType, ParquetType>(data.get(), std::move(metadata), ctx);
return ::arrow::MakeArray(std::move(data));
}

Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool, Datum* out) {
Expand Down Expand Up @@ -794,10 +817,20 @@ Status TransferColumnData(RecordReader* reader,
break;
}
case ::arrow::Type::INT32:
result = TransferZeroCopy<::arrow::Int32Type, Int32Type>(
reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::INT64:
result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::FLOAT:
result = TransferZeroCopy<::arrow::FloatType, FloatType>(
reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::DOUBLE:
result = TransferZeroCopy(reader, value_field);
result = TransferZeroCopy<::arrow::DoubleType, DoubleType>(
reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::BOOL:
RETURN_NOT_OK(TransferBool(reader, value_field->nullable(), pool, &result));
Expand Down Expand Up @@ -895,7 +928,8 @@ Status TransferColumnData(RecordReader* reader,
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO:
case ::arrow::TimeUnit::NANO:
result = TransferZeroCopy(reader, value_field);
result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
reader, std::move(metadata), ctx, value_field);
break;
default:
return Status::NotImplemented("TimeUnit not supported");
Expand Down

0 comments on commit a15956f

Please sign in to comment.